Set up a YARN cluster with HA

Recently I have been creating a YARN cluster for POC projects to run BeamSQL with FlinkRunner. Since it is built on cloud VMs, HA is a critical feature to avoid a single point of failure, for both the DFS NameNode and the YARN ResourceManager.

The latest Hadoop version supported by Flink is Hadoop® 2.7, so I decided to go with hadoop-2.7.4.

1. environment setup

1.1. host list

In the initial cluster, I have
* 2 hosts for the DFS NameNode;
* 2 hosts for the YARN ResourceManager;
* 4 hosts acting as both DFS DataNode and YARN NodeManager; 3 of them also serve as JournalNodes;
* 3 hosts for a shared ZooKeeper ensemble;

1.2. environment settings

a) create the same account on every host and configure passwordless SSH access between them.

It is suggested to use separate accounts: dfs for HDFS services and yarn for YARN services.
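
For reference, here is a minimal sketch of the passwordless-SSH setup, assuming a single stack account (matching the /home/stack paths used later) and the example.com hostnames from the host list above:

# run as the stack user on the host you administer from
ssh-keygen -t rsa -b 4096 -N "" -f ~/.ssh/id_rsa
# push the public key to every host in the cluster
for host in dfs-nm dfs-nm-ha yarn-rm1 yarn-rm2 nm1 nm2 nm3 nm4 zk1 zk2 zk3; do
    ssh-copy-id stack@${host}.example.com
done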

b) install JDK
Although the Hadoop wiki says that JDK7 is supported, I saw errors asking for JDK8 when starting the services, so I install JDK8 here. I also install JDK7, which is used by Flink.

2. Hadoop settings

2.1. /etc/profile.d/hadoop.sh

A system-level profile, /etc/profile.d/hadoop.sh, is added to set the environment variables.

export HADOOP_HOME=/home/stack/hadoop-2.7.4
export HADOOP_PREFIX=/home/stack/hadoop-2.7.4
export HADOOP_CONF_DIR=/home/stack/hadoop-2.7.4/etc/hadoop
export YARN_CONF_DIR=/home/stack/hadoop-2.7.4/etc/hadoop
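
A quick sanity check, after opening a new login shell or sourcing the profile, to confirm the variables are picked up:

source /etc/profile.d/hadoop.sh
echo $HADOOP_HOME
$HADOOP_HOME/bin/hadoop version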

2.2. etc/hadoop/hadoop-env.sh

export JAVA_HOME=/home/stack/jdk1.8.0_144
export HADOOP_CONF_DIR=/home/stack/hadoop-2.7.4/etc/hadoop
export HADOOP_HEAPSIZE=2048

2.3. etc/hadoop/core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://dfs1</value>
    </property>
</configuration>

2.4. etc/hadoop/hdfs-site.xml

<configuration>
    <!-- HA -->
    <property>
      <name>dfs.nameservices</name>
      <value>dfs1</value>
    </property>
    <property>
      <name>dfs.ha.namenodes.dfs1</name>
      <value>nn1,nn2</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.dfs1.nn1</name>
      <value>dfs-nm.example.com:8020</value>
    </property>
    <property>
      <name>dfs.namenode.rpc-address.dfs1.nn2</name>
      <value>dfs-nm-ha.example.com:8020</value>
    </property>
    <property>
      <name>dfs.namenode.http-address.dfs1.nn1</name>
      <value>dfs-nm.example.com:50070</value>
    </property>
    <property>
      <name>dfs.namenode.http-address.dfs1.nn2</name>
      <value>dfs-nm-ha.example.com:50070</value>
    </property>
    <property>
      <name>dfs.namenode.shared.edits.dir</name>
      <value>qjournal://nm1.example.com:8485;nm2.example.com:8485;nm3.example.com:8485/dfs1</value>
    </property>
    <property>
      <name>dfs.client.failover.proxy.provider.dfs1</name>
      <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
      <name>dfs.ha.fencing.methods</name>
      <value>shell(/bin/true)</value>
    </property>
    <property>
      <name>dfs.journalnode.edits.dir</name>
      <value>/mnt/dfs/journal/node/local/data</value>
    </property>
    <property>
      <name>dfs.ha.automatic-failover.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>ha.zookeeper.quorum</name>
      <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
    </property>

    <!-- namenode -->
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/mnt/dfs/namenode</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.blocksize</name>
        <value>268435456</value>
    </property>
    <property>
        <name>dfs.namenode.handler.count</name>
        <value>100</value>
    </property>

    <!-- datanode -->
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/mnt/dfs/data</value>
    </property>
</configuration>
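
Once this configuration is deployed to a host, a quick way to confirm that the nameservice and NameNode IDs are picked up is to ask HDFS for them:

# should print dfs1
$HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.nameservices
# should print the two NameNode RPC addresses
$HADOOP_PREFIX/bin/hdfs getconf -namenodes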

2.5. etc/hadoop/yarn-site.xml

<configuration>
    <!-- ResourceManager -->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>256</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>8192</value>
    </property>

    <!-- NodeManager -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>40960</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>2.1</value>
    </property>
    <property>
        <name>yarn.nodemanager.local-dirs</name>
        <value>/mnt/yarn/nm-local-dir</value>
    </property>
    <property>
        <name>yarn.nodemanager.log-dirs</name>
        <value>/mnt/yarn/nm-log</value>
    </property>
    <!-- health check -->
    <property>
        <name>yarn.nodemanager.health-checker.script.path</name>
        <value>/home/stack/hadoop-2.7.4/etc/hadoop/health_check.sh</value>
    </property>
    <property>
        <name>yarn.nodemanager.health-checker.interval-ms</name>
        <value>600000</value>
    </property>


    <!-- HistoryServer -->
    <!-- YARN HA -->
    <property>
      <name>yarn.resourcemanager.ha.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>yarn.resourcemanager.cluster-id</name>
      <value>yarn1</value>
    </property>
    <property>
      <name>yarn.resourcemanager.ha.rm-ids</name>
      <value>rm1,rm2</value>
    </property>
    <property>
      <name>yarn.resourcemanager.hostname.rm1</name>
      <value>yarn-rm1.example.com</value>
    </property>
    <property>
      <name>yarn.resourcemanager.hostname.rm2</name>
      <value>yarn-rm2.example.com</value>
    </property>
    <property>
      <name>yarn.resourcemanager.webapp.address.rm1</name>
      <value>yarn-rm1.example.com:8088</value>
    </property>
    <property>
      <name>yarn.resourcemanager.webapp.address.rm2</name>
      <value>yarn-rm2.example.com:8088</value>
    </property>
    <property>
      <name>yarn.resourcemanager.zk-address</name>
      <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
    </property>
</configuration>
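
The health-check script referenced above is not listed in this post; below is a minimal sketch of what /home/stack/hadoop-2.7.4/etc/hadoop/health_check.sh could look like. The disk-usage check and its threshold are only examples (and assume GNU df); the one convention YARN relies on is that an output line starting with ERROR marks the node unhealthy. Remember to make the script executable (chmod +x).

#!/bin/bash
# example check: report the node unhealthy when the NodeManager disk is almost full
used=$(df --output=pcent /mnt/yarn | tail -1 | tr -d ' %')
if [ "$used" -gt 95 ]; then
    # a line starting with ERROR is what marks the node unhealthy
    echo "ERROR: /mnt/yarn is ${used}% full"
    exit 0
fi
echo "OK"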

2.6. etc/hadoop/slaves

nm1.example.com
nm2.example.com
nm3.example.com
nm4.example.com

3. initialize and start the DFS/YARN cluster

3.1. create local folders

a. create the directory /mnt/dfs/namenode on the two NameNode hosts;
b. create the directory /mnt/dfs/journal on the three JournalNode hosts;
c. create the directories /mnt/dfs/data, /mnt/yarn/nm-local-dir and /mnt/yarn/nm-log on the four DataNode/NodeManager hosts; see the command sketch below.
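
A sketch of the corresponding commands, assuming the stack user can write under /mnt on each host:

# on the two NameNode hosts
mkdir -p /mnt/dfs/namenode
# on the three JournalNode hosts
mkdir -p /mnt/dfs/journal
# on the four DataNode/NodeManager hosts
mkdir -p /mnt/dfs/data /mnt/yarn/nm-local-dir /mnt/yarn/nm-log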

3.2. start DFS cluster

Starting DFS is a bit tricky; there are some back-and-forth steps.
a. start the JournalNode on all JournalNode hosts

$HADOOP_HOME/sbin/hadoop-daemon.sh start journalnode

b. format both NameNodes with the same cluster ID

$HADOOP_PREFIX/bin/hdfs namenode -format -clusterid TORA_YARN1
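
As an alternative to formatting both NameNodes, the HDFS HA guide (reference 2 below) formats only the first NameNode, starts it, and then initializes the second NameNode from it:

# run on the second, unformatted NameNode host
$HADOOP_PREFIX/bin/hdfs namenode -bootstrapStandby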

c. format the ZKFC state in ZooKeeper from one NameNode host

$HADOOP_PREFIX/bin/hdfs zkfc -formatZK

d. stop the JournalNode on all JournalNode hosts

$HADOOP_HOME/sbin/hadoop-daemon.sh stop journalnode

e. start the cluster

$HADOOP_PREFIX/sbin/start-dfs.sh
$HADOOP_PREFIX/bin/hdfs dfsadmin -report

3.3. start YARN cluster

a. start the service on each ResourceManager host

$HADOOP_HOME/sbin/yarn-daemon.sh --config $HADOOP_HOME/etc/hadoop start resourcemanager

b. start the service on each NodeManager host

$HADOOP_HOME/sbin/yarn-daemon.sh --config $HADOOP_HOME/etc/hadoop start nodemanager

3.4. check cluster status

You can now go to the web UIs to verify that both the NameNodes and the ResourceManagers are running in HA mode. For the NameNodes, (active) is shown on the active NameNode's page and (standby) on the standby one. For the ResourceManagers, if you open the URL of the standby node, it is redirected to the active node automatically.

HDFS/YARN UI:
http://yarn-rm1.example.com:8088/cluster
http://dfs-nm.example.com:50070/dfshealth.html#tab-overview
http://dfs-nm-ha.example.com:50070/dfshealth.html#tab-overview
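
The HA state can also be checked from the command line, using the NameNode and ResourceManager IDs configured above:

$HADOOP_PREFIX/bin/hdfs haadmin -getServiceState nn1
$HADOOP_PREFIX/bin/hdfs haadmin -getServiceState nn2
$HADOOP_HOME/bin/yarn rmadmin -getServiceState rm1
$HADOOP_HOME/bin/yarn rmadmin -getServiceState rm2
# list the registered NodeManagers
$HADOOP_HOME/bin/yarn node -list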

Reference links:
1. http://hadoop.apache.org/docs/r2.7.4/hadoop-project-dist/hadoop-common/ClusterSetup.html
2. http://hadoop.apache.org/docs/r2.7.4/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
3. http://hadoop.apache.org/docs/r2.7.4/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html

Run Flink Jobs on YARN

Following the discussions of Job Isolation on Flink and Standalone vs YARN cluster for Flink, I'll now show the steps to set up the environment and run Flink jobs on YARN.

YARN installation

Install Cloudera Manager

First of all, a YARN cluster is required as the resource container.

I use Cloudera Manager to reduce the manual work. You can refer to the documents here for more details.

# backup the core commands here to install Cloudera Manager
$ wget http://archive.cloudera.com/cm5/installer/latest/cloudera-manager-installer.bin

$ chmod u+x cloudera-manager-installer.bin

$ sudo ./cloudera-manager-installer.bin

Now you can open the portal in your browser at http://hostname:7180/ and log in with username/password admin/admin.

Set up YARN

Refer to the Cloudera Manager documentation on how to install a YARN cluster.

Please note that:

  • You can mix hosts of different sizes in the cluster;
  • The minimal set of components includes only HDFS and YARN;

Submit a Flink job

1. Flink cluster on YARN

In this mode, a virtual Flink cluster is created and maintained by YARN, and you can then submit jobs to it just as you would to a standalone cluster. Be aware that jobs running in this virtual cluster are not isolated from each other, which is natural given Flink’s concepts.

1.1. create Flink Cluster on YARN

Flink provides a tool, yarn-session.sh, to manage Flink clusters on YARN. You can run the command below to create a cluster named ‘flink_yarn’ with 5 TaskManagers, each with 2048 MB of memory and 4 slots; one JobManager is added by default.

./flink-1.1.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm flink_yarn

Go to the ResourceManager web UI, and you will find one application named ‘flink_yarn’ running there.

Note: as the Flink documentation points out,

the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable to be set to read the YARN and HDFS configuration.

You can download a zip file containing all configuration files from the Cloudera Manager UI by following Hadoop Cluster Name -> Actions -> View Client Configuration URLs -> YARN (MR2 Included). Unzip it on the Flink client host and export the directory as YARN_CONF_DIR or HADOOP_CONF_DIR, either in /etc/profile or in ./flink-1.1.4/bin/config.sh.
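
For example, assuming the unzipped client configuration is placed in /home/stack/yarn-conf (a path chosen here only for illustration), the export could look like:

# in /etc/profile or ./flink-1.1.4/bin/config.sh
export HADOOP_CONF_DIR=/home/stack/yarn-conf
export YARN_CONF_DIR=/home/stack/yarn-conf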

1.2. Submit Flink job

Now that you have a Flink cluster running on YARN, you can submit a Flink job as usual. The most important difference is that you specify the JobManager with the -m parameter.

To find the URL for the JobManager, follow the steps below:
1. open the ApplicationMaster link in the YARN ResourceManager web UI; it redirects to the Flink UI;
2. choose Job Manager in the left panel;
3. in the Configuration tab, the value for the -m parameter is {jobmanager.rpc.address}:{jobmanager.rpc.port}.

Here’s one example:

./flink-1.1.4/bin/flink run -m host:port -c main.class ./flink_job.jar [parameters]

Now the job is running.

2. Run a Flink job on YARN directly

Here’s one example to submit a Flink job directly on YARN:

./flink-1.2.0/bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8086 -c beam.count.OpsCount -yqu data-default \
-yD taskmanager.heap.mb=4096 -yD yarn.heap-cutoff-ratio=0.6 -yD taskmanager.debug.memory.startLogThread=true -yD taskmanager.debug.memory.logIntervalMs=600000 \
-yz toratest -yst -yd ./beampoc-bundled-0.0.1-SNAPSHOT.jar --parallelism=4

Known issues

With default parameters, you may see the job fail quickly and find exceptions similar to:

Caused by: java.lang.Exception: TaskManager was lost/killed: \
ResourceID{resourceId='container_e162_1488664681401_10510_01_000003'} \
@ hdc9-phx04-0180-0102-030.*.com (dataPort=33590)

And more from JobManager/TaskManager logs:

exitStatus=Pmem limit exceeded
is running beyond physical memory limits
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

This is caused by the JVM settings in the YARN container. You can refer to the mailing list thread http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Container-running-beyond-physical-memory-limits-when-processing-DataStream-td8188.html for more details. My settings are presented above.