1. Beam SDK-Runner model
The Beam SDKs provide a unified programming model for defining your data processing pipeline, regardless of how it is actually executed. The Beam Runners translate that pipeline into the API of the distributed processing back-end of your choice.
To put it simply, the Beam SDK defines a context-free data processing model. When the pipeline is handed to an actual execution engine such as Flink, Spark, Storm, or even MapReduce, the Runner is responsible for the translation work: it re-expresses the data pipeline using that engine's own API.
2. PipelineRunner interface
PipelineRunner is the parent class for any runner. Its core function is run, shown below, which is responsible for translating a Pipeline in the Beam SDK into the runner's API.
/**
 * Processes the given Pipeline, returning the results.
 */
public abstract ResultT run(Pipeline pipeline);
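To make the SDK-Runner split concrete, here is a minimal, dependency-free sketch of the same shape. It is not Beam code: ToyPipeline, ToyRunner, PlanStringRunner and StepCountRunner are hypothetical names invented for illustration. The only thing borrowed from the source is the idea of a single abstract run method that is generic in its result type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pipeline definition: an ordered list of step names.
final class ToyPipeline {
    private final List<String> steps = new ArrayList<>();

    ToyPipeline apply(String stepName) {
        steps.add(stepName);
        return this;
    }

    List<String> steps() {
        return new ArrayList<>(steps); // defensive copy
    }
}

// Mirrors the shape of the run method quoted above (assumption: simplified,
// no options or validation).
abstract class ToyRunner<ResultT> {
    abstract ResultT run(ToyPipeline pipeline);
}

// One "back-end": re-expresses the pipeline as a single plan string.
final class PlanStringRunner extends ToyRunner<String> {
    @Override
    String run(ToyPipeline pipeline) {
        return String.join(" -> ", pipeline.steps());
    }
}

// Another "back-end": only reports how many steps it would execute.
final class StepCountRunner extends ToyRunner<Integer> {
    @Override
    Integer run(ToyPipeline pipeline) {
        return pipeline.steps().size();
    }
}

class RunnerModelDemo {
    public static void main(String[] args) {
        // The pipeline is defined once, independently of any runner.
        ToyPipeline pipeline = new ToyPipeline().apply("Read").apply("Count").apply("Write");
        System.out.println(new PlanStringRunner().run(pipeline)); // prints Read -> Count -> Write
        System.out.println(new StepCountRunner().run(pipeline));  // prints 3
    }
}
```

The point of the sketch is that the pipeline definition never mentions a runner; each runner decides for itself how to interpret the same definition.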
3. FlinkRunner
Apache Flink is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.
FlinkRunner is the runner that executes a Beam Pipeline on a Flink cluster.
3.1. FlinkPipelineExecutionEnvironment
In the Beam SDK, bounded and unbounded data share the same API, which is not the case in Flink. FlinkPipelineExecutionEnvironment hides the difference: internally it holds an org.apache.flink.api.java.ExecutionEnvironment for bounded data (batch processing) and an org.apache.flink.streaming.api.environment.StreamExecutionEnvironment for unbounded data (stream processing).
3.2. Translate Pipeline to Flink DataStream
Now let’s look at the core function, public PipelineResult run(Pipeline pipeline). Its main logic is expressed in the lines below:
FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
env.translate(this, pipeline);
JobExecutionResult result = env.executePipeline();
...
public void translate(FlinkRunner flinkRunner, Pipeline pipeline)
The first step is to detect the mode of the pipeline: STREAMING or BATCH. The lines below make this quite straightforward:
TranslationMode translationMode = optimizer.getTranslationMode();
...
if (options.isStreaming()) {
  return TranslationMode.STREAMING;
}
If the mode is TranslationMode.STREAMING, a FlinkStreamingPipelineTranslator is created; otherwise a FlinkBatchPipelineTranslator is created for TranslationMode.BATCH mode.
if (translationMode == TranslationMode.STREAMING) {
  this.flinkStreamEnv = createStreamExecutionEnvironment();
  translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
} else {
  this.flinkBatchEnv = createBatchExecutionEnvironment();
  translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
}
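The pattern in these snippets, one facade that picks a mode and then creates only the matching environment, can be sketched without any Beam or Flink dependency. Everything below is hypothetical (ToyMode, ToyOptions, ToyBatchEnv, ToyStreamEnv, ToyExecutionEnvironment). It also assumes the streaming flag is the only input to mode detection; the real detection logic is partly elided in the snippet above, so treat this as a simplification rather than a description of the actual implementation.

```java
// Hypothetical names throughout; none of these are Beam or Flink classes.
enum ToyMode { BATCH, STREAMING }

final class ToyOptions {
    private final boolean streaming;

    ToyOptions(boolean streaming) {
        this.streaming = streaming;
    }

    boolean isStreaming() {
        return streaming;
    }
}

// Stand-ins for the two separate engine environments.
final class ToyBatchEnv {
    String describe() {
        return "batch environment";
    }
}

final class ToyStreamEnv {
    String describe() {
        return "streaming environment";
    }
}

// Facade: callers see one type, and only one of the two environments is ever created.
final class ToyExecutionEnvironment {
    private final ToyOptions options;
    private ToyBatchEnv batchEnv;   // stays null in streaming mode
    private ToyStreamEnv streamEnv; // stays null in batch mode

    ToyExecutionEnvironment(ToyOptions options) {
        this.options = options;
    }

    // Assumption: the flag alone decides the mode.
    ToyMode detectMode() {
        return options.isStreaming() ? ToyMode.STREAMING : ToyMode.BATCH;
    }

    // Creates the environment matching the detected mode and reports which one was used.
    String translate() {
        if (detectMode() == ToyMode.STREAMING) {
            streamEnv = new ToyStreamEnv();
            return streamEnv.describe();
        }
        batchEnv = new ToyBatchEnv();
        return batchEnv.describe();
    }

    boolean hasBatchEnv() {
        return batchEnv != null;
    }

    boolean hasStreamEnv() {
        return streamEnv != null;
    }
}

class ModeSelectionDemo {
    public static void main(String[] args) {
        ToyExecutionEnvironment env = new ToyExecutionEnvironment(new ToyOptions(true));
        System.out.println(env.detectMode() + ": " + env.translate());
    }
}
```

Keeping both fields on the facade but populating only one is what lets the rest of the runner stay unaware of which engine API is in play.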
How the environment is set up from FlinkPipelineOptions is described later; let’s focus on the pipeline translator first.
Take TranslationMode.STREAMING mode as an example. FlinkStreamingPipelineTranslator extends Pipeline.PipelineVisitor.Defaults and leverages traverseTopologically to perform the detailed translation work. Understanding PipelineVisitor is essential to understanding how a Runner is implemented; I’ll cover it in a separate post soon.
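Until that post exists, the general idea of visitor-driven translation can be illustrated with a small, self-contained sketch. It is a toy model, not Beam's PipelineVisitor: ToyVisitor, ToyGraph and ToyStreamingTranslator are hypothetical names, the graph is assumed to be acyclic, and "translation" here just means recording a prefixed string per transform.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical visitor callback, invoked once per transform.
interface ToyVisitor {
    void visitTransform(String name);
}

// Hypothetical pipeline graph: each transform lists the transforms it reads from.
// Assumption: the graph is acyclic (there is no cycle detection).
final class ToyGraph {
    private final Map<String, List<String>> inputs = new LinkedHashMap<>();

    ToyGraph add(String name, String... upstream) {
        inputs.put(name, Arrays.asList(upstream));
        return this;
    }

    // Depth-first walk that visits a transform only after all of its inputs.
    void traverseTopologically(ToyVisitor visitor) {
        Set<String> done = new LinkedHashSet<>();
        for (String name : inputs.keySet()) {
            visit(name, done, visitor);
        }
    }

    private void visit(String name, Set<String> done, ToyVisitor visitor) {
        if (done.contains(name)) {
            return;
        }
        for (String upstream : inputs.getOrDefault(name, Collections.emptyList())) {
            visit(upstream, done, visitor);
        }
        done.add(name);
        visitor.visitTransform(name);
    }
}

// A translator is just a visitor that emits one target-API operation per transform.
final class ToyStreamingTranslator implements ToyVisitor {
    final List<String> translated = new ArrayList<>();

    @Override
    public void visitTransform(String name) {
        translated.add("stream:" + name);
    }
}

class VisitorDemo {
    public static void main(String[] args) {
        // Registered in reverse order on purpose; traversal still starts at the source.
        ToyGraph graph = new ToyGraph()
                .add("Write", "Count")
                .add("Count", "Read")
                .add("Read");
        ToyStreamingTranslator translator = new ToyStreamingTranslator();
        graph.traverseTopologically(translator);
        System.out.println(translator.translated); // prints [stream:Read, stream:Count, stream:Write]
    }
}
```

Topological order matters because a translator can only wire up an operation once the outputs of its upstream operations already exist in the target API.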
submit to run
Now that the pipeline job has been expressed as a Flink DataStream, it can be submitted for execution with the method call flinkEnv.execute(jobName).
3.3. usage of FlinkPipelineOptions
All pipeline options can be found in the Java API documentation for FlinkPipelineOptions. I’ll highlight a few that relate to the implementation of the runner itself.
--streaming
As mentioned above, FlinkRunner has separate code paths for batch and streaming, so make sure you select the right one.
-d in bin/flink run
I mostly don’t submit a Flink job as a standard Java application; I use bin/flink run instead. By default it waits until the job has finished. That is fine for batch, but in streaming mode it is highly recommended to add the -d option to the CLI command. With it, the job runs in detached mode and the command returns immediately.