Job Isolation on Flink

Recently I have been doing some tests on Flink, and I find its parallelism mechanism quite different.

Referring to the Flink documentation:

Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.

Each Flink TaskManager provides processing slots in the cluster.

taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances.

taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: 512).

Based on this information, we can conclude that all the slots in one TaskManager share the same JVM, with no CPU/RAM isolation between them. In production, this can be a BIG issue.
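As an illustration (the numbers below are made up for this example, not taken from any real cluster), a TaskManager configured like this runs four slots inside one 2 GB JVM heap, and nothing stops the tasks in one slot from consuming most of the CPU or heap the other slots need:

$ cat flink-1.1.4/conf/flink-conf.yaml
#...
# One TaskManager JVM with a 2048 MB heap...
taskmanager.heap.mb: 2048
# ...shared by 4 slots: roughly 512 MB of heap each, but with no hard per-slot limit
taskmanager.numberOfTaskSlots: 4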

The best way to completely isolate jobs against each other (CPU, network, etc) is to run them in different processes. The best way to do that is to start a Flink cluster per job.

Currently, I have found two ways to work around this.

1. Resource containers, such as YARN or Mesos

Running each job in its own YARN or Mesos session gives it separate JVM processes and container-level resource limits. Mesos is supported starting from Flink 1.2; see https://issues.apache.org/jira/browse/FLINK-1984.
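For example, a job can be submitted to its own per-job YARN cluster. This is only a sketch: the option names below are from the Flink 1.x YARN client and the jar path is a placeholder, so check the documentation for your own version:

$ ./bin/flink run -m yarn-cluster -yn 2 -ys 1 -yjm 1024 -ytm 2048 ./path/to/your-job.jar
# -m yarn-cluster : start a dedicated YARN session just for this job
# -yn  : number of YARN containers (TaskManagers) for the job
# -ys  : task slots per TaskManager
# -yjm : JobManager container memory in MB
# -ytm : TaskManager container memory in MB

When the job finishes, YARN tears its containers down, so jobs submitted this way never share a TaskManager JVM.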

2. Run multiple TaskManager instances per machine, and give each of them only one slot

In a standalone cluster, it is easy to start more than one TaskManager on a physical host by duplicating the hostname in conf/slaves, for example:

$ cat flink-1.1.4/conf/flink-conf.yaml
#...
# The default parallelism used for programs that do not specify one.
parallelism.default: 1

#...
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1

# Listing a hostname three times starts 3 TaskManagers on that host.
$ cat flink-1.1.4/conf/slaves
flink-1-1.xxx.com
flink-1-1.xxx.com
flink-1-1.xxx.com
flink-1-2.xxx.com
flink-1-2.xxx.com
flink-1-2.xxx.com
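After restarting the cluster with ./bin/start-cluster.sh, each entry in conf/slaves should show up as its own TaskManager JVM. One way to verify this (assuming a JDK with jps is installed on the worker host; the PIDs below are placeholders):

$ ssh flink-1-1.xxx.com 'jps | grep TaskManager'
12345 TaskManager
12346 TaskManager
12347 TaskManager

Note that each TaskManager instance reserves its own taskmanager.heap.mb, so with three instances per host the heap set aside is roughly 3 × 512 MB under the default setting; size the per-instance heap with the whole host in mind.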

3 thoughts on “Job Isolation on Flink”

  1. I am comparing Flink and Spark resource utilization using Java Mission Control, which reports the resource consumption (such as CPU and memory) of running applications.
    In the case of Spark, when I start my cluster there is one worker Java process with its own JVM, and when I run a Spark application it gets a separate Java process with a different PID. In the case of Flink, however, the Flink TaskManager and the Flink application run in the same JVM process. How can I tell how many resources the Flink application itself is taking?
