Run Flink Jobs on YARN

Following the earlier discussion of Job Isolation in Flink and of standalone vs. YARN clusters for Flink, I’ll now show the steps to set up the environment and run Flink jobs on YARN.

YARN installation

Install Cloudera Manager

First of all, a YARN cluster is required as the resource container.

I use Cloudera Manager to reduce the manual work. You can refer to the Cloudera Manager documentation for more details.

# the core commands to install Cloudera Manager, kept here for reference
$ wget http://archive.cloudera.com/cm5/installer/latest/cloudera-manager-installer.bin

$ chmod u+x cloudera-manager-installer.bin

$ sudo ./cloudera-manager-installer.bin

Now you can open the portal in your browser at http://hostname:7180/ and log in with the default username/password admin/admin.
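
If the portal does not come up right away, a quick check I find useful (assuming the installer registered the cloudera-scm-server service, which is its default behaviour) is:

# confirm the Cloudera Manager server is running and listening on port 7180
$ sudo service cloudera-scm-server status
# netstat comes from the net-tools package; skip this line if it is not installed
$ sudo netstat -tlnp | grep 7180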

Setup YARN

Refer to the Cloudera Manager documentation on how to install a YARN cluster.

Please note that:

  • You can mix hosts of different sizes in the cluster;
  • The minimal set of components is just HDFS and YARN.
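
Once HDFS and YARN are up, a quick sanity check from the Flink client host (assuming the Hadoop client binaries and the cluster configuration are available there) could look like this:

# list the NodeManagers registered with the ResourceManager
$ yarn node -list
# confirm HDFS is reachable
$ hdfs dfs -ls /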

Submit a Flink job

1. Flink cluster on YARN

In this mode, a long-running, virtual Flink cluster is created and maintained by YARN, and you can then submit jobs to it just as you would to a standalone cluster. Be aware that jobs running in this virtual cluster are not isolated from one another, which is expected given Flink’s design.

1.1. Create a Flink cluster on YARN

Flink provides a tool, yarn-session.sh, to manage Flink clusters on YARN. You can run the command below to create a cluster named 'flink_yarn' with 5 TaskManagers, each with 2048 MB of memory and 4 slots; one JobManager is added by default.

./flink-1.1.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm flink_yarn

Go to the ResourceManager Web UI and you will find one application named 'flink_yarn' running there.
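
The same session can also be checked from the command line, assuming the YARN client is configured on that host; for example:

# list running YARN applications; the Flink session shows up with the name given by -nm
$ yarn application -list -appStates RUNNING
# stop the session later by killing its YARN application
$ yarn application -kill <applicationId>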

Note: as the Flink documentation points out,

the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable to be set to read the YARN and HDFS configuration.

You can download a zip file containing all the configuration files from the Cloudera Manager UI, by following Hadoop Cluster Name -> Actions -> View Client Configuration URLs -> YARN (MR2 Included). Unzip it on the Flink client host and export that directory as YARN_CONF_DIR or HADOOP_CONF_DIR, either in /etc/profile or in ./flink-1.1.4/bin/config.sh.
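
For example, a minimal sketch (the path /opt/yarn-conf is only a placeholder for wherever you unzipped the client configuration):

# point the Flink client at the YARN/HDFS configuration for the current shell
$ export YARN_CONF_DIR=/opt/yarn-conf
# or make it permanent by appending the export to /etc/profile or ./flink-1.1.4/bin/config.sh
$ echo 'export YARN_CONF_DIR=/opt/yarn-conf' | sudo tee -a /etc/profile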

1.2. Submit a Flink job

Now that you have a Flink cluster running on YARN, you can submit a Flink job as usual. The most important difference is that you must specify the JobManager with the -m parameter.

To find the address of the JobManager, follow the steps below:
1. Open the ApplicationMaster link in the YARN ResourceManager Web UI; it redirects to the Flink UI.
2. Choose Job Manager in the left panel.
3. In the Configuration tab, the value for the -m parameter is {jobmanager.rpc.address}:{jobmanager.rpc.port}.

Here’s one example:

./flink-1.1.4/bin/flink run -m host:port -c main.class ./flink_job.jar [parameters]

Now the job is running.
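
You can also verify this from the client side; for instance, the Flink CLI can list the jobs known to this JobManager, using the same host:port as above:

# list running and scheduled jobs on the YARN-hosted JobManager
$ ./flink-1.1.4/bin/flink list -m host:port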

2. Run a Flink job on YARN directly

Here’s an example of submitting a Flink job directly on YARN:

./flink-1.2.0/bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8086 -c beam.count.OpsCount -yqu data-default \
-yD taskmanager.heap.mb=4096 -yD yarn.heap-cutoff-ratio=0.6 -yD taskmanager.debug.memory.startLogThread=true -yD taskmanager.debug.memory.logIntervalMs=600000 \
-yz toratest -yst -yd ./beampoc-bundled-0.0.1-SNAPSHOT.jar --parallelism=4

Known issues

With the default parameters, you may see the job fail quickly, with exceptions similar to:

Caused by: java.lang.Exception: TaskManager was lost/killed: \
ResourceID{resourceId='container_e162_1488664681401_10510_01_000003'} \
@ hdc9-phx04-0180-0102-030.*.com (dataPort=33590)

And more from JobManager/TaskManager logs:

exitStatus=Pmem limit exceeded
is running beyond physical memory limits
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

This is caused by the JVM memory settings inside the YARN container. You can refer to the mailing list thread http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Container-running-beyond-physical-memory-limits-when-processing-DataStream-td8188.html for more details. The settings I ended up with are shown in the command above.
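
If you prefer not to repeat the -yD flags on every submission, the same values could instead be kept in the client’s conf/flink-conf.yaml; here is a sketch of appending them (adjust the numbers to your own containers):

# persist the memory-related settings used above in the Flink client configuration
$ cat >> ./flink-1.2.0/conf/flink-conf.yaml <<'EOF'
taskmanager.heap.mb: 4096
yarn.heap-cutoff-ratio: 0.6
taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 600000
EOF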

Description of Interfaces in DoFn

DoFn is the core API in Apache Beam: the function used to process each element is specified by a DoFn, primarily via its ProcessElement method.

According to JavaDoc:

Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided up into some number of “bundles”. These are farmed off to distributed worker machines (or run locally, if using the DirectRunner). For each bundle of input elements processing proceeds as follows:

  1. If required, a fresh instance of the argument DoFn is created on a worker, and the DoFn.Setup method is called on this instance. This may be through deserialization or other means. A PipelineRunner may reuse DoFn instances for multiple bundles. A DoFn that has terminated abnormally (by throwing an Exception) will never be reused.
  2. The DoFn’s DoFn.StartBundle method, if provided, is called to initialize it.
  3. The DoFn’s DoFn.ProcessElement method is called on each of the input elements in the bundle.
  4. The DoFn’s DoFn.FinishBundle method, if provided, is called to complete its work. After DoFn.FinishBundle is called, the framework will not again invoke DoFn.ProcessElement or DoFn.FinishBundle until a new call to DoFn.StartBundle has occurred.
  5. If any of DoFn.Setup, DoFn.StartBundle, DoFn.ProcessElement or DoFn.FinishBundle methods throw an exception, the DoFn.Teardown method, if provided, will be called on the DoFn instance.
  6. If a runner will no longer use a DoFn, the DoFn.Teardown method, if provided, will be called on the discarded instance.

Each of the calls to any of the DoFn’s processing methods can produce zero or more output elements. All of the output elements from all of the DoFn instances are included in the output PCollection.

Here’s one example implementation:

public class MapBuyerSellerLkpFn extends DoFn<MapCheckout, MapCheckout> {

  private static final long serialVersionUID = -451161312788562675L;

  @Setup
  public void setup(){
    BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "setup");
  }
  @Teardown
  public void close(){
    BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "close");
  }

  @StartBundle
  public void startBundle(Context c){
    BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "startBundle");
  }
  @FinishBundle
  public void finishBundle(Context c){
    BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "finishBundle");
  }  

  @Override
  public void prepareForProcessing() {
    BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "prepareForProcessing");    
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "processElement");
  }
}

I added a tracker, BeamComponentTracker, to count how many times each method is invoked; the output below verifies the back-end execution chain.

MapBuyerSellerLkpFn":{"prepareForProcessing":10,"setup":1,"startBundle":10,"finishBundle":10,"processElement":10}
MapBuyerSellerLkpFn":{"prepareForProcessing":15,"setup":1,"startBundle":15,"finishBundle":15,"processElement":15}
MapBuyerSellerLkpFn":{"prepareForProcessing":20,"setup":1,"startBundle":20,"finishBundle":20,"processElement":20}
MapBuyerSellerLkpFn":{"prepareForProcessing":27,"setup":1,"startBundle":27,"finishBundle":27,"processElement":27}
MapBuyerSellerLkpFn":{"prepareForProcessing":35,"setup":1,"startBundle":35,"finishBundle":35,"processElement":35}
MapBuyerSellerLkpFn":{"prepareForProcessing":46,"setup":1,"startBundle":46,"finishBundle":46,"processElement":46}