Flink on k8s容器日志生成原理及与Yarn部署时的日志生成模式对比

Flink on k8s容器日志生成原理及与Yarn部署时的日志生成模式对比

最近需要将flink由原先部署到Yarn集群切换到kubernetes集群,在切换之后需要熟悉flink on k8s的运行模式。在使用过程中针对日志模块发现,在k8s的容器中,flink的系统日志只有jobmanager.log/taskmanager.log 两个,而当时在使用Yarn集群部署时,flink的日志会有多个,比如:jobmanager.log、jobmanager.err和jobmanager.out,TaskManager同理。

因此,有同事就提出为什么在k8s中部署时,只有.log一个文件,能不能类似Yarn部署时那样对日志文件进行区分。只是从容器日志来看的话,在一开始不够了解k8s的情况下,会觉得日志收集的不够准确。

因此针对上面的这个问题,就归我进行研究和解决了。网上的相关资料也比较少,因此,在本次对上面这个问题整体了解分析之后,进行一次学习记录。有遇到相关类似问题的,也可以参考这个思路。

一、认为需要修改log4j配置即可

拿到这个问题的第一步首先想到的是,既然要对日志的类别进行区分,则可以修改log4j的配置,将INFO类别和ERROR类别分别写入不同的日志文件即可。于是,先对flink路径下的conf/log4j-console.properties进行修改(flink on k8s部署时,使用的log4j配置文件是flink-console.properties文件,而不是log4j.properties)。

这里我们留下一个小疑问:为什么部署到k8s中时,使用的是log4j-console.properties,而不是部署到Yarn时的log4j.properties?有什么区别?

修改后的log4j-console.properties示例如下所示:

################################################################################
1.  Licensed to the Apache Software Foundation (ASF) under one
1.  or more contributor license agreements.  See the NOTICE file
1.  distributed with this work for additional information
1.  regarding copyright ownership.  The ASF licenses this file
1.  to you under the Apache License, Version 2.0 (the
1.  "License"); you may not use this file except in compliance
1.  with the License.  You may obtain a copy of the License at
1. 1.      http://www.apache.org/licenses/LICENSE-2.0
1. 1.  Unless required by applicable law or agreed to in writing, software
1.  distributed under the License is distributed on an "AS IS" BASIS,
1.  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1.  See the License for the specific language governing permissions and
1. limitations under the License.
################################################################################

1. Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30

1. This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
rootLogger.appenderRef.errorLogFile.ref = errorLogFile

1. Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

1. The following lines keep the log level of common libraries/connectors on
1. log level INFO. The root logger does not override this. You have to manually
1. change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

1. Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

1. Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = true
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
appender.rolling.filter.threshold.type = LevelMatchFilter
appender.rolling.filter.threshold.level = INFO
appender.rolling.filter.threshold.onMatch = ACCEPT
appender.rolling.filter.threshold.onMisMatch = DENY

appender.errorFile.name = errorLogFile
appender.errorFile.type = RollingFile
appender.errorFile.append = true
appender.errorFile.fileName = ${sys:log.file}.err
appender.errorFile.filePattern = ${sys:log.file}.err.%i
appender.errorFile.layout.type = PatternLayout
appender.errorFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.errorFile.policies.type = Policies
appender.errorFile.policies.size.type = SizeBasedTriggeringPolicy
appender.errorFile.policies.size.size = 100MB
appender.errorFile.policies.startup.type = OnStartupTriggeringPolicy
appender.errorFile.strategy.type = DefaultRolloverStrategy
appender.errorFile.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
appender.errorFile.filter.threshold.type = ThresholdFilter
appender.errorFile.filter.threshold.level = ERROR
appender.errorFile.filter.threshold.onMatch = ACCEPT
appender.errorFile.filter.threshold.onMisMatch = DENY

1. Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

这里相比原始文件的修改,主要集中在以下两个方面:

  • 增加RollingFileAppender的threshold参数。因为我最初希望.log日志就只显示INFO日志,而不显示其他类型日志。但是在log4j官网中介绍的threshold.level参数,其过滤的是低于设置类型的日志。比如当threshold.level=INFO时,过滤的是低于INFO类型的日志,比如DEBUG,而高于的比如ERROR类型的日志还是会保留。在查找一番资料后,发现了threshold.type = LevelMatchFilter的配置,这种配置可以使得当前appender只保留所设置的日志类型的日志,从而实现了只保留INFO日志的需求。
  • 增加了errorLogFile的appender。配置同上,使得当前appender只保留ERROR类型的日志数据。
  • 观察上面的log4j-console.properties配置可以发现,在设置文件名时,使用了一个系统变量${sys:log.file},这个系统变量使用过flink配置的应该都比较熟悉,指定本地flink日志的默认路径,比如/opt/log/jobmanager.log。

    经过测试后,使用上面的log4j配置能够实现我最初的想法,即将INFO日志和ERROR日志区分开,写入不同的文件。但是经过与Yarn部署时的文件对比发现,实际上并不能满足原始需求。

    因为在Yarn中,.log日志中也会存在ERROR日志类型的数据,似乎并不是利用log4j配置进行分开。而且我查看log4j.properties配置,也没有发现类似这种区分日志类型的配置。同时在Yarn中,.err日志输出的是任务异常信息,比如e.printStackTrace(),.out日志输出的是类似System.out.println中的数据。而log4j的配置实际上单纯的只是针对flink执行时的系统日志进行配置处理,似乎跟上面的场景还不是一样的。

    因此,就要去寻找新的思路,在摸索之后,决定从根据这个log.file的系统变量,从flink的源码入手

    二、Flink源码分析-Yarn

    在本地git clone好flink的源码后,切换到flink1.12版本分支,进行全局搜索"log.file",在flink-runtime模块下发现了BootstrapTools类,在该类下,有一个getTaskManagerShellCommand的方法,在方法中,有一处代码非常有用,如下所示:

    startCommandValues.put(
                    "redirects",
                    "1> "
                     + logDirectory
                     + "/taskmanager.out "
                     + "2> "
                     + logDirectory
                     + "/taskmanager.err"
               );
    

    可以看到,这里不就是我们最初想要生成的.out和.err文件吗!!。那么这里的redirects表示什么意思呢?

    观察后源码知道,flink设置了一个启动命令行的template模块,有一个redirects的占位符,因此上面实际上就是后续将重定向命令替换redirects占位符。

    接下来看一下这个方法在哪里被调用了,发现除了在BootstrarpToolsTest测试类中被调用外,只在flink-yarn项目下src/main/java/org/apache/flink/yarn/Utils.java类中被使用,如下所示:

    String launchCommand =
                    BootstrapTools.getTaskManagerShellCommand(
                            flinkConfig,
                            tmParams,
                            ".",
                            ApplicationConstants.LOG_DIR_EXPANSION_VAR,
                            hasLogback,
                            hasLog4j,
                            hasKrb5,
                            taskManagerMainClass,
                            taskManagerDynamicProperties);
    
            if (log.isDebugEnabled()) {
                log.debug("Starting TaskManagers with command: " + launchCommand);
            } else {
                log.info("Starting TaskManagers");
            }
    

    因此,当部署到Yarn集群上上时,在构建TaskManager的启动命令时,会使用上述的方法。同时,上面的代码发现,当满足log.isDebugEnabled()条件时,可以打印出这个启动命令。如何能满足这个条件呢?实际上,log.isDebugEnabled()就是表示当前log4j的配置是允许打印DEBUG类型日志的,因此,我们去到flink的conf/log4j.properties下,修改rootLogger.level = INFO => rootLogger.level = DEBUG,然后再重新运行任务,即可在日志中看到这个启动命令:

    在这里插入图片描述

    可以看到,在启动命令的最后位置,有上面代码中的重定向命令,这个重定向命令将标准输出和标准错误分别重定向到了.out和.err文件。

    至此,我们就成功定位了在Yarn中为什么能够生成.err和.out日志的原因了。实际上就是由于这样的一条重定向语句,将flink任务执行时的标准输出和标准错误分别重定向到了.out和.err文件。这也解释了为什么在Yarn部署时,.err日志里显示的是异常信息,比如e.printStackTrace(),.out文件输出的是包括System.out的日志数据

    弄明白了Yarn的日志生成机制后,我们接下来去看一下k8s是怎么实现的?

    三、Flink源码分析-Kubernetes

    那么在k8s部署时,是否也有这样的重定向语句呢?为了一探究竟,仍然是分析flink 1.12版本的源码。在flink-kubernetes项目下,有一个src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java类,在该类下,存在一个getCommonStartCommand方法,该方法类似于上面getTaskManagerShellCommand方法,也是用来构造启动命令参数的,但是在该方法下,我发现就不存在这样的一条重定向语句:

    startCommandValues.put(
                    "logging",
                    getLogging(
                            logDirectory + "/" + logFileName + ".log",
                            configDirectory,
                            hasLogback,
                            hasLog4j));
    

    只有这样的一个写入.log文件的启动命令配置。同时遗憾的是,在k8s部署时,也没有类似上面Yarn那样可以在DEBUG日志类型下,打印出启动命令的语句。但是,我们仍然能做出一个初步的结论:

    flink部署在k8s上时,没有.out和.err文件,就是由于源码中在启动TaskManager/JobManager的启动命令参数中,没有将标准输出和标准错误进行重定向到指定的.out和.err文件导致的。而生成的.log文件,就是在log4j-console.properties中配置的RollingFile滚动的系统日志。

    同时我发现,在flink1.11版本时,上面的方法中还保留着跟Yarn一样的重定向语句,只是从1.12版本之后,就去掉了该重定向语句,是什么原因呢?

    至此,我们找出了flink部署到k8s中时,只有一个.log文件的根源。接下来,为了解决最初的原始问题,需要向方案去解决。

    四、设计解决方案

    首先想到的解决方案,肯定就是将Yarn那里的重定向源码复制一份到上面的k8s代码中,然后重新打包Flink再进行部署。但这种方案尝试之后发现,在用maven打包flink时会出现很多异常,比如包找不到。而且flink有180多个pom要打包,时间应该会花费非常长,在本次需求对flink源码改动要求不是很大的情况下,感觉这种调试会花费太多不必要的时间。遂舍弃改方案。

    另一个方案,就是想办法在外层,能不能在将flink打包成镜像的时候,在它原先源码中定义的启动命令参数后,再手动添加上重定向命令。为此,观察pod的yaml可以发现,容器启动的参数有args下,启动命令时执行/docker-entrypoint.sh脚本

    在这里插入图片描述

    有了这些信息后,就找到docker-entrypoint.sh的启动脚本,打开后进行分析,通过日志可以知道,脚本执行的过程中,会进入到下面的这个分支下:

    在这里插入图片描述

    其中args参数就是上面容器中的args参数,可以看到原先这个分支的最后一行是去执行exec $(drop_privs_cmd) bash -c "${args[@]}"。因此,我们就可以在这里,手动添加上标准输出和标准错误的重定向到指定文件,也相当于实现了在启动参数中加入重定向语句。

    这里我们还需要借助args参数中的-Dlog.file中显示的是jobmanager还是taskmanager来决定重定向写入的文件名是jobmanager.err还是taskmanager.err。为此使用sed命令,先获取到args中的-Dlog.file内容(即上面的参数logFilePath),然后从logFilePath中,获取到jobmanager/taskmanager的文件名(即logFileName参数)。

    然后,我们添加上重定向命令:

    exec $(drop_privs_cmd) bash -c "${args[@]} 1> /opt/flink/log/${logFileName}.out 2> /opt/flink/log/${logFileName}.err
    

    至此,我们就成功在外层flink打包成镜像时,手动在启动命令参数后添加了重定向命令,模拟了Yarn执行时的命令,来生成.err和.out文件。接下来,就是打包成镜像,然后在k8s中进行测试了。经过测试,我们发现,在/opt/log/路径下,真的生成了.out、.err和.log三个文件!!!

    在这里插入图片描述

    同时经过测试可以发现,.err、.out和.log文件分别对应了标准错误、标准输出和系统文件三部分内容。实现了跟部署在Yarn上时一样的场景,解决了我们文章最初提出的问题!!!

    然而。。却出现了问题。

    五、问题以及对k8s日志的理解

    在完成上述测试之后,当我再次点击pod,或者使用kubectl logs命令来查看日志时发现,日志里竟然只有启动脚本的一些日志,而flink执行的系统日志都没有了!!

    没办法,只能再去分析原因了。在kubernetes的官网中,在日志架构章节中,赫然写着如下一段话:

    容器运行时对写入到容器化应用程序的 stdoutstderr 流的所有输出进行处理和转发。 不同的容器运行时以不同的方式实现这一点;不过它们与 kubelet 的集成都被标准化为 CRI 日志格式。

    默认情况下,如果容器重新启动,kubelet 会保留一个终止的容器及其日志。 如果一个 Pod 被逐出节点,所对应的所有容器及其日志也会被逐出。

    kubelet 通过 Kubernetes API 的特殊功能将日志提供给客户端访问。 访问这个日志的常用方法是运行 kubectl logs

    虽然我现在对k8s的理解也不够,但看上面这段话让我意识到,容器的日志收集或许也是通过监听stdout和stderr来生成的。。。 而由于我上面使用重定向命令,将标准输出和标准错误都重定向到了指定的文件中,导致stdout和stderr无法监听到日志数据,所以容器内的日志就获取不到了。

    或者说,利用上面将标准输出和标准错误重定向写入指定指定文件的方式,是相当于将原先容器里的日志,分别根据日志类型映射到了.err、.out和.log日志文件下来展示。

    那这样分析下来,我发现,flink之所以在1.12版本之后将重定向命令从源码中去掉,可能为的就是利用k8s的日志聚合,将stdout和stderr都写入容器日志中,方便后续对容器日志的监控和分析等操作。

    嗯。。此时,感觉上面最开始的分析都白费了,因为本身容器的日志实际上就已经包含了所有日志数据了,根本不用再做.out和.err的区分了

    这里插一句,还记得文章在第一部分提出的问题吗?这里,大家再思考另一个问题,就是讲到这里,我们知道容器会对stdout和stderr流进行处理和转发。stderr包含flink任务执行时的异常信息,stdout包含任务执行时的标准输出信息,那么flink执行时的系统日志比如INFO、ERROR日志数据,容器时从哪里获取到的呢?log4j中配置的RollingFile类型的appender可不属于标准输出。

    那么这个问题的答案,也就是flink提交到k8s部署时,为什么使用的是log4j-console.properties配置的原因了。

    因为在log4j-console.properties中,会有一个ConsoleAppender的配置,将flink的系统日志打印到CONSOLE(System.out),所以相当于将系统日志打印到了标准输出,然后容器再通过监听stdout从而获取到系统日志。

    而部署到Yarn时,使用的log4j.properties的配置中,就可以看到并没有ConsoleAppender的配置,所以它的系统日志全部打印到了.log文件中。

    解决了这个问题,再说回之前的分析。我们上面添加的重定向操作,相当于是模仿着Yarn上部署的方式,将原先容器里的日志,分别根据日志类型映射到了.err、.out和.log日志文件下来展示。但是此时容器中的日志却丢失了,可能会对后续我们最容器上的日志采集和分析有影响。

    那有没有什么解决方案呢?

    双写。尝试在将标准输出和标准错误重定向到指定文件时,同时重定向到stdout和stderr。为此,我们进行了测试,也就是docker-entrypoint.sh中的下面这行代码:

    exec $(drop_privs_cmd) bash -c "${args[@]} 1> >(tee /opt/flink/log/${logFileName}.out >/dev/stdout) 2> >(tee /opt/flink/log/${logFileName}.err >/dev/stderr)"
    

    命令中的1> >(tee /opt/flink/log/${logFileName}.out >/dev/stdout)表示将标准输出重定向到一个匿名的管道中,并将管道中的内容通过tee命令同时输出到文件/opt/flink/log/${logFileName}.out和标准输出设备中。

    经过测试,可以实现上面的功能,即既有.out和.err文件,同时,容器日志也恢复最初的状态。

    但是需要说明一点的是,由于log4j-console.properties配置把系统日志也作为标准输出的一部分,因此生成的.out文件中实际上包含了任务中System.out的输出和系统文件两部分内容。而.err文件则只包含了标准错误的日志内容。

    至此,实现的日志效果是:

    • 容器日志:包含系统日志、标准输出、标准错误
    • .out日志:包含系统日志、标准输出
    • .err日志:包含标准错误

    以上就是本次,针对最初提出的k8s日志问题,进行的一次深入探究和思考。在研究过程中,对log4j的日志配置也有了更深入的理解,由于一开始对容器和k8s技术的不了解,导致最后似乎实现的结果不理想,但技术不就是不断探究的过程吗!

    关于上面的问题,如果有遇到类似的也欢迎找我探讨,感谢阅读!