Run Flink Jobs on YARN

Following the earlier discussion of job isolation in Flink and of standalone vs. YARN clusters for Flink, I’ll now show the steps to set up the environment and run Flink jobs on YARN.

YARN installation

Install Cloudera Manager

First of all, a YARN cluster is required as the resource container.

I use Cloudera Manager to cut down on the manual work. You can refer to the documents here for more details.

# the core commands to install Cloudera Manager, kept here for reference
$ wget http://archive.cloudera.com/cm5/installer/latest/cloudera-manager-installer.bin

$ chmod u+x cloudera-manager-installer.bin

$ sudo ./cloudera-manager-installer.bin

Now you can open the portal in your browser at http://hostname:7180/; the default username/password is admin/admin.
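If the host is headless, you can first check from the shell that the portal is up (hostname is a placeholder for your Cloudera Manager host):

# quick check that the Cloudera Manager portal is listening
$ curl -I http://hostname:7180/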

Setup YARN

Refer to the Cloudera Manager documentation for how to set up a YARN cluster.

Please note that:

  • You can mix hosts of different sizes in the cluster;
  • The minimal set of components is just HDFS and YARN (a quick sanity check is sketched below).
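Once HDFS and YARN are installed, a simple way to verify the cluster from any host (assuming the Hadoop client commands are on the PATH) is to list the registered NodeManagers and make sure HDFS answers:

# list the NodeManagers registered with the ResourceManager
$ yarn node -list

# confirm HDFS is reachable
$ hdfs dfs -ls /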

Submit a Flink job

1. Flink cluster on YARN

In this mode, a virtual Flink cluster is created and maintained by YARN, and you then submit jobs to it just as you would to a standalone cluster. Be aware that jobs running in this virtual cluster are not isolated from each other, which is consistent with Flink’s standalone-cluster model.

1.1. create Flink Cluster on YARN

Flink provides a tool, yarn-session.sh, to manage Flink clusters on YARN. You can run the command below to create a cluster named ‘flink_yarn‘ with 5 TaskManagers, each with 2048 MB of memory and 4 slots; one JobManager is added by default.

./flink-1.1.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm flink_yarn

Go to the ResourceManager Web UI and you will find an application named ‘flink_yarn‘ running there.
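You can also check from the command line on any host with the YARN client configured; the yarn CLI lists the running session:

$ yarn application -list -appStates RUNNING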

Note: as the Flink documentation points out,

the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable to be set to read the YARN and HDFS configuration.

You can download a zip file containing all configuration files from the Cloudera Manager UI, by following Hadoop Cluster Name -> Actions -> View Client Configuration URLs -> YARN (MR2 Included). Unzip it on the Flink client host, and export the directory as YARN_CONF_DIR or HADOOP_CONF_DIR, either in /etc/profile or in ./flink-1.1.4/bin/config.sh.
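For example, assuming the zip was unpacked to /opt/yarn-conf (the path is just a placeholder):

# point the Flink client at the Hadoop/YARN client configuration
$ export YARN_CONF_DIR=/opt/yarn-conf

# or add the same export line to /etc/profile or ./flink-1.1.4/bin/config.sh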

1.2. Submit Flink job

Now that you have a Flink cluster running on YARN, you can submit a Flink job as usual. The most important difference is that you need to specify the JobManager with the -m parameter.

To find the JobManager address, follow the steps below:
1. Open the ApplicationMaster link in the YARN ResourceManager Web UI; it redirects to the Flink UI.
2. Choose Job Manager in the left panel.
3. In the Configuration tab, the value for the -m parameter is {jobmanager.rpc.address}:{jobmanager.rpc.port}.

Here’s one example:

./flink-1.1.4/bin/flink run -m host:port -c main.class ./flink_job.jar [parameters]

Now the job is running.
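To double-check, you can list the jobs known to that JobManager, using the same -m address as above:

$ ./flink-1.1.4/bin/flink list -m host:port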

2. Run a Flink job on YARN directly

Here’s one example to submit a Flink job directly on YARN:

./flink-1.2.0/bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8086 -c beam.count.OpsCount -yqu data-default \
-yD taskmanager.heap.mb=4096 -yD yarn.heap-cutoff-ratio=0.6 -yD taskmanager.debug.memory.startLogThread=true -yD taskmanager.debug.memory.logIntervalMs=600000 \
-yz toratest -yst -yd ./beampoc-bundled-0.0.1-SNAPSHOT.jar --parallelism=4
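Since the job is submitted in detached mode (-yd), the client returns right after submission. To stop it later you can cancel the job from the Flink UI, or kill the whole YARN application; the application ID below is a placeholder, use the one shown in the ResourceManager UI:

$ yarn application -kill application_<cluster-timestamp>_<id>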

Known issues

With the default parameters, you may see the job fail quickly, with exceptions similar to:

Caused by: java.lang.Exception: TaskManager was lost/killed: \
ResourceID{resourceId='container_e162_1488664681401_10510_01_000003'} \
@ hdc9-phx04-0180-0102-030.*.com (dataPort=33590)

And more detail in the JobManager/TaskManager logs:

exitStatus=Pmem limit exceeded
is running beyond physical memory limits
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

This is caused by the JVM memory settings in the YARN container. You can refer to the mailing-list thread http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Container-running-beyond-physical-memory-limits-when-processing-DataStream-td8188.html for more details. My working settings are the ones shown in the command above.
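If you would rather not pass these on every submission with -yD, the same keys can be set in conf/flink-conf.yaml; here is a sketch using the values from the command above (tune them to your container size):

# conf/flink-conf.yaml (Flink 1.2.x keys)
taskmanager.heap.mb: 4096
yarn.heap-cutoff-ratio: 0.6
taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 600000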

Comments

  1. Hi Ming,
    The content presented here is very useful.
    I am trying to implement isolation through a YARN session as per the instructions; however, when I set the parallelism parameter > 1 in the flink run command, I see an issue with duplicate data processing. Do you think I am missing something?
    I am using Flink 1.7.2, Hadoop 3.1.2, Java 1.8.x.
    Moreover, I cannot see the TaskManager nodes in the Application Manager UI. Do I need to configure something in yarn-site.xml to recognise the TaskManager nodes? Any help on this would be greatly appreciated.
    Thanks,
    FlinkExplorer
