YARN is very strict about killing containers that use more memory than they requested, so it is critical to understand how RAM is allocated when submitting a Flink job via YARN.
In Flink [1], RAM is split into three regions:
- Network buffers: A number of 32 KiByte buffers used by the network stack to buffer records for network transfer. Allocated on TaskManager startup. By default 2048 buffers are used, but can be adjusted via “taskmanager.network.numberOfBuffers”.
- Memory Manager pool: A large collection of buffers (32 KiBytes) that are used by all runtime algorithms whenever they need to buffer records. Records are stored in serialized form in those blocks. The memory manager allocates these buffers at startup.
- Remaining (Free) Heap: This part of the heap is left to the user code and the TaskManager’s data structures. Since those data structures are rather small, that memory is mostly available to the user code.
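As a quick sanity check on the network buffer region, the default pool size follows from the buffer count and buffer size quoted above. This is only a minimal sketch in shell arithmetic; the variable names are illustrative and do not come from Flink.

```shell
#!/usr/bin/env bash
# Default network buffer pool size, from the figures quoted above.
num_buffers=2048      # default buffer count (taskmanager.network.numberOfBuffers)
buffer_size_kib=32    # size of one buffer in KiB

pool_kib=$((num_buffers * buffer_size_kib))
pool_mib=$((pool_kib / 1024))

echo "network buffer pool: ${pool_mib} MiB"
```

With the defaults this prints a pool size of 64 MiB.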
In practice, the network buffers take a fixed amount of memory, 64 MB by default. What matters more is the balance between the Memory Manager pool and the remaining (free) heap. The general rule is to allocate as much RAM to the Memory Manager pool as you can. To achieve that, let's see how RAM is allocated based on the configuration elements.
Let’s dig into the detailed code:
1. JVM options in taskmanager.sh
    # if memory allocation mode is lazy and no other JVM options are set,
    # set the 'Concurrent Mark Sweep GC'
    if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
        TM_MAX_OFFHEAP_SIZE="8388607T"
        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
Notes:
1. If taskmanager.memory.preallocate is false (the default) and neither FLINK_ENV_JAVA_OPTS nor FLINK_ENV_JAVA_OPTS_TM (env.java.opts.taskmanager) is set, the script adds the GC option -XX:+UseG1GC.
2. TM_HEAP_SIZE is calculated by the function calculateTaskManagerHeapSizeMB (see below for details), and the final JVM_ARGS is "${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}". Because -Xms and -Xmx are both set to TM_HEAP_SIZE, the heap starts at its full size. -XX:MaxDirectMemorySize only caps direct (off-heap) memory: this script sets it to a very large upper bound, while in the YARN examples later in this post it equals the container memory minus the heap size.
2. function calculateTaskManagerHeapSizeMB
    # same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
    calculateTaskManagerHeapSizeMB() {
        if [ "${FLINK_TM_HEAP}" -le "0" ]; then
            echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
            exit 1
        fi

        local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
        # network buffers are always off-heap and thus need to be deduced from the heap memory size
        local tm_heap_size_mb=$((${FLINK_TM_HEAP} - network_buffers_mb))

        if useOffHeapMemory; then

            if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
                # We split up the total memory in heap and off-heap memory
                if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
                    echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')."
                    exit 1
                fi

                tm_heap_size_mb=$((tm_heap_size_mb - FLINK_TM_MEM_MANAGED_SIZE))
            else
                # Bash only performs integer arithmetic so floating point computation is performed using awk
                if [[ -z "${HAVE_AWK}" ]] ; then
                    command -v awk >/dev/null 2>&1
                    if [[ $? -ne 0 ]]; then
                        echo "[ERROR] Program 'awk' not found."
                        echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
                        exit 1
                    fi
                    HAVE_AWK=true
                fi

                # We calculate the memory using a fraction of the total memory
                if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
                    echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
                    echo "It must be between 0.0 and 1.0."
                    echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
                    exit 1
                fi

                # recalculate the JVM heap memory by taking the off-heap ratio into account
                local offheap_managed_memory_size=`awk "BEGIN { printf \"%.0f\n\", ${tm_heap_size_mb} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
                tm_heap_size_mb=$((tm_heap_size_mb - offheap_managed_memory_size))
            fi
        fi

        echo ${tm_heap_size_mb}
    }
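Notes:
1. If taskmanager.memory.off-heap is false, all free memory (totalJavaMemorySizeMB) is used for the heap, i.e. for -Xms and -Xmx. This is what the YARN logs in cases 1 and 2 below show; note that the shell function above subtracts the network buffer memory first.
2. If taskmanager.memory.off-heap is true, heapSizeMB = (totalJavaMemorySizeMB - networkBufMB) * (1.0 - taskmanager.memory.fraction), or (totalJavaMemorySizeMB - networkBufMB) - taskmanager.memory.size when taskmanager.memory.size is set.
3. networkBufMB = min(taskmanager.network.memory.max, max(taskmanager.network.memory.min, totalJavaMemorySizeMB * taskmanager.network.memory.fraction)).
4. In YARN mode, the free memory (totalJavaMemorySizeMB) equals tm_total_memory - containerized.heap-cutoff.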
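The clamping in note 3 is easy to get wrong by hand, so here is a minimal sketch of it. The helper name network_buf_mb is hypothetical and not part of Flink's scripts. Sizes are given in MB, and the result is truncated to a whole MB on the assumption that this mirrors the `>> 20` shift in the function above.

```shell
#!/usr/bin/env bash
# Sketch of note 3: clamp (total * fraction) between the min and max sizes.
# network_buf_mb is a hypothetical helper, not part of Flink's scripts.
# Args: total_mb fraction min_mb max_mb
network_buf_mb() {
    local total_mb=$1 fraction=$2 min_mb=$3 max_mb=$4
    awk -v t="$total_mb" -v f="$fraction" -v lo="$min_mb" -v hi="$max_mb" 'BEGIN {
        v = t * f
        if (v < lo) v = lo      # never below the configured minimum
        if (v > hi) v = hi      # never above the configured maximum
        printf "%d\n", v        # truncate to a whole MB
    }'
}

network_buf_mb 1448 0.1 64 1024     # fraction applies
network_buf_mb 512 0.1 64 1024      # small JVM: clamped up to the minimum
network_buf_mb 20480 0.1 64 1024    # large JVM: clamped down to the maximum
```

With the default fraction (0.1), minimum (64 MB) and maximum (1 GB), the three calls print 144, 64 and 1024.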
In summary, these parameters are used in YARN mode:
1. -ytm,--yarntaskManagerMemory: Memory per TaskManager Container [in MB].
2. taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible. If the cluster is exclusively running Flink, the total amount of available memory per machine minus some memory for the operating system (maybe 1-2 GB) is a good value. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
3. containerized.heap-cutoff-ratio: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (-Xmx argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove this fraction of the memory from the requested heap as a safety margin and add it to the memory used off-heap.
4. containerized.heap-cutoff-min: (Default 600 MB) Minimum amount of memory to cut off the requested heap size.
5. taskmanager.memory.size: The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap (depending on taskmanager.memory.off-heap) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio with respect to the size of the task manager JVM as specified by taskmanager.memory.fraction. (DEFAULT: -1)
6. taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a task manager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated if taskmanager.memory.size is not set.
7. taskmanager.memory.off-heap: If set to true, the task manager allocates memory which is used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory. (DEFAULT: false)
8. taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)
9. taskmanager.network.memory.min: Minimum memory size for network buffers in bytes. (DEFAULT: 64 MB)
10. taskmanager.network.memory.max: Maximum memory size for network buffers in bytes. (DEFAULT: 1 GB)
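To make the interplay of containerized.heap-cutoff-ratio and containerized.heap-cutoff-min concrete, the following sketch computes the memory left for the JVM after the YARN safety cutoff. The helper name yarn_java_memory_mb is hypothetical; the rule it applies (cut off the larger of ratio times container size and the minimum) is taken from the parameter descriptions above.

```shell
#!/usr/bin/env bash
# Sketch: memory left for the JVM after the YARN safety cutoff.
# yarn_java_memory_mb is a hypothetical helper, not part of Flink.
# Args: container_mb cutoff_ratio cutoff_min_mb
yarn_java_memory_mb() {
    local container_mb=$1 cutoff_ratio=$2 cutoff_min_mb=$3
    awk -v c="$container_mb" -v r="$cutoff_ratio" -v m="$cutoff_min_mb" 'BEGIN {
        cutoff = c * r
        if (cutoff < m) cutoff = m    # never cut off less than the minimum
        printf "%d\n", c - cutoff
    }'
}

yarn_java_memory_mb 2048 0.25 600    # ratio gives 512 MB, so the 600 MB minimum wins
yarn_java_memory_mb 4096 0.25 600    # ratio gives 1024 MB, which exceeds the minimum
```

The first call prints 1448, the value used as totalJavaMemorySizeMB in the examples below; the second prints 3072.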
Let’s see some examples:
case 1. off-heap = false / taskmanager.memory.size = -1(default)
Input settings
    -ytm 2048
    taskmanager.memory.off-heap false
    taskmanager.memory.preallocate true
    taskmanager.memory.fraction 0.1
JVM options for TaskManager container:
    2018-07-02 16:45:05.762 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Maximum heap size: 1388 MiBytes
    2018-07-02 16:45:05.762 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JAVA_HOME: /usr/share/jdk1.8.0_144
    2018-07-02 16:45:05.763 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Hadoop version: 2.7.3
    2018-07-02 16:45:05.763 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JVM Options:
    2018-07-02 16:45:05.763 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xms1448m
    2018-07-02 16:45:05.763 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xmx1448m
That is: -Xmx1448m = 2048 - max(2048 * 0.25 = 512, 600) = 2048 - 600
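When comparing several runs, it helps to pull just the memory flags out of the TaskManager log. The following is a minimal sketch that assumes the log format shown in the case 1 log above; the helper name extract_jvm_memory_flags is hypothetical, and it reads log text on stdin.

```shell
#!/usr/bin/env bash
# Sketch: pull JVM memory flags out of TaskManager log lines.
# extract_jvm_memory_flags is a hypothetical helper; it reads log text on stdin.
extract_jvm_memory_flags() {
    # -o prints only the matching part, -e protects the pattern's leading dash
    grep -oE -e '-(Xms|Xmx|XX:MaxDirectMemorySize=)[0-9]+[kKmMgGtT]?'
}

# Two lines copied from the case 1 log above.
extract_jvm_memory_flags <<'EOF'
2018-07-02 16:45:05.763 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xms1448m
2018-07-02 16:45:05.763 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xmx1448m
EOF
```

For these two lines it prints -Xms1448m and -Xmx1448m, one per line. On a real cluster the input would be the TaskManager container log rather than a here-document.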
case 2. off-heap = false / taskmanager.memory.size = 256
Input settings
    -ytm 2048
    taskmanager.memory.off-heap false
    taskmanager.memory.preallocate true
    taskmanager.memory.fraction 0.1
    taskmanager.memory.size 256
JVM options for TaskManager container:
    2018-07-02 17:34:45.339 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Maximum heap size: 1388 MiBytes
    2018-07-02 17:34:45.339 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JAVA_HOME: /usr/share/jdk1.8.0_144
    2018-07-02 17:34:45.339 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Hadoop version: 2.7.3
    2018-07-02 17:34:45.340 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JVM Options:
    2018-07-02 17:34:45.340 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xms1448m
    2018-07-02 17:34:45.340 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xmx1448m
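That is: -Xmx1448m = 2048 - max(2048 * 0.25 = 512, 600) = 2048 - 600. Setting taskmanager.memory.size does not change the heap size here, because with off-heap = false the managed memory lives inside the heap.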
case 3. off-heap = true / taskmanager.memory.size = -1(default)
Input settings
    -ytm 2048
    taskmanager.memory.fraction 0.1
    taskmanager.memory.off-heap true
    taskmanager.memory.preallocate true
JVM options for TaskManager container:
    2018-07-02 16:52:19.534 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Maximum heap size: 1125 MiBytes
    2018-07-02 16:52:19.534 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JAVA_HOME: /usr/share/jdk1.8.0_144
    2018-07-02 16:52:19.535 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Hadoop version: 2.7.3
    2018-07-02 16:52:19.535 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JVM Options:
    2018-07-02 16:52:19.535 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xms1174m
    2018-07-02 16:52:19.535 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xmx1174m
    2018-07-02 16:52:19.535 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -XX:MaxDirectMemorySize=874m
That is:
totalJavaMemorySizeMB = 2048 - max(2048 * 0.25 = 512, 600) = 1448
networkBufMB = min(1024, max(64, 1448 * 0.1)) = 144.8
-Xmx1174m ~ (1448 - 144.8) * (1.0 - 0.1) = 1172.88
-XX:MaxDirectMemorySize=874m ~ cutoff(max(2048 * 0.25 = 512, 600)) + netBuffer(144.8) + managedMemory((1448 - 144.8) * 0.1 = 130.32) = 875.12
case 4. off-heap = true / taskmanager.memory.size = 256
Input settings
    -ytm 2048
    taskmanager.memory.fraction 0.1
    taskmanager.memory.off-heap true
    taskmanager.memory.preallocate true
    taskmanager.memory.size 256
JVM options for TaskManager container:
    2018-07-02 17:37:21.247 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Maximum heap size: 1004 MiBytes
    2018-07-02 17:37:21.247 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JAVA_HOME: /usr/share/jdk1.8.0_144
    2018-07-02 17:37:21.248 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - Hadoop version: 2.7.3
    2018-07-02 17:37:21.248 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - JVM Options:
    2018-07-02 17:37:21.248 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xms1048m
    2018-07-02 17:37:21.248 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -Xmx1048m
    2018-07-02 17:37:21.248 [main] INFO org.apache.flink.yarn.YarnTaskManagerRunner - -XX:MaxDirectMemorySize=1000m
That is:
totalJavaMemorySizeMB = 2048 - max(2048 * 0.25 = 512, 600) = 1448
networkBufMB = min(1024, max(64, 1448 * 0.1)) = 144.8
-Xmx1048m ~ (1448 - 144.8) - 256 = 1047.2
-XX:MaxDirectMemorySize=1000m ~ cutoff(max(2048 * 0.25 = 512, 600)) + netBuffer(144.8) + managedMemory(256) = 1000.8
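The hand calculations above are approximate because they keep fractional megabytes. The sketch below redoes them in whole megabytes and lands on the exact values in the logs. It rests on three assumptions of mine rather than on confirmed Flink behaviour: the network buffer size is truncated to a whole MB (as the `>> 20` shift in calculateTaskManagerHeapSizeMB suggests), the managed memory fraction is rounded to the nearest MB (as its `%.0f` does), and -XX:MaxDirectMemorySize is the container size minus the heap, which is what the logs of cases 3 and 4 add up to. The helper name tm_jvm_flags is hypothetical, and the cutoff and network defaults are hard-coded to the values quoted in this post.

```shell
#!/usr/bin/env bash
# Sketch: derive the TaskManager JVM memory flags for a YARN container.
# tm_jvm_flags is a hypothetical helper, not part of Flink.
# Args: container_mb off_heap(true|false) managed_fraction managed_size_mb(-1 = unset)
tm_jvm_flags() {
    local container_mb=$1 off_heap=$2 fraction=$3 managed_mb=$4
    awk -v c="$container_mb" -v off="$off_heap" -v f="$fraction" -v ms="$managed_mb" 'BEGIN {
        # YARN cutoff: ratio 0.25, but at least 600 MB
        cutoff = c * 0.25
        if (cutoff < 600) cutoff = 600
        total = int(c - cutoff)

        if (off != "true") {
            # managed memory stays on the heap: the whole remainder becomes heap
            printf "-Xms%dm -Xmx%dm\n", total, total
            exit
        }

        # network buffers: fraction 0.1, clamped to [64 MB, 1 GB], whole MB (assumption)
        net = total * 0.1
        if (net < 64) net = 64
        if (net > 1024) net = 1024
        net = int(net)

        rest = total - net
        # managed memory: fixed size if set, else a rounded fraction of the rest
        managed = (ms > 0) ? ms : int(rest * f + 0.5)
        heap = rest - managed
        printf "-Xms%dm -Xmx%dm -XX:MaxDirectMemorySize=%dm\n", heap, heap, c - heap
    }'
}

tm_jvm_flags 2048 false 0.1 -1     # case 1
tm_jvm_flags 2048 true  0.1 -1     # case 3
tm_jvm_flags 2048 true  0.1 256    # case 4
```

Under these assumptions the three calls print the same flags as the logs: -Xms1448m -Xmx1448m for case 1, -Xms1174m -Xmx1174m -XX:MaxDirectMemorySize=874m for case 3, and -Xms1048m -Xmx1048m -XX:MaxDirectMemorySize=1000m for case 4.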
[1]. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525