- This post applies to both standalone Flink clusters and Flink on YARN; I haven't tested other deployment scenarios.
- A standalone Flink cluster uses log4j.properties by default, while Flink on YARN uses logback.xml.
- My environment runs Flink 1.3.2; the setup should be the same for all 1.3.* and 1.4.* versions.
I printed every result record in our staging cluster for debugging, and one day the cluster went down because it ran out of disk space, which came as quite a surprise. The culprit was Flink's logging configuration: the log files kept growing until they ate all the space.
By default[1], Flink is packaged with Log4j for logging, and the log4j.properties file looks like this:
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
Because org.apache.log4j.FileAppender is used, no rotation or retention ever happens. The same is true if you switch to Logback as described in [2].
Log rolling is quite important for a stable cluster, and the configuration is easy.
1. Rolling with Log4j
Log4j[3] supports rolling via org.apache.log4j.RollingFileAppender; here is an example for your reference:
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=true
# keep up to 4 backup files and limit each file to 100KB
log4j.appender.file.MaxFileSize=100KB
log4j.appender.file.MaxBackupIndex=4
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
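The MaxFileSize/MaxBackupIndex scheme (roll when the active file exceeds a size limit, keep a fixed number of numbered backups, discard the oldest) is not specific to Log4j. As a quick way to see the behavior without a Flink cluster, here is a minimal sketch using Python's analogous stdlib RotatingFileHandler; the file name and sizes are made up for illustration and scaled down so the demo finishes instantly:

```python
import glob
import logging
import logging.handlers
import os
import tempfile

log_dir = tempfile.mkdtemp()
log_file = os.path.join(log_dir, "flink.log")

# Roll at ~1 KB and keep up to 4 backups -- the same policy as
# MaxFileSize=100KB / MaxBackupIndex=4 above, just smaller.
handler = logging.handlers.RotatingFileHandler(
    log_file, maxBytes=1024, backupCount=4)
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)-5s %(name)s - %(message)s"))

logger = logging.getLogger("rolling-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Write far more than 5 KB of records so several rollovers happen.
for i in range(200):
    logger.info("record %d", i)
handler.close()

# Only the active file plus backups .1 .. .4 survive; older data is
# discarded, so disk usage stays bounded at roughly 5 * maxBytes.
files = sorted(glob.glob(log_file + "*"))
print(files)
```

Running it shows exactly five files (flink.log plus flink.log.1 through flink.log.4), each under the size limit, which is the disk-usage bound the Flink config above buys you.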
2. Rolling with Logback
Similarly, Logback[4] supports rolling via ch.qos.logback.core.rolling.RollingFileAppender; here is an example for your reference:
<configuration>
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.file}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.file}.%d{yyyy-MM-dd-HH}.%i</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100KB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- maxHistory and totalSizeCap are properties of the rolling
                 policy itself, not of the triggering policy -->
            <maxHistory>10</maxHistory>
            <totalSizeCap>2GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- This affects logging for both user code and Flink -->
    <root level="INFO">
        <appender-ref ref="file"/>
    </root>

    <!-- Uncomment this if you want to only change Flink's logging -->
    <!--<logger name="org.apache.flink" level="INFO">-->
        <!--<appender-ref ref="file"/>-->
    <!--</logger>-->

    <!-- The following lines keep the log level of common libraries/connectors on
         log level INFO. The root logger does not override this. You have to
         manually change the log levels here. -->
    <logger name="akka" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="file"/>
    </logger>

    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
    <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="file"/>
    </logger>
</configuration>
Here we use a size-and-time-based rolling policy: the total log size is capped at 2GB, each file is limited to 100KB, and maxHistory keeps at most 10 rollover periods of history (hours, given this pattern, not days). Pay attention to ${log.file}.%d{yyyy-MM-dd-HH}.%i: %d{yyyy-MM-dd-HH} means the file rolls over every hour, and within the same hour %i is incremented each time the current file exceeds 100KB.
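The time-based trigger also has a stdlib analogue. Purely to illustrate the hourly-roll, bounded-history idea (this is not Flink's or Logback's actual mechanism), here is a hedged Python sketch with TimedRotatingFileHandler: when="h" plays the role of %d{yyyy-MM-dd-HH}, and backupCount plays the role of maxHistory. Since waiting an hour is impractical in a demo, the rollover is forced by hand:

```python
import glob
import logging
import logging.handlers
import os
import tempfile

log_dir = tempfile.mkdtemp()
log_file = os.path.join(log_dir, "flink.log")

# Roll the file every hour and keep at most 10 rolled files -- roughly
# the %d{yyyy-MM-dd-HH} pattern plus maxHistory=10 from the Logback config.
handler = logging.handlers.TimedRotatingFileHandler(
    log_file, when="h", backupCount=10)
logger = logging.getLogger("timed-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("records from the first hour")
handler.doRollover()  # force a rollover instead of waiting a real hour
logger.info("records from the next hour")
handler.close()

# The rolled file carries a timestamp suffix, much like the %d part of
# Logback's fileNamePattern; the active file keeps the plain name.
files = sorted(glob.glob(log_file + "*"))
print(files)
```

After the forced rollover there are two files: the active flink.log and one timestamp-suffixed rolled file. Once more than backupCount rolled files accumulate, the oldest is deleted, which is the retention guarantee the Logback config provides.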
[1]. https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/logging.html
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
[3]. https://logging.apache.org/log4j/1.2/manual.html
[4]. https://logback.qos.ch/manual/appenders.html