RAM allocation in Flink YARN

YARN is very strict about killing containers that use more memory than requested, so it’s critical to understand how RAM is allocated when submitting a Flink job via YARN.

In Flink[1], RAM is split into three regions:

  1. Network buffers: A number of 32 KiByte buffers used by the network stack to buffer records for network transfer. They are allocated on TaskManager startup. By default 2048 buffers are used, but this can be adjusted via “taskmanager.network.numberOfBuffers”.
  2. Memory Manager pool: A large collection of buffers (32 KiBytes) that are used by all runtime algorithms whenever they need to buffer records. Records are stored in serialized form in those blocks. The memory manager allocates these buffers at startup.
  3. Remaining (Free) Heap: This part of the heap is left to the user code and the TaskManager’s data structures. Since those data structures are rather small, that memory is mostly available to the user code.
    (Figure: Flink memory regions)
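As a quick sanity check on the defaults above (a throwaway calculation, not Flink code), 2048 buffers of 32 KiB each add up to a 64 MB pool:

```shell
# Default network buffer pool: 2048 buffers * 32 KiB each
buffers=2048
buffer_kib=32
pool_mb=$(( buffers * buffer_kib / 1024 ))   # KiB -> MiB
echo "${pool_mb} MB"                         # 64 MB
```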

In practice, the network buffer pool is set to a fixed size, 64 MB by default. What matters more is the balance between the Memory Manager pool and the Remaining (Free) Heap. The general rule is to allocate as much RAM to the Memory Manager pool as you can. To achieve that, let’s see how RAM is allocated based on the configuration elements.

Let’s dig into the detailed code:

1. JVM options in taskmanager.sh

    # if memory allocation mode is lazy and no other JVM options are set,
    # use the G1 garbage collector
    if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then

        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
        TM_MAX_OFFHEAP_SIZE="8388607T"

        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"

    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

Notes:
1. If taskmanager.memory.preallocate is false (the default) and no value is set for env.java.opts.taskmanager, the GC option -XX:+UseG1GC is added;
2. TM_HEAP_SIZE is calculated by the function calculateTaskManagerHeapSizeMB (see below for details), and the final JVM_ARGS is "${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}". That means the whole TM_HEAP_SIZE is allocated up front (-Xms equals -Xmx), and the rest is capped by -XX:MaxDirectMemorySize;

2. function calculateTaskManagerHeapSizeMB

# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
calculateTaskManagerHeapSizeMB() {
    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
        exit 1
    fi

    local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
    # network buffers are always off-heap and thus need to be deduced from the heap memory size
    local tm_heap_size_mb=$((${FLINK_TM_HEAP} - network_buffers_mb))

    if useOffHeapMemory; then

        if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
            # We split up the total memory in heap and off-heap memory
            if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
                echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')."
                exit 1
            fi

            tm_heap_size_mb=$((tm_heap_size_mb - FLINK_TM_MEM_MANAGED_SIZE))
        else
            # Bash only performs integer arithmetic so floating point computation is performed using awk
            if [[ -z "${HAVE_AWK}" ]] ; then
                command -v awk >/dev/null 2>&1
                if [[ $? -ne 0 ]]; then
                    echo "[ERROR] Program 'awk' not found."
                    echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
                    exit 1
                fi
                HAVE_AWK=true
            fi

            # We calculate the memory using a fraction of the total memory
            if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
                echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
                echo "It must be between 0.0 and 1.0."
                echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
                exit 1
            fi

            # recalculate the JVM heap memory by taking the off-heap ratio into account
            local offheap_managed_memory_size=`awk "BEGIN { printf \"%.0f\n\", ${tm_heap_size_mb} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
            tm_heap_size_mb=$((tm_heap_size_mb - offheap_managed_memory_size))
        fi
    fi

    echo ${tm_heap_size_mb}
}

Notes:
1. If taskmanager.memory.off-heap is false, all free memory goes into -Xms/-Xmx;
2. If taskmanager.memory.off-heap is true, heapSizeMB = (totalJavaMemorySizeMB - networkBufMB) * (1.0 - taskmanager.memory.fraction);
3. networkBufMB = min(taskmanager.network.memory.max, max(taskmanager.network.memory.min, totalJavaMemorySizeMB * taskmanager.network.memory.fraction));
4. In YARN mode, free memory (totalJavaMemorySizeMB) equals tm_total_memory - containerized.heap-cutoff;
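The formulas above can be sketched in a few lines of shell (an illustrative helper, not Flink’s actual script; the variable names are mine), plugging in totalJavaMemorySizeMB = 1448 with off-heap enabled:

```shell
# Sketch of the networkBufMB and heapSizeMB formulas (illustrative only); values in MB.
total_java_mb=1448      # tm_total_memory minus containerized.heap-cutoff
net_fraction=0.1; net_min_mb=64; net_max_mb=1024
managed_fraction=0.1

# networkBufMB = min(max, max(min, total * fraction))
network_buf_mb=$(awk -v t="$total_java_mb" -v f="$net_fraction" \
                     -v lo="$net_min_mb" -v hi="$net_max_mb" \
    'BEGIN { m = t * f; if (m < lo) m = lo; if (m > hi) m = hi; printf "%.1f", m }')

# off-heap = true: heapSizeMB = (total - networkBuf) * (1 - fraction)
heap_mb=$(awk -v t="$total_java_mb" -v n="$network_buf_mb" -v f="$managed_fraction" \
    'BEGIN { printf "%.0f", (t - n) * (1.0 - f) }')

echo "networkBufMB=${network_buf_mb} heapSizeMB=${heap_mb}"
```

This yields networkBufMB = 144.8 and heapSizeMB ≈ 1173; Flink’s own script floors the buffer size to whole megabytes (144) before subtracting, which is why a real container ends up with -Xmx1174m (see case 3 below).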

In summary, these parameters are used in YARN mode:
1. -ytm,--yarntaskManagerMemory: Memory per TaskManager container [in MB];
2. taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible. If the cluster is exclusively running Flink, the total amount of available memory per machine minus some memory for the operating system (maybe 1-2 GB) is a good value. On YARN setups, this value is automatically configured to the size of the TaskManager’s YARN container, minus a certain tolerance value.
3. containerized.heap-cutoff-ratio: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (-Xmx argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove this fraction of the memory from the requested heap as a safety margin and add it to the memory used off-heap.
4. containerized.heap-cutoff-min: (Default 600 MB) Minimum amount of memory to cut off the requested heap size.
5. taskmanager.memory.size: The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap (depending on taskmanager.memory.off-heap) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio with respect to the size of the task manager JVM as specified by taskmanager.memory.fraction. (DEFAULT: -1)
6. taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a task manager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving 20% of free memory for the task manager’s heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated if taskmanager.memory.size is not set.
7. taskmanager.memory.off-heap: If set to true, the task manager allocates memory which is used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory. (DEFAULT: false)
8. taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system does not have enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)
9. taskmanager.network.memory.min: Minimum memory size for network buffers in bytes. (DEFAULT: 64 MB)
10. taskmanager.network.memory.max: Maximum memory size for network buffers in bytes. (DEFAULT: 1 GB)
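Putting these knobs together, a flink-conf.yaml fragment might look like the following (illustrative example values, not recommendations; the keys are the ones described above):

```yaml
# Example settings for a TaskManager container (values are illustrative)
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true
taskmanager.memory.fraction: 0.7        # ignored if taskmanager.memory.size is set
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 67108864      # 64 MB, in bytes
taskmanager.network.memory.max: 1073741824    # 1 GB, in bytes
containerized.heap-cutoff-ratio: 0.25
containerized.heap-cutoff-min: 600
```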

Let’s see some examples:

case 1. off-heap = false / taskmanager.memory.size = -1 (default)

Input settings

-ytm 2048 
taskmanager.memory.off-heap     false
taskmanager.memory.preallocate  true
taskmanager.memory.fraction     0.1

JVM options for TaskManager container:

2018-07-02 16:45:05.762 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Maximum heap size: 1388 MiBytes
2018-07-02 16:45:05.762 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JAVA_HOME: /usr/share/jdk1.8.0_144
2018-07-02 16:45:05.763 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Hadoop version: 2.7.3
2018-07-02 16:45:05.763 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JVM Options:
2018-07-02 16:45:05.763 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xms1448m
2018-07-02 16:45:05.763 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xmx1448m

That is: -Xmx1448m = 2048 - max(2048*0.25 = 512, 600) = 2048 - 600

case 2. off-heap = false / taskmanager.memory.size = 256

Input settings

-ytm 2048 
taskmanager.memory.off-heap     false
taskmanager.memory.preallocate  true
taskmanager.memory.fraction     0.1
taskmanager.memory.size     256

JVM options for TaskManager container:

2018-07-02 17:34:45.339 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Maximum heap size: 1388 MiBytes
2018-07-02 17:34:45.339 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JAVA_HOME: /usr/share/jdk1.8.0_144
2018-07-02 17:34:45.339 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Hadoop version: 2.7.3
2018-07-02 17:34:45.340 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JVM Options:
2018-07-02 17:34:45.340 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xms1448m
2018-07-02 17:34:45.340 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xmx1448m

That is: -Xmx1448m = 2048 - max(2048*0.25 = 512, 600) = 2048 - 600
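The heap size in cases 1 and 2 is just the container size minus the cutoff, which a couple of lines of shell can verify (illustrative, not Flink’s script):

```shell
# Cases 1 and 2: -Xmx = container size - containerized.heap-cutoff
ytm=2048
cutoff=$(( ytm / 4 ))                           # heap-cutoff-ratio = 0.25
if [ "$cutoff" -lt 600 ]; then cutoff=600; fi   # heap-cutoff-min = 600 MB
heap=$(( ytm - cutoff ))
echo "${heap} MB"    # 1448 MB, matching -Xmx1448m
```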

case 3. off-heap = true / taskmanager.memory.size = -1 (default)

Input settings

-ytm 2048 
taskmanager.memory.fraction     0.1
taskmanager.memory.off-heap     true
taskmanager.memory.preallocate  true

JVM options for TaskManager container:

2018-07-02 16:52:19.534 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Maximum heap size: 1125 MiBytes
2018-07-02 16:52:19.534 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JAVA_HOME: /usr/share/jdk1.8.0_144
2018-07-02 16:52:19.535 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Hadoop version: 2.7.3
2018-07-02 16:52:19.535 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JVM Options:
2018-07-02 16:52:19.535 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xms1174m
2018-07-02 16:52:19.535 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xmx1174m
2018-07-02 16:52:19.535 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -XX:MaxDirectMemorySize=874m

It is:
totalJavaMemorySizeMB = 2048 - max(2048*0.25 = 512, 600) = 1448
networkBufMB = min(1024, max(64, 1448*0.1)) = 144.8
-Xmx1174m ~ (1448 - 144.8) * (1.0 - 0.1) = 1172.88

-XX:MaxDirectMemorySize=874m ~ cutoff(max(2048*0.25 = 512, 600) = 600) + netBuffer(144.8) + managedMemory((1448 - 144.8) * 0.1 = 130.32) = 875.12

case 4. off-heap = true / taskmanager.memory.size = 256

Input settings

-ytm 2048 
taskmanager.memory.fraction     0.1
taskmanager.memory.off-heap     true
taskmanager.memory.preallocate  true
taskmanager.memory.size     256

JVM options for TaskManager container:

2018-07-02 17:37:21.247 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Maximum heap size: 1004 MiBytes
2018-07-02 17:37:21.247 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JAVA_HOME: /usr/share/jdk1.8.0_144
2018-07-02 17:37:21.248 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  Hadoop version: 2.7.3
2018-07-02 17:37:21.248 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -  JVM Options:
2018-07-02 17:37:21.248 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xms1048m
2018-07-02 17:37:21.248 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -Xmx1048m
2018-07-02 17:37:21.248 [main] INFO  org.apache.flink.yarn.YarnTaskManagerRunner  -     -XX:MaxDirectMemorySize=1000m

It is:
totalJavaMemorySizeMB = 2048 - max(2048*0.25 = 512, 600) = 1448
networkBufMB = min(1024, max(64, 1448*0.1)) = 144.8
-Xmx1048m ~ (1448 - 144.8) - 256 = 1047.2

-XX:MaxDirectMemorySize=1000m ~ cutoff(max(2048*0.25 = 512, 600) = 600) + netBuffer(144.8) + managedMemory(256) = 1000.8
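The -XX:MaxDirectMemorySize sums for cases 3 and 4 can be double-checked the same way (illustrative shell, using awk for the fractional arithmetic):

```shell
# Direct memory = heap cutoff + network buffers + managed memory (MB)
cutoff=600      # max(2048*0.25 = 512, 600)
net=144.8       # min(1024, max(64, 1448*0.1))

# case 3: managed memory is a fraction (0.1) of what remains after buffers
direct3=$(awk -v c="$cutoff" -v n="$net" \
    'BEGIN { printf "%.2f", c + n + (1448 - n) * 0.1 }')
# case 4: managed memory is fixed at taskmanager.memory.size = 256
direct4=$(awk -v c="$cutoff" -v n="$net" \
    'BEGIN { printf "%.1f", c + n + 256 }')
echo "case3=${direct3} case4=${direct4}"
```

This prints 875.12 and 1000.8, close to the 874m and 1000m seen in the logs; the small gap comes from Flink rounding to whole megabytes along the way.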

[1]. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Logging configuration in Flink

  1. This post applies to both Flink standalone clusters and Flink on YARN; I haven’t tested other deployment scenarios;
  2. A Flink standalone cluster uses log4j.properties by default, while YARN uses logback.xml;
  3. My environment runs Flink 1.3.2; it should be the same for the 1.3.* and 1.4.* versions;

I print every result record in the staging cluster for debugging, and one day the cluster went down because it ran out of disk space, which was quite a surprise. That naturally led me to Flink’s logging configuration, as the log files keep growing until they eat all the space.

By default[1], Flink is packaged with Log4j for logging, and the log4j.properties file looks like this:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

Because org.apache.log4j.FileAppender is used, no retention happens. The same applies if you switch to Logback as described in [2].

Log rolling is quite important for a stable cluster, and the configuration is easy.

1. rolling with Log4j
Log4j[3] supports rolling with org.apache.log4j.RollingFileAppender, and here is an example for your reference:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=true
# keep up to 4 files and each file size is limited to 100KB
log4j.appender.file.MaxFileSize=100KB
log4j.appender.file.MaxBackupIndex=4
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

2. rolling with Logback
Similarly, Logback[4] supports rolling with ch.qos.logback.core.rolling.RollingFileAppender; here is an example for your reference:

<configuration>
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.file}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.file}.%d{yyyy-MM-dd-HH}.%i</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100KB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- maxHistory and totalSizeCap belong to the rolling policy itself -->
            <maxHistory>10</maxHistory>
            <totalSizeCap>2GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- This affects logging for both user code and Flink -->
    <root level="INFO">
        <appender-ref ref="file"/>
    </root>

    <!-- Uncomment this if you want to only change Flink's logging -->
    <!--<logger name="org.apache.flink" level="INFO">-->
        <!--<appender-ref ref="file"/>-->
    <!--</logger>-->

    <!-- The following lines keep the log level of common libraries/connectors on
         log level INFO. The root logger does not override this. You have to manually
         change the log levels here. -->
    <logger name="akka" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="file"/>
    </logger>

    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
    <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="file"/>
    </logger>
</configuration>

Here we use the size-and-time-based rolling policy: the total log size is capped at 2 GB, up to 10 rollover periods of history are kept, and each file is limited to 100KB. Pay attention to ${log.file}.%d{yyyy-MM-dd-HH}.%i: %d{yyyy-MM-dd-HH} rolls the file every hour, and %i increments within the hour whenever the current file exceeds 100KB. Note that maxHistory counts rollover periods, so with an hourly pattern a value of 10 means 10 hours of retention, not 10 days.

[1]. https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/logging.html
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
[3]. https://logging.apache.org/log4j/1.2/manual.html
[4]. https://logback.qos.ch/manual/appenders.html