The back-end of FlinkRunner in Apache Beam

1. Beam SDK-Runner model

The Beam SDKs provide a unified programming model for defining your data processing pipeline, regardless of how it is actually executed. The Beam runners translate the data processing pipeline into an API compatible with the distributed processing back-end of your choice.

To put it simply, the Beam SDK defines a data processing model that is independent of any execution engine. When it comes to an actual execution engine, such as Flink, Spark, Storm, or even MapReduce, the runner is responsible for the translation work: re-expressing the data pipeline in that engine's own API.
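
As a quick illustration (a minimal sketch; the class and transform names are illustrative, not taken from any Beam example), the same pipeline code can run on the DirectRunner, FlinkRunner, or any other runner purely by changing the --runner pipeline option:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class UnifiedModelExample {
      public static void main(String[] args) {
        // The runner is chosen at launch time, e.g. --runner=FlinkRunner,
        // without touching the pipeline definition itself.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        p.apply(Create.of("a", "b", "c"))
         .apply(MapElements.into(TypeDescriptors.strings()).via((String word) -> word.toUpperCase()));

        p.run().waitUntilFinish();
      }
    }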

2. PipelineRunner abstract class

PipelineRunner is the abstract parent class for every runner. It declares the core method run, shown below, which is responsible for translating a Beam SDK Pipeline into the runner's API and executing it.

  /**
   * Processes the given Pipeline, returning the results.
   */
  public abstract ResultT run(Pipeline pipeline);
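
To make the contract concrete, here is a hypothetical, stripped-down runner sketch (illustrative only, not from the Beam code base). Besides run, a runner class also exposes a static fromOptions factory, which Beam uses to instantiate the runner named in the pipeline options:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.PipelineRunner;
    import org.apache.beam.sdk.options.PipelineOptions;

    // A skeleton only; it does not actually translate or execute anything.
    public class SkeletonRunner extends PipelineRunner<PipelineResult> {

      private final PipelineOptions options;

      private SkeletonRunner(PipelineOptions options) {
        this.options = options;
      }

      // Static factory used by Beam to create the runner from PipelineOptions.
      public static SkeletonRunner fromOptions(PipelineOptions options) {
        return new SkeletonRunner(options);
      }

      @Override
      public PipelineResult run(Pipeline pipeline) {
        // A real runner would traverse the pipeline here and re-express each
        // transform in the back-end's native API before submitting a job.
        throw new UnsupportedOperationException("translation not implemented in this sketch");
      }
    }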

3. FlinkRunner

Apache Flink is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

FlinkRunner is the runner that executes a Beam Pipeline on a Flink cluster.
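
As a usage sketch (the transforms are omitted), a pipeline is pointed at FlinkRunner through FlinkPipelineOptions and then run as usual:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class RunOnFlink {
      public static void main(String[] args) {
        // Parses Flink-specific options such as --flinkMaster, --parallelism and --streaming.
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);

        Pipeline p = Pipeline.create(options);
        // ... apply transforms here ...
        p.run().waitUntilFinish();
      }
    }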

3.1. FlinkPipelineExecutionEnvironment

In the Beam SDK the same API is used for both bounded and unbounded data, which is not the case in Flink. FlinkPipelineExecutionEnvironment hides that difference: internally it holds an org.apache.flink.api.java.ExecutionEnvironment for bounded data, i.e. batch processing, and an org.apache.flink.streaming.api.environment.StreamExecutionEnvironment for unbounded data, i.e. stream processing.
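
For comparison, this is what the split looks like in plain Flink (a minimal sketch): the batch and streaming APIs start from two different environment classes, and this is exactly the choice that FlinkPipelineExecutionEnvironment hides from the Beam user.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class NativeFlinkEnvironments {
      public static void main(String[] args) {
        // Bounded data (batch): the DataSet API starts from ExecutionEnvironment.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Unbounded data (streaming): the DataStream API starts from StreamExecutionEnvironment.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      }
    }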

3.2. Translate Pipeline to Flink DataStream

Now let's look at the core method public PipelineResult run(Pipeline pipeline). Its main logic is expressed in the lines below:

    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
    env.translate(this, pipeline);
    JobExecutionResult result = env.executePipeline();
    ...

The env.translate(this, pipeline) call leads into:

    public void translate(FlinkRunner flinkRunner, Pipeline pipeline)

The first step is to detect the mode of the pipeline, STREAMING or BATCH, which is quite straightforward with the lines below:

    TranslationMode translationMode = optimizer.getTranslationMode();
    ...
    if (options.isStreaming()) {
      return TranslationMode.STREAMING;
    }

If the mode is TranslationMode.STREAMING, a FlinkStreamingPipelineTranslator is created; otherwise a FlinkBatchPipelineTranslator is created for TranslationMode.BATCH.

    if (translationMode == TranslationMode.STREAMING) {
      this.flinkStreamEnv = createStreamExecutionEnvironment();
      translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
    } else {
      this.flinkBatchEnv = createBatchExecutionEnvironment();
      translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
    }

The details of setting up the environment from FlinkPipelineOptions will be described later; let's focus on the pipeline translator first.

Take TranslationMode.STREAMING as an example. FlinkStreamingPipelineTranslator extends Pipeline.PipelineVisitor.Defaults and leverages traverseTopologically to perform the detailed translation work. The PipelineVisitor is key to understanding how a runner is implemented; I will cover it in a separate post soon.
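
To give a feel for how that works, here is a minimal visitor sketch (illustrative only, not FlinkRunner code) that simply prints the transforms it encounters while the pipeline graph is walked; a real translator would instead map each transform to a Flink operator. It is driven with pipeline.traverseTopologically(new PrintingVisitor()).

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.runners.TransformHierarchy;

    // Prints the pipeline shape instead of translating it.
    public class PrintingVisitor extends Pipeline.PipelineVisitor.Defaults {

      @Override
      public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        System.out.println("composite: " + node.getFullName());
        // Descend into the composite so nested primitive transforms are visited too.
        return CompositeBehavior.ENTER_TRANSFORM;
      }

      @Override
      public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        System.out.println("primitive: " + node.getFullName());
      }
    }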

Submit to run

Now that the pipeline job is expressed as a Flink DataStream, it can be submitted for execution with flinkEnv.execute(jobName);

3.3. Usage of FlinkPipelineOptions

All pipeline options can be found in the Javadoc of FlinkPipelineOptions. I'll highlight several that are related to the implementation of the runner itself.

--streaming

As mentioned above, FlinkRunner has separate code paths for batch and streaming, so make sure you set this option to match your pipeline.
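
The option can also be set programmatically; a small sketch, building on the options parsing shown earlier:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ForceStreamingMode {
      public static FlinkPipelineOptions fromArgs(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        // Equivalent to passing --streaming=true; makes FlinkRunner take the streaming translation path.
        options.setStreaming(true);
        return options;
      }
    }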

-d in bin/flink run

I usually don't submit a Flink job as a standard Java application; instead I use bin/flink run. By default it waits until the job finishes. For batch that's fine, but in streaming mode it's highly recommended to add the -d option to the CLI command, which runs the job in detached mode and returns immediately.

4. Description of Interfaces in DoFn

DoFn is the core API in Apache Beam. The function used to process each element is specified by a DoFn, primarily via its ProcessElement method.

According to the JavaDoc:

Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided up into some number of “bundles”. These are farmed off to distributed worker machines (or run locally, if using the DirectRunner). For each bundle of input elements processing proceeds as follows:

  1. If required, a fresh instance of the argument DoFn is created on a worker, and the DoFn.Setup method is called on this instance. This may be through deserialization or other means. A PipelineRunner may reuse DoFn instances for multiple bundles. A DoFn that has terminated abnormally (by throwing an Exception) will never be reused.
  2. The DoFn’s DoFn.StartBundle method, if provided, is called to initialize it.
  3. The DoFn’s DoFn.ProcessElement method is called on each of the input elements in the bundle.
  4. The DoFn’s DoFn.FinishBundle method, if provided, is called to complete its work. After DoFn.FinishBundle is called, the framework will not again invoke DoFn.ProcessElement or DoFn.FinishBundle until a new call to DoFn.StartBundle has occurred.
  5. If any of DoFn.Setup, DoFn.StartBundle, DoFn.ProcessElement or DoFn.FinishBundle methods throw an exception, the DoFn.Teardown method, if provided, will be called on the DoFn instance.
  6. If a runner will no longer use a DoFn, the DoFn.Teardown method, if provided, will be called on the discarded instance.

Each of the calls to any of the DoFn's processing methods can produce zero or more output elements. All of the output elements from all of the DoFn instances are included in the output PCollection.

Here’s one example implementation:

    import org.apache.beam.sdk.transforms.DoFn;

    // MapCheckout and BeamComponentTracker are application classes defined elsewhere.
    public class MapBuyerSellerLkpFn extends DoFn<MapCheckout, MapCheckout> {

      private static final long serialVersionUID = -451161312788562675L;

      // Called once per DoFn instance, before any bundle is processed.
      @Setup
      public void setup() {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "setup");
      }

      // Called before the runner discards this DoFn instance.
      @Teardown
      public void close() {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "close");
      }

      // Called at the start of every bundle.
      @StartBundle
      public void startBundle(Context c) {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "startBundle");
      }

      // Called at the end of every bundle.
      @FinishBundle
      public void finishBundle(Context c) {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "finishBundle");
      }

      // Hook inherited from the DoFn base class in the Beam version used here.
      @Override
      public void prepareForProcessing() {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "prepareForProcessing");
      }

      // Called once for each input element in the bundle.
      @ProcessElement
      public void processElement(ProcessContext c) {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "processElement");
      }
    }

I added a tracker, BeamComponentTracker, to count how many times each method is invoked; the output below verifies the back-end execution chain.

    "MapBuyerSellerLkpFn":{"prepareForProcessing":10,"setup":1,"startBundle":10,"finishBundle":10,"processElement":10}
    "MapBuyerSellerLkpFn":{"prepareForProcessing":15,"setup":1,"startBundle":15,"finishBundle":15,"processElement":15}
    "MapBuyerSellerLkpFn":{"prepareForProcessing":20,"setup":1,"startBundle":20,"finishBundle":20,"processElement":20}
    "MapBuyerSellerLkpFn":{"prepareForProcessing":27,"setup":1,"startBundle":27,"finishBundle":27,"processElement":27}
    "MapBuyerSellerLkpFn":{"prepareForProcessing":35,"setup":1,"startBundle":35,"finishBundle":35,"processElement":35}
    "MapBuyerSellerLkpFn":{"prepareForProcessing":46,"setup":1,"startBundle":46,"finishBundle":46,"processElement":46}
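
Finally, for completeness, here is a sketch of how the DoFn above could be wired into a pipeline with ParDo (MapCheckout comes from the example above and is assumed to be defined elsewhere):

    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public class MapBuyerSellerLkpUsage {
      // Each worker instance runs the lifecycle quoted above: setup once, then
      // startBundle -> processElement* -> finishBundle for every bundle it processes.
      public static PCollection<MapCheckout> lookup(PCollection<MapCheckout> checkouts) {
        return checkouts.apply("MapBuyerSellerLkp", ParDo.of(new MapBuyerSellerLkpFn()));
      }
    }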