项目中有一个需求,将结构化数据采集到 Elasticsearch 和 Nebula Graph 中。由于已经有其他项目完成了相关的 Flink Sink 组件,但该项目不想单独部署一个 Flink 集群来执行数据采集。因此,直接通过构建 StreamExecutionEnvironment
并调用 executeAsync()
方法来实现。Flink 将在本地创建相应的环境并执行任务。以下是相关的代码(文档上说这样仅供测试使用,但是我不):
看起来这段代码似乎没有问题,任务执行完毕时会释放相应的资源。然而,在进行测试时,多次执行任务后出现了 Metaspace OOM 异常。通过测试用例也复现了该异常。
为了避免 Metaspace 太小导致来不及回收,测试用例给了 2g 的 MetaspaceSize:-XX:MetaspaceSize=2g -XX:MaxMetaspaceSize=2g
。
通过 VisualVM 监控 JUnitStarter 发现 Metaspace 不断增长,并未被清理。而且加载的 Classes 数量高达 268,288!
从错误日志来看,AkkaRpcService 通过反射加载了一些类,但 Metaspace 显然不够。至于为什么不够,目前还无法确定。但可以猜测这些通过反射加载的类没有被释放。
通过刚才的报错已经拿到了 HeapDump 文件,可以使用 Eclipse 的 MAT 工具来加载 Dump 文件进行分析。
由于已经有了明确的方向,直接选择查看重复加载的类。
测试用例大约执行了 152 次,然后出现了问题。可以看到一大片 count=151
的类,证明刚才的猜测是正确的。现在需要排查这些类为什么没有被释放。首先检查是否存在强引用这些类的 GC Root。
可以看到 ApplicationShutdownHooks
中的 IdentityHashMap
存储了 MemoryExecutionGraphInfoStore
,因此它们没有被清除。这里就不详细介绍 ApplicationShutdownHooks
。然后查看与 Flink MemoryExecutionGraphInfoStore
相关的源代码。
可以看到 MiniCluster
中创建了 MemoryExecutionGraphInfoStore
,Flink 在本地环境下执行任务时会创建 MiniCluster
。
在 MiniCluster
及其相关代码中没有找到调用 MemoryExecutionGraphInfoStore
的 close()
方法,但在 JobClusterEntrypoint
的父类 ClusterEntrypoint
中有明确的调用。
虽然还不清楚为什么 MiniCluster
不会调用 MemoryExecutionGraphInfoStore
的 close()
方法,但要解决 Metaspace OOM,肯定要从这里入手。
考虑到 ExecutionGraphInfoStore
经过了多层封装,无法直接调用 close()
方法。在深思熟虑后(我直接冲),决定使用危险的反射方法来获取私有变量。
至此,重跑测试用例,Metaspace OOM 没有再现。
最后,不建议在生产环境中使用 Flink 的本地执行方法!