Standalone vs YARN cluster for Flink

Flink offers two options to set up a cluster: a standalone cluster, or one based on YARN.

Here I’ll list the pros and cons of each for comparison.

Standalone mode

pros

  • no dependency on external components;
  • easy to add/remove TaskManagers in the cluster;
  • easy to debug and to retrieve logs;

cons

  • no job isolation, as slots share the same JVM; refer to Job Isolation on Flink below;
  • needs a ZooKeeper quorum for node failure recovery (a minimal config sketch follows this list);
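
For reference, here is a minimal sketch of the ZooKeeper HA settings for a standalone cluster, assuming the Flink 1.1.x config keys and hypothetical zk-*.xxx.com hosts; check the HA documentation of your release for the exact key names:

$ cat flink-1.1.4/conf/flink-conf.yaml
#...
# JobManager high availability via ZooKeeper (keys as of Flink 1.1.x; hosts are assumed)
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk-1.xxx.com:2181,zk-2.xxx.com:2181,zk-3.xxx.com:2181
# shared storage for JobManager recovery metadata, reachable from all nodes
recovery.zookeeper.storageDir: hdfs:///flink/recovery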

YARN mode

More specifically, you have two choices with YARN (see the YARN setup documentation), as sketched below:

  • set up a Flink session, similar to a virtual cluster;
  • run a Flink job directly on YARN.
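
A minimal sketch of both modes, assuming the Flink 1.1.x command-line flags (they differ in later releases) and a placeholder job jar:

# option 1: start a long-running Flink session on YARN
# (4 TaskManager containers, 2048 MB each, 1 slot each)
$ ./bin/yarn-session.sh -n 4 -tm 2048 -s 1

# option 2: submit a single job to YARN; the cluster lives only as long as the job
$ ./bin/flink run -m yarn-cluster -yn 4 -ytm 2048 ./path/to/your-job.jar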

pros

  • job isolation provided by YARN;
  • node failure auto-recovery;
  • flexible resource capacity per TaskManager for different jobs;

cons

  • extra operational cost of running YARN;
  • so far YARN is tied closely to a distributed file system, such as HDFS, AWS S3, or Google Cloud Storage;

In our environment, we finally decided to go with YARN, as we value the isolation feature much more than the others in order to support multiple tenants.

Job Isolation on Flink

Recently I’ve been doing some tests on Flink, and found that its parallelism mechanism is quite different from Hadoop’s.

Quoting the Flink documentation:

Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.

Each Flink TaskManager provides processing slots in the cluster.

taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances.

taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: 512).

Based on this information, we can conclude that all the slots in one TaskManager share the same JVM, with no CPU/RAM isolation between them. In production, this is a BIG issue.
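
For example, with the settings below, the four operator instances split one 4 GB JVM heap roughly evenly, and nothing stops user code in one slot from starving the other three of CPU:

# one TaskManager JVM hosting 4 slots: roughly 1 GB of heap per
# operator/function instance, all competing for the same cores
taskmanager.numberOfTaskSlots: 4
taskmanager.heap.mb: 4096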

The best way to completely isolate jobs against each other (CPU, network, etc) is to run them in different processes. The best way to do that is to start a Flink cluster per job.

So far, I’ve found two solutions to avoid this.

1. Resource containers, like YARN or Mesos

Mesos is supported starting from version 1.2; see https://issues.apache.org/jira/browse/FLINK-1984.

2. Run multiple TaskManager instances per machine, and give each only one slot

In a standalone cluster, it’s easy to start more than one TaskManager on a single physical host by duplicating its hostname in conf/slaves, for example:

$ cat flink-1.1.4/conf/flink-conf.yaml
#...
parallelism.default: 1

#...
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1

# each hostname appears 3 times, so 3 TaskManagers start on each host
$ cat flink-1.1.4/conf/slaves
flink-1-1.xxx.com
flink-1-1.xxx.com
flink-1-1.xxx.com
flink-1-2.xxx.com
flink-1-2.xxx.com
flink-1-2.xxx.com
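
Starting the cluster as usual then launches one TaskManager JVM per line of conf/slaves, so each slot ends up in its own process; a sketch, assuming the standard standalone scripts:

# starts the JobManager, plus one TaskManager per line in conf/slaves;
# with the slaves file above, each host runs 3 separate TaskManager JVMs
$ ./bin/start-cluster.sh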