Logging configuration in Flink

  1. This post applies to both a Flink standalone cluster and Flink on YARN; I haven't tested it on other deployment scenarios;
  2. A Flink standalone cluster uses log4j.properties by default, while YARN uses logback.xml;
  3. My environment runs Flink 1.3.2; it should be the same for the 1.3.* and 1.4.* versions;

I print every result record in the staging cluster for debugging, and one day the cluster went down because it ran out of disk space, which was quite a surprise. That naturally led me to Flink's logging configuration, since the log files kept growing until they ate up all the space.

By default[1], Flink is packaged with Log4j for logging, and the log4j.properties file looks like this:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

org.apache.log4j.FileAppender is used, which is why no log rotation ever happens. The same applies if you switch to Logback as described in [2].

Log rolling is quite important for a stable cluster, and the configuration is easy.

1. Rolling with Log4j
Log4j[3] supports rolling with org.apache.log4j.RollingFileAppender, and here is an example for your reference:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=true
# each file is limited to 100KB and up to 4 backup files are kept
log4j.appender.file.MaxFileSize=100KB
log4j.appender.file.MaxBackupIndex=4
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

2. Rolling with Logback
Similarly, Logback[4] supports rolling with ch.qos.logback.core.rolling.RollingFileAppender, and here is an example for your reference:

<configuration>
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.file}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.file}.%d{yyyy-MM-dd-HH}.%i</fileNamePattern>
            <!-- maxHistory and totalSizeCap belong to the rolling policy, not the triggering policy -->
            <maxHistory>10</maxHistory>
            <totalSizeCap>2GB</totalSizeCap>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100KB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- This affects logging for both user code and Flink -->
    <root level="INFO">
        <appender-ref ref="file"/>
    </root>

    <!-- Uncomment this if you want to only change Flink's logging -->
    <!--<logger name="org.apache.flink" level="INFO">-->
        <!--<appender-ref ref="file"/>-->
    <!--</logger>-->

    <!-- The following lines keep the log level of common libraries/connectors on
         log level INFO. The root logger does not override this. You have to manually
         change the log levels here. -->
    <logger name="akka" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="file"/>
    </logger>

    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
    <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="file"/>
    </logger>
</configuration>

Here we use the size- and time-based rolling policy: each file is limited to 100KB, maxHistory keeps the last 10 rolling periods (hours in this case, since the pattern rolls hourly), and the total size of the archives is capped at 2GB. Pay attention to ${log.file}.%d{yyyy-MM-dd-HH}.%i: %d{yyyy-MM-dd-HH} rolls the file every hour, and %i is the index appended when a file exceeds 100KB within the same hour.
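
For example, assuming ${log.file} resolves to /opt/flink/log/flink-taskmanager-0.log (the path here is just illustrative), the archived files would look roughly like this:

/opt/flink/log/flink-taskmanager-0.log                     (active file)
/opt/flink/log/flink-taskmanager-0.log.2018-01-15-10.0
/opt/flink/log/flink-taskmanager-0.log.2018-01-15-10.1     (second 100KB chunk within the same hour)
/opt/flink/log/flink-taskmanager-0.log.2018-01-15-11.0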

[1]. https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/logging.html
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
[3]. https://logging.apache.org/log4j/1.2/manual.html
[4]. https://logback.qos.ch/manual/appenders.html

The back-end of FlinkRunner in Apache Beam

1. Beam SDK-Runner model

The Beam SDKs provide a unified programming model in which you define your data processing pipeline regardless of how it is actually processed. The Beam Runners translate that pipeline into an API compatible with the distributed processing back-end of your choice.

To put it simply, the Beam SDK defines an engine-agnostic data processing model. When it comes to an actual execution engine, like Flink, Spark, Storm, or even MapReduce, the Runner is responsible for the translation work, re-expressing the data pipeline in that engine's own API.
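
To make that concrete, here is a minimal Beam pipeline sketch in Java (the file paths are made up for illustration). Note that nothing in it mentions Flink; the back-end is selected purely through the pipeline options:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CopyPipeline {
  public static void main(String[] args) {
    // The runner is picked from the options, e.g. --runner=FlinkRunner;
    // the pipeline definition below stays the same for every back-end.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from("/tmp/input.txt"))
     .apply("WriteLines", TextIO.write().to("/tmp/output"));

    p.run().waitUntilFinish();
  }
}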

2. PipelineRunner interface

PipelineRunner is the parent class of every runner. It contains a core method, run, shown below, which is responsible for translating a Pipeline written against the Beam SDK into the runner's API.

  /**
   * Processes the given Pipeline, returning the results.
   */
  public abstract ResultT run(Pipeline pipeline);

3. FlinkRunner

Apache Flink is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

FlinkRunner is the runner that executes a Beam Pipeline on a Flink cluster.
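
As a minimal sketch (FlinkRunner and FlinkPipelineOptions live in org.apache.beam.runners.flink), pointing a pipeline at this runner is just a matter of option wiring:

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);  // pipeline.run() will delegate to FlinkRunner.run(pipeline)
Pipeline pipeline = Pipeline.create(options);
// ... apply transforms, then submit:
pipeline.run();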

3.1. FlinkPipelineExecutionEnvironment

The Beam SDK exposes the same API for bounded and unbounded data, which is not the case in Flink. FlinkPipelineExecutionEnvironment hides that difference: internally it holds an org.apache.flink.api.java.ExecutionEnvironment for bounded data, i.e. batch processing, and an org.apache.flink.streaming.api.environment.StreamExecutionEnvironment for unbounded data, i.e. stream processing.

3.2. Translate Pipeline to Flink DataStream

Now let's look at the core method public PipelineResult run(Pipeline pipeline). The main logic is expressed in the lines below:

FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
env.translate(this, pipeline);
JobExecutionResult result = env.executePipeline();
...

public void translate(FlinkRunner flinkRunner, Pipeline pipeline)

The first step is to detect the mode of the pipeline, whether it is STREAMING or BATCH. This is quite straightforward, as the lines below show:

TranslationMode translationMode = optimizer.getTranslationMode();
...
if (options.isStreaming()) {
  return TranslationMode.STREAMING;
}

If the mode is TranslationMode.STREAMING, a FlinkStreamingPipelineTranslator is created; otherwise a FlinkBatchPipelineTranslator is created for TranslationMode.BATCH.

    if (translationMode == TranslationMode.STREAMING) {
      this.flinkStreamEnv = createStreamExecutionEnvironment();
      translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
    } else {
      this.flinkBatchEnv = createBatchExecutionEnvironment();
      translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
    }

The details of setting up the environment from FlinkPipelineOptions will be described later; let's focus on the pipeline translator first.

Take TranslationMode.STREAMING as an example. FlinkStreamingPipelineTranslator extends Pipeline.PipelineVisitor.Defaults and leverages traverseTopologically to perform the detailed transform work. The PipelineVisitor is key to understanding how a runner is implemented; I will cover it in another post soon.
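
To give a feel for the mechanism, here is a rough illustrative sketch of a visitor (not the actual FlinkStreamingPipelineTranslator source) that walks the pipeline with traverseTopologically; a real translator would map each transform to the corresponding Flink operator instead of just printing it:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;

class PrintingVisitor extends Pipeline.PipelineVisitor.Defaults {
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    // A real translator would emit a Flink DataStream operation here.
    System.out.println("translating: " + node.getFullName());
  }
}

// usage: pipeline.traverseTopologically(new PrintingVisitor());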

Submit to run

Now that the pipeline job is expressed as a Flink DataStream program, it can be submitted for execution with flinkEnv.execute(jobName);

3.3. Usage of FlinkPipelineOptions

All pipeline options can be found in the Javadoc of FlinkPipelineOptions. I'll highlight a few that are related to the implementation of the runner itself.

--streaming

As mentioned above, FlinkRunner has separate code paths for batch and streaming, so make sure you pick the right mode.
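
For example, the mode can be supplied on the command line or set programmatically (a small sketch; FlinkPipelineOptions extends the SDK's StreamingOptions, which provides the setter):

// --runner=FlinkRunner --streaming=true on the command line, or:
FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setStreaming(true);  // use the streaming translation path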

-d in bin/flink run

Usually I don't submit a Flink job as a standard Java application; instead, bin/flink run is used. By default it waits until the job finishes. For batch jobs that's fine, but in streaming mode it's highly recommended to add the -d option to the CLI command. With it, the job runs in detached mode and the command returns immediately.
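
For example (the jar name and entry class below are made up for illustration):

bin/flink run -d -c com.example.MyBeamJob target/my-beam-job.jar --runner=FlinkRunner --streaming=true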