Recently I’ve been doing some tests on Flink and found that its parallelism mechanism is quite different.
According to the Flink documentation:
Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.
Each Flink TaskManager provides processing slots in the cluster.
taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances.
taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: 512).
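The Python model below makes the memory side of this trade-off concrete. It is a simplified sketch that assumes, for illustration only, that the heap set by taskmanager.heap.mb is split evenly across taskmanager.numberOfTaskSlots. Real Flink keeps part of the heap for the JVM and user objects, so actual per-slot shares are smaller. The function name `per_slot_memory_mb` is hypothetical.

```python
def per_slot_memory_mb(heap_mb: int, num_slots: int) -> float:
    """Simplified model: each slot gets an equal share of one TaskManager's heap.

    Assumption for illustration: the whole heap is divided evenly between slots.
    Flink actually reserves some heap for the JVM and user objects, so real
    per-slot shares are smaller than this.
    """
    if heap_mb <= 0 or num_slots <= 0:
        raise ValueError("heap_mb and num_slots must be positive")
    return heap_mb / num_slots


if __name__ == "__main__":
    # Default heap (512 MB) with 1, 2 and 4 slots per TaskManager.
    for slots in (1, 2, 4):
        share = per_slot_memory_mb(512, slots)
        print(f"{slots} slot(s): ~{share:.0f} MB per slot, all inside one shared JVM")
```

More slots per TaskManager means smaller shares of a single JVM. Those shares are divided, but they are not isolated from each other.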
Based on this information, we can conclude that all the slots in one TaskManager share the same JVM, with no CPU or RAM isolation between them. In production, this can be a BIG issue.
The best way to completely isolate jobs against each other (CPU, network, etc) is to run them in different processes. The best way to do that is to start a Flink cluster per job.
So far, I’ve found two ways to avoid this:
1. Resource containers, such as YARN or Mesos
Mesos is supported starting with Flink 1.2; see https://issues.apache.org/jira/browse/FLINK-1984.
2. Run multiple TaskManager instances per machine, and give each only one slot
In a standalone cluster, it’s easy to start more than one TaskManager on one physical host by duplicating the hostname in conf/slaves, for example:
$ cat flink-1.1.4/conf/flink-conf.yaml
#...
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
#...
parallelism.default: 1

# start 3 TaskManagers on each host
$ cat flink-1.1.4/conf/slaves
flink-1-1.xxx.com
flink-1-1.xxx.com
flink-1-1.xxx.com
flink-1-2.xxx.com
flink-1-2.xxx.com
flink-1-2.xxx.com
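The Python sketch below reproduces this layout and checks it. It repeats each host name once per TaskManager to build the contents of conf/slaves. It also counts the total slots the cluster offers (hosts × TaskManagers per host × taskmanager.numberOfTaskSlots). With the default slot sharing, that total roughly bounds a job's parallelism. The helper names are hypothetical, and the host names are the placeholders from the example above.

```python
from typing import List


def build_slaves_lines(hosts: List[str], tms_per_host: int) -> List[str]:
    """Repeat each host once per TaskManager, mirroring the conf/slaves layout."""
    return [host for host in hosts for _ in range(tms_per_host)]


def total_slots(hosts: List[str], tms_per_host: int, slots_per_tm: int) -> int:
    """Slots in the standalone cluster = hosts * TaskManagers per host * slots per TaskManager."""
    return len(hosts) * tms_per_host * slots_per_tm


if __name__ == "__main__":
    hosts = ["flink-1-1.xxx.com", "flink-1-2.xxx.com"]
    # Print what conf/slaves should contain (3 TaskManagers per host).
    print("\n".join(build_slaves_lines(hosts, tms_per_host=3)))
    # 2 hosts * 3 TaskManagers * 1 slot = 6 slots, each TaskManager in its own JVM.
    print("total slots:", total_slots(hosts, tms_per_host=3, slots_per_tm=1))
```

The total number of slots is the same as one TaskManager per host with 3 slots each. The difference is that each slot now runs in its own JVM process.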