Introduction to Apache Beam with Java

Key points

  • Apache Beam is an open source batch and streaming project
  • Its portability allows you to run pipelines in different backends from Apache Spark to Google Cloud Dataflow
  • Beam is extensible, which means you can write and share new SDKs, IO connectors, and transformers
  • Beam currently supports Python, Java and Go
  • By using your Java SDK, you can take full advantage of the JVM

In this article, we present Apache Beam, a powerful open source batch and streaming project used by large companies such as eBay to integrate their streaming pipelines and by Mozilla to move data securely between their systems.

Overview

Apache Beam is a batch and streaming data processing programming model.

Using the SDKs provided for Java, Python, and Go, you can develop pipelines and then choose a backend to run the pipeline.

Advantages of Apache Beam

Beam Model (Frances Perry and Tyler Akidau)

  • Integrated I / O connectors

    • Apache Beam connectors make it easy to extract and upload data from a variety of storage types
    • The main types of connectors are:

      • File-based (eg Apache Parquet, Apache Thrift)
      • File system (eg Hadoop, Google Cloud Storage, Amazon S3)
      • Messaging (e.g. Apache Kafka, Google Pub / Sub, Amazon SQS)
      • Database (eg Apache Cassandra, Elastic Search, MongoDb)

    • As an OSS project, support for new plugins is growing (eg InfluxDB, Neo4J)

  • Portability:

    • Beam provides several runners to run the pipes, allowing you to choose the best one for each use case and avoid vendor blocking.
    • Distributed processing backends such as Apache Flink, Apache Spark, or Google Cloud Dataflow can be used as brokers.

  • Distributed parallel processing:

    • Each element of the data set is managed independently by default, so that its processing can be optimized by running in parallel.
    • Developers do not need to manually distribute the load among workers, as Beam provides an abstraction.

The beam model

The key concepts of the Beam programming model are:

  • PCcollection: represents a collection of data, ie an array of numbers or words extracted from a text.
  • PTransform: A transform function that receives and returns a PCcollection, that is, it adds up all the numbers.
  • Pipeline: Manages the interactions between PTransforms and PCollections.
  • PipelineRunner: Specifies where and how the pipeline should run.

Quick start

A basic pipeline operation consists of 3 steps: reading, processing, and writing the result of the transformation. Each of these steps is defined by a program using one of the Apache Beam SDKs.

In this section, we will create pipelines using the Java SDK. You can choose to create a local app (using Gradle or Maven) or use the online Playground. The examples will use the local corridor, as it will be easier to verify the result using JUnit Assertions.

Local Java dependencies

  • beam-sdks-java-core: contains all Beam Model classes.
  • beam-runners-direct-java: By default, the Apache Beam SDK will use the direct runner, which means the pipeline will run on your local machine.

Multiply by 2

In this first example, the pipeline will receive an array of numbers and map each element multiplied by 2.

The first step is to create the pipe instance that will receive the input array and execute the transformation function. Because we use JUnit to run Apache Beam, we can easily create a TestPipeline as a test class attribute. If you prefer to run in your main application, you will need to configure the pipeline configuration options,

@Temporary final public run TestPipeline pipeline = TestPipeline.create ();

We can now create the PCcollection that will be used as a pipeline entry. It will be an array created directly from memory, but can be read from anywhere that supports Apache Beam:

PCcollection names = pipeline.apply (Create.of (1, 2, 3, 4, 5));

Next, we apply our transformation function, which will double each element in the dataset:

PCollection output = numbers.apply (MapElements.into (TypeDescriptors.integers ()) .via ((integer) -> number * 2));

To verify the results we can write a statement:

PAssert.that (output) .containsInAnyOrder (2, 4, 6, 8, 10);

Note that the results should not be sorted as input, because Apache Beam processes each item independently and in parallel.

The test at this point has been done and we run the pipeline by calling:

pipeline.run ();

Reduce operation

The reduction operation is the combination of multiple input items that results in a smaller collection, which typically contains a single item.

MapReduce (Frances Perry and Tyler Akidau)

We now extend the example above to add all the elements multiplied by two, resulting in a MapReduce transformation.

Each transformation of PCollection results in a new instance of PCollection, which means that we can chain transformations using the apply method. In this case, the Sum operation will be used after multiplying each entry by 2:

PCcollection names = pipeline.apply (Create.of (1, 2, 3, 4, 5)); PCollection output = names .apply (MapElements.into (TypeDescriptors.integers ()) .via ((integer) -> number * 2)) .apply (Sum.integersGlobally ()); PAssert.that (output) .containsInAnyOrder (30); pipeline.run ();

FlatMap operation

FlatMap is an operation that first applies a map to each input item that normally returns a new collection, resulting in a collection of collections. A flat operation is then applied to combine all the nested collections, resulting in a single one.

The next example is to transform arrays of strings into a single array containing each word.

First, we declare our list of words that will be used as channel input:

final string[] WORDS_ARRAY = new string[] {“hello bob”, “hello alĂ­cia”, “hola sue”}; Final list WORDS = Arrays.asList (WORDS_ARRAY);

Then we create the PCollection entry using the list above:

PCollection input = pipeline.apply (Create.of (WORDS));

We now apply the flat map transformation, which will split the words in each nested array and merge the results into a single list:

PCollection output = input.apply (FlatMapElements.into (TypeDescriptors.strings ()) .via ((String line) -> Arrays.asList (line.split (“”)))); PAssert.that (exit) .containsInAnyOrder (“hello”, “bob”, “hello”, “alicia”, “hello”, “sue”); pipeline.run ();

Group operation

A common job in data processing is to add or count using a specific key. We will prove this by counting the number of occurrences of each word in the example above.

After having the flat string array, we can chain another PTransform:

PCollection> output = input .apply (FlatMapElements.into (TypeDescriptors.strings ()) .via ((String line) -> Arrays.asList (line.split (“”)))) .apply (Count.perElement ());

Resulting in:

PAssert.that (output) .containsInAnyOrder (KV.of (“hello”, 2L), KV.of (“hello”, 1L), KV.of (“alicia”, 1L), KV.of (“sue”, 1L), KV.of (“bob”, 1L));

Reading a file

One of the principles of Apache Beam is to read data from anywhere, so we see in practice how to use a text file as a data source.

The following example will read the contents of a “words.txt” with the contents “An advanced unified programming model”. The transformation function will then return a PCcollection containing each word in the text.

PCollection input = pipeline.apply (TextIO.read (). from (“./src/main/resources/words/txt”)); PCollection output = input.apply (FlatMapElements.into (TypeDescriptors.strings ()) .via ((String line) -> Arrays.asList (line.split (“”)))); PAssert.that (output) .containsInAnyOrder (“An”, “Advanced”, “Unified”, “Programming”, “Model”); pipeline.run ();

Writing the output to a file

As seen in the example above for input, Apache Beam has several built-in output connectors. In the following example, we will count the number of each word present in the text file “words.txt” which contains only one sentence (“An advanced unified programming model”) and the output will be kept in a text file format.

PCollection input = pipeline.apply (TextIO.read (). from (“./src/main/resources/words/txt”)); PCollection> output = input .apply (FlatMapElements.into (TypeDescriptors.strings ()) .via ((String line) -> Arrays.asList (line.split (“”)))) .apply (Count.perElement ()) ;; Affirm.que (output) .containsInAnyOrder (KV.of (“An”, 1L), KV.of (“Advanced”, 1L), KV.of (“Unified”, 1L), KV.of (“Programming”), 1L), KV.of (“model”, 1L)); output .apply (MapElements.into (TypeDescriptors.strings ()) .via (KV kv) -> kv.getKey () + “” + kv.getValue ())) .apply (TextIO.write () .to (“./ src / main / resources / wordscount”)); pipeline.run ();

Even file writing is optimized for parallelism by default, which means that Beam will determine the best number of snippets (files) to maintain the result. The files will be in the src / main / resources folder and will have the prefix “wordcount”, the number of snippets and the total number of snippets as defined in the last output transformation.

When you run it on my laptop, …

Leave a Comment

Your email address will not be published. Required fields are marked *