The back-end of FlinkRunner in Apache Beam


1. Beam SDK-Runner model

The Beam SDKs provide a unified programming model for defining your data processing pipeline, regardless of how it is actually executed. The Beam Runners translate the data processing pipeline into an API compatible with the distributed processing back-end of your choice.

To put it simply, the Beam SDK defines an engine-agnostic data processing model. When the pipeline goes to an actual execution engine, such as Flink, Spark, Storm, or even MapReduce, the Runner is responsible for the translation work, re-expressing the data pipeline in that engine's own API.

2. PipelineRunner interface

PipelineRunner is the parent class of every runner. It declares the core method run, shown below, which is responsible for translating a Beam SDK Pipeline into the runner's own API.

  /**
   * Processes the given Pipeline, returning the results.
   */
  public abstract ResultT run(Pipeline pipeline);
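
To make the contract concrete, here is a hypothetical, minimal runner subclass. It is not part of Beam; the class name and body are made up for illustration, and the package names assume a recent Beam Java SDK. A real runner translates the pipeline and submits it to its engine inside run.

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.PipelineResult;
  import org.apache.beam.sdk.PipelineRunner;

  // Hypothetical runner, for illustration only: a real implementation would
  // translate the pipeline into the engine's API and submit it in run().
  public class NoOpRunner extends PipelineRunner<PipelineResult> {
    @Override
    public PipelineResult run(Pipeline pipeline) {
      throw new UnsupportedOperationException("translate and submit the pipeline here");
    }
  }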

3. FlinkRunner

Apache Flink is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

FlinkRunner is the runner that executes a Beam Pipeline on a Flink cluster.
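
As a quick sketch of how a pipeline is pointed at this runner (assuming the beam-runners-flink artifact is on the classpath; class and package names follow recent Beam versions, and the flinkMaster value is just a placeholder):

  import org.apache.beam.runners.flink.FlinkPipelineOptions;
  import org.apache.beam.runners.flink.FlinkRunner;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  public class FlinkRunnerExample {
    public static void main(String[] args) {
      FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
      options.setRunner(FlinkRunner.class);
      options.setFlinkMaster("[auto]");  // "[auto]"/"[local]" or "host:port" of a JobManager
      Pipeline p = Pipeline.create(options);
      // ... apply transforms here ...
      p.run();
    }
  }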

3.1. FlinkPipelineExecutionEnvironment

The Beam SDK exposes a single API for both bounded and unbounded data, while Flink does not. FlinkPipelineExecutionEnvironment hides this difference: internally it holds an org.apache.flink.api.java.ExecutionEnvironment for bounded data, i.e. batch processing, and an org.apache.flink.streaming.api.environment.StreamExecutionEnvironment for unbounded data, i.e. streaming processing.
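
The idea can be sketched roughly as below. This is a simplified stand-in, not the actual Beam class; it only shows the pattern of keeping both Flink environments side by side and creating just the one that matches the mode.

  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  // Simplified sketch of the wrapper: only one of the two environments is
  // initialized, depending on whether the pipeline runs in streaming mode.
  public class EnvWrapperSketch {
    private ExecutionEnvironment flinkBatchEnv;          // bounded data, batch jobs
    private StreamExecutionEnvironment flinkStreamEnv;   // unbounded data, streaming jobs

    public void init(boolean streaming) {
      if (streaming) {
        this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      } else {
        this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
      }
    }
  }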

3.2. Translate Pipeline to Flink DataStream

Now let's look at the core function public PipelineResult run(Pipeline pipeline). The main logic is expressed in the lines below:

FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
env.translate(this, pipeline);
JobExecutionResult result = env.executePipeline();
...

public void translate(FlinkRunner flinkRunner, Pipeline pipeline)

The first step is to detect the mode of the pipeline, whether it is STREAMING or BATCH. It is quite straightforward, as the lines below show:

TranslationMode translationMode = optimizer.getTranslationMode();
...
if (options.isStreaming()) {
  return TranslationMode.STREAMING;
}

If it is TranslationMode.STREAMING, a FlinkStreamingPipelineTranslator is created; otherwise a FlinkBatchPipelineTranslator is created for TranslationMode.BATCH.

    if (translationMode == TranslationMode.STREAMING) {
      this.flinkStreamEnv = createStreamExecutionEnvironment();
      translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
    } else {
      this.flinkBatchEnv = createBatchExecutionEnvironment();
      translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
    }

The details of setting up the environment from FlinkPipelineOptions will be described later; let's focus on the pipeline translator first.

Take TranslationMode.STREAMING as an example. FlinkStreamingPipelineTranslator extends Pipeline.PipelineVisitor.Defaults and leverages traverseTopologically to perform the detailed transform work. The PipelineVisitor is key to understanding the implementation of a Runner; I will cover it in a separate post soon.
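
As a rough sketch of that traversal (the class here is made up, and the visitor method signatures follow recent Beam versions, so they may differ slightly in older ones):

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.runners.TransformHierarchy;

  // Made-up translator skeleton: traverseTopologically walks the pipeline graph
  // and calls back into the visitor for every transform; a real translator would
  // emit the corresponding Flink operators in those callbacks.
  public class SketchTranslator extends Pipeline.PipelineVisitor.Defaults {
    @Override
    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
      System.out.println("would translate: " + node.getFullName());
    }

    public void translate(Pipeline pipeline) {
      pipeline.traverseTopologically(this);
    }
  }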

Submit to run

Now that the pipeline job is expressed with the Flink DataStream API, it is ready and can be submitted for execution with flinkEnv.execute(jobName);

3.3. Usage of FlinkPipelineOptions

All pipeline options can be found in the Javadoc of FlinkPipelineOptions. I'll highlight several that are related to the implementation of the runner itself.

--streaming

As mentioned above, FlinkRunner has separate code paths for batch and streaming, so make sure you set this option to match your pipeline.
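
For example, the flag can be passed on the command line as --streaming=true, or set in code. The snippet below assumes the options are built from the program arguments, using the same FlinkPipelineOptions as in the earlier example:

  // --streaming=true on the command line has the same effect as setStreaming(true).
  FlinkPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
  options.setStreaming(true);   // pick the streaming translation path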

-d in bin/flink run

Mostly I don't submit a Flink job as a standard Java application; instead, bin/flink run is used. By default it waits until the job finishes. For batch jobs that is fine, but in streaming mode it is highly suggested to add the -d option to the CLI command. With it, the job runs in detached mode and the command returns immediately.

Run Flink Jobs on YARN

Following the discussions of Job Isolation on Flink and Standalone vs YARN cluster for Flink, I'll now show the steps to set up the environment and run Flink jobs on YARN.

YARN installation

Install Cloudera Manager

First of all, a YARN cluster is required as the resource container.

I use Cloudera Manager to facilitate the manual work. You can refer to the documents here for more details.

# backup the core commands here to install Cloudera Manager
$ wget http://archive.cloudera.com/cm5/installer/latest/cloudera-manager-installer.bin
$ chmod u+x cloudera-manager-installer.bin
$ sudo ./cloudera-manager-installer.bin

Now you can open the portal in your browser at http://hostname:7180/, with the default username/password admin/admin.

Setup YARN

Refer to the Cloudera Manager documentation on how to install a YARN cluster.

Please note that:

  • You can mix hosts of different sizes in the cluster;
  • The minimal set of components includes only HDFS and YARN;

Submit a Flink job

1. Flink cluster on YARN

In this mode, a virtual Flink cluster is created and maintained by YARN. You can then submit jobs to it as you would to a standalone cluster. Be aware that jobs running in this virtual cluster are not isolated from each other, which is natural according to Flink's concepts.

1.1. Create Flink Cluster on YARN

In Flink, the tool yarn-session.sh is provided to manage Flink clusters on YARN. You can run the command below to create a cluster named 'flink_yarn' with 5 TaskManagers, each with 2048 MB of memory and 4 slots; one JobManager is added by default.

./flink-1.1.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm flink_yarn

Go to the ResourceManager Web UI, and you will find an application named 'flink_yarn' running there.

Note: as the Flink documentation points out,

the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable to be set to read the YARN and HDFS configuration.

You can download a zip file containing all configuration files from the Cloudera Manager UI by following Hadoop Cluster Name -> Actions -> View Client Configuration URLs -> YARN (MR2 Included). Unzip it on the Flink client host and export the directory as YARN_CONF_DIR or HADOOP_CONF_DIR, either in /etc/profile or in ./flink-1.1.4/bin/config.sh.

1.2. Submit Flink job

Now that you have a Flink cluster running on YARN, you can easily submit a Flink job as normal. The most important difference is that you must specify the JobManager with the -m parameter.

To find the URL for the JobManager, follow the steps below:
1. Open the ApplicationMaster link in the YARN ResourceManager Web UI; it redirects to the Flink UI.
2. Choose Job Manager in the left panel.
3. In the Configuration tab, the value for parameter -m is {jobmanager.rpc.address}:{jobmanager.rpc.port}.

Here’s one example:

./flink-1.1.4/bin/flink run -m host:port -c main.class ./flink_job.jar [parameters]

Now the job is running.

2. Run a Flink job on YARN directly

Here's an example of submitting a Flink job directly on YARN:

./flink-1.2.0/bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8086 -c beam.count.OpsCount -yqu data-default \
-yD taskmanager.heap.mb=4096 -yD yarn.heap-cutoff-ratio=0.6 -yD taskmanager.debug.memory.startLogThread=true -yD taskmanager.debug.memory.logIntervalMs=600000 \
-yz toratest -yst -yd ./beampoc-bundled-0.0.1-SNAPSHOT.jar --parallelism=4

Known issues

With the default parameters, you may see the job fail quickly, with exceptions similar to:

Caused by: java.lang.Exception: TaskManager was lost/killed: \
ResourceID{resourceId='container_e162_1488664681401_10510_01_000003'} \
@ hdc9-phx04-0180-0102-030.*.com (dataPort=33590)

And more from JobManager/TaskManager logs:

exitStatus=Pmem limit exceeded
is running beyond physical memory limits
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

This is caused by the JVM settings of the YARN containers. You can refer to the mailing list thread http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Container-running-beyond-physical-memory-limits-when-processing-DataStream-td8188.html for more details. My settings are shown in the command above.