1. Introduction
As is well known, Flink is a stateful distributed stream processing engine: every function or operator in Flink can hold state. To make this state fault-tolerant, Flink introduces the checkpoint mechanism. Checkpoints allow Flink to restore a job's state and position, giving the job the same semantics as a failure-free execution.
Checkpoint triggering and state management are mainly handled by the JobManager, which saves the state files produced by each checkpoint to external storage. In an On-YARN deployment this storage is usually HDFS; in an On-Kubernetes deployment it is usually a PersistentVolume (PV), which lets Flink on K8s reduce its dependence on third-party components such as Hadoop.
2. Checkpoint Program Development
To demonstrate running a Flink checkpoint program on K8s, we developed a new demo program that implements a WordCount.
In this example the upstream source is Kafka: a Kafka producer sends text lines to the corresponding topic. In real production environments, Flink's source is usually Kafka as well.
The Flink checkpoint program reads the text stream from the Kafka topic, splits it into words, counts them, and sinks the results to a MySQL database.
We want the job to keep accumulating on top of the existing counts even after it is stopped and restarted. For example, if the word Hello had been counted 5 times before the restart, then when the restarted job consumes the first Kafka record containing Hello, the count should become 6 rather than 1. To achieve this, the program enables checkpointing, so that the JobManager periodically triggers checkpoints and persists the current counting results, i.e. the checkpoint state.
1. Flink main program
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

import java.util.Arrays;
import java.util.Properties;

public class StreamWordCountWithCP {

    private static Logger logger = Logger.getLogger(StreamWordCountWithCP.class);

    public static void main(String[] args) throws Exception {
        logger.info("******************* StreamWordCountWithCP job start *******************");
        // Kafka brokers
        String kafkaServer = "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092";
        // Kafka topic
        String kafkaTopic = "flink_test";
        // MySQL host
        String dbHost = "172.16.252.113";
        // MySQL port
        String dbPort = "3306";
        // database name
        String dbName = "flink_test";
        // result table
        String table = "wc";
        // checkpoint storage path
        String checkpointPath = "file:///Users/willwang/flink-on-k8s-demo/checkpoint";
        // checkpoint interval, default 10s
        long interval = 10000;
        // parallelism
        int parallelism = 1;
        // override the defaults with command-line arguments
        if (args != null && args.length == 9) {
            kafkaServer = args[0];
            kafkaTopic = args[1];
            dbHost = args[2];
            dbPort = args[3];
            dbName = args[4];
            table = args[5];
            checkpointPath = args[6];
            interval = Long.parseLong(args[7]);
            parallelism = Integer.parseInt(args[8]);
            logger.info("******************* kafkaServer=" + args[0] + ", "
                    + "kafkaTopic=" + args[1] + ", "
                    + "dbHost=" + args[2] + ", "
                    + "dbPort=" + args[3] + ", "
                    + "dbName=" + args[4] + ", "
                    + "table=" + args[5] + ", "
                    + "checkpointPath=" + args[6] + ", "
                    + "interval=" + args[7] + ", "
                    + "parallelism=" + args[8]);
        }

        // 0. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 1. Configure checkpointing
        env.enableCheckpointing(interval);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.enableUnalignedCheckpoints();
        checkpointConfig.setCheckpointStorage(checkpointPath);

        // 2. Configure the Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaServer);
        properties.setProperty("group.id", "wc-consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // 3. Kafka source
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<>(
                kafkaTopic,
                new SimpleStringSchema(),
                properties
        ));

        // 4. Transform each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = stream
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 5. Key by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);

        // 6. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);

        // 7. Custom sink: write the results to MySQL
        result.addSink(new MySQLSink(dbHost, dbPort, dbName, table));

        env.execute("StreamWordCountWithCP");
    }
}
2. MySQL sink utility class
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.log4j.Logger;

import java.sql.*;

public class MySQLSink extends RichSinkFunction<Tuple2<String, Long>> {

    private static Logger logger = Logger.getLogger(MySQLSink.class);

    private Connection conn;
    private PreparedStatement insertPS;
    private PreparedStatement updatePS;
    private String dbHost;
    private String dbPort;
    private String dbName;
    private String table;

    public MySQLSink(String dbHost, String dbPort, String dbName, String table) {
        this.dbHost = dbHost;
        this.dbPort = dbPort;
        this.dbName = dbName;
        this.table = table;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        String url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName
                + "?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000"
                + "&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC";
        // open the MySQL connection and prepare the insert/update statements
        conn = DriverManager.getConnection(url, "root", "111...aaa");
        String sql = "INSERT INTO " + table + " (`word`, `cnt`) VALUES(?,?);";
        insertPS = conn.prepareStatement(sql);
        String sql2 = "UPDATE " + table + " set cnt = ? where word = ?;";
        updatePS = conn.prepareStatement(sql2);
        System.out.println("Custom sink: opened database connection =====");
        logger.info("*** Custom sink: opened database connection =====");
    }

    @Override
    public void close() throws Exception {
        if (insertPS != null) {
            insertPS.close();
        }
        if (updatePS != null) {
            updatePS.close();
        }
        if (conn != null) {
            conn.close();
        }
        System.out.println("Custom sink: closed database connection =====");
        logger.info("*** Custom sink: closed database connection =====");
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // check whether this word already has a row in the result table
        String sql = "select count(*) from " + table + " where word = '" + value.f0 + "'";
        System.out.println(sql);
        logger.info("*** " + sql);
        Statement statement = conn.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);
        long cnt = 0;
        while (resultSet.next()) {
            cnt = resultSet.getLong(1);
        }
        resultSet.close();
        statement.close();
        if (cnt > 0) {
            // word already present: overwrite its count with the latest aggregate
            System.out.println("update value=" + value);
            updatePS.setLong(1, value.f1);
            updatePS.setString(2, value.f0);
            updatePS.executeUpdate();
        } else {
            // first time this word is seen: insert a new row
            System.out.println("insert value=" + value);
            insertPS.setString(1, value.f0);
            insertPS.setLong(2, value.f1);
            insertPS.executeUpdate();
        }
    }
}
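Note that the sink does not create the result table: it assumes a table with a `word` column and a `cnt` column already exists in the target database. The article does not show the table DDL, so the following is only a sketch of a compatible definition (the column types and the primary key on `word` are assumptions):

# create the result table assumed by MySQLSink (column types and primary key are assumptions)
mysql -h 172.16.252.113 -P 3306 -u root -p flink_test -e "
  CREATE TABLE IF NOT EXISTS wc (
    word VARCHAR(255) NOT NULL PRIMARY KEY,
    cnt  BIGINT       NOT NULL
  );"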
3. K8s Test Configuration
1. Create a checkpoint PVC to store the checkpoint data
[root@k8s-demo001 ~]# cat flink-checkpoint-application-pvc.yaml
# PVC for persisting Flink checkpoint data
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-checkpoint-application-pvc   # checkpoint PVC name
  namespace: flink                         # namespace the PVC belongs to
spec:
  storageClassName: nfs-storage   # StorageClass name, change to your actual SC
  accessModes:
    - ReadWriteMany               # use the ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi                # storage capacity, adjust as needed
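The PVC is created with a plain kubectl apply and can then be checked until it is bound; a minimal sketch (the file name matches the cat command above):

kubectl apply -f flink-checkpoint-application-pvc.yaml
kubectl -n flink get pvc flink-checkpoint-application-pvc   # STATUS should become Bound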
2. Test with an Application-mode deployment
[root@k8s-demo001 ~]# cat application-deployment-checkpoint.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment-checkpoint   # Flink cluster name
spec:
  image: flink:1.13.6             # Flink base image
  flinkVersion: v1_13             # Flink version, here 1.13
  imagePullPolicy: IfNotPresent   # pull the image only if it is not present locally
  ingress:                        # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: file:///opt/flink/checkpoints
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar           # mount the job jar from NFS
              mountPath: /opt/flink/jar
            - name: flink-checkpoints   # mount the checkpoint PVC
              mountPath: /opt/flink/checkpoints
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-application-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar   # job jar mounted via PV
    entryClass: org.fblinux.StreamWordCountWithCP
    args:   # arguments passed to the job's main method
      - "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"
      - "flink_test"
      - "172.16.252.113"
      - "3306"
      - "flink_test"
      - "wc"
      - "file:///opt/flink/checkpoints"
      - "10000"
      - "1"
    parallelism: 1
    upgradeMode: stateless
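Submitting the job is again a kubectl apply of the manifest, after which you can watch the JobManager and TaskManager pods come up; a minimal sketch:

kubectl apply -f application-deployment-checkpoint.yaml
kubectl -n flink get pods -w   # wait until the JobManager and TaskManager pods are Running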
3. Open the Flink UI and verify that checkpoints are indeed written to the configured path:
http://flink.k8s.io/flink/application-deployment-checkpoint/#/job/453b1bd5f90756e7afe929c3e71f747b/checkpoints
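Besides the web UI, you can confirm that the checkpoint files actually land on the PVC by listing the mount path inside a running pod; a sketch, where <jobmanager-pod-name> is a placeholder for the pod name reported by kubectl get pods, and the job ID directory will differ in your run:

# list checkpoint directories on the mounted PVC
kubectl -n flink exec -it <jobmanager-pod-name> -- ls -l /opt/flink/checkpoints
# each job writes into a directory named after its job ID, containing chk-N subdirectories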
4. Send data to Kafka and verify that the Flink job writes the results to MySQL
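To generate test data you can use the Kafka console producer against the brokers and topic configured in the job args, and then query the result table; a sketch, assuming the Kafka CLI tools are available on the host (older Kafka versions use --broker-list instead of --bootstrap-server):

# send a few test lines to the topic
kafka-console-producer.sh --bootstrap-server 172.16.252.129:9092 --topic flink_test

# check the aggregated counts in MySQL
mysql -h 172.16.252.113 -P 3306 -u root -p -e "SELECT * FROM flink_test.wc;"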
For an Application-mode job, stopping and restarting the job also shuts down the whole Application cluster: the JobManager and TaskManager pods are released. The job's checkpoint association is effectively lost, because the restarted job uses a new checkpoint path; at this point you need to look on the PV for the path of the last completed checkpoint and pass it to the new job.
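One way to find the last checkpoint path is to trace the PVC to its backing volume and list the checkpoint directory on the storage backend; a sketch, where <volume-name>, <nfs-export-path> and <job-id> are placeholders that depend on your environment:

# find the PV behind the checkpoint PVC and its NFS export path
kubectl -n flink get pvc flink-checkpoint-application-pvc -o jsonpath='{.spec.volumeName}{"\n"}'
kubectl get pv <volume-name> -o yaml | grep -A 3 nfs

# on the NFS server, list the job's checkpoint directories and pick the newest chk-N
ls -lt <nfs-export-path>/<job-id>/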
1. Stop the job
kubectl delete -f application-deployment-checkpoint.yaml
2. Write the YAML for restarting the job
The manifest is the same as the one used for the first submission; the only difference is that the restarted job specifies initialSavepointPath, pointing at the last checkpoint.
[root@k8s-demo001 ~]# cat application-deployment-checkpoint.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment-checkpoint   # Flink cluster name
spec:
  image: flink:1.13.6             # Flink base image
  flinkVersion: v1_13             # Flink version, here 1.13
  imagePullPolicy: IfNotPresent   # pull the image only if it is not present locally
  ingress:                        # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: file:///opt/flink/checkpoints
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      hostAliases:
        - ip: "172.16.252.129"
          hostnames:
            - "kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "kafka-03"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar           # mount the job jar from NFS
              mountPath: /opt/flink/jar
            - name: flink-checkpoints   # mount the checkpoint PVC
              mountPath: /opt/flink/checkpoints
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-application-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar   # job jar mounted via PV
    entryClass: org.fblinux.StreamWordCountWithCP
    args:   # arguments passed to the job's main method
      - "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"
      - "flink_test"
      - "172.16.252.113"
      - "3306"
      - "flink_test"
      - "wc"
      - "file:///opt/flink/checkpoints"
      - "10000"
      - "1"
    initialSavepointPath: /opt/flink/checkpoints/4a81f02e391f129fa647121e4014226f/chk-70/   # absolute path of the checkpoint to restore from
    parallelism: 1
    upgradeMode: stateless
Resubmit the job, write data to Kafka again, and check the MySQL table: you will see that the counts continue from the state restored from the checkpoint.
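The resubmission is once more a kubectl apply of the updated manifest, followed by a quick check of the result table; a minimal sketch:

kubectl apply -f application-deployment-checkpoint.yaml
# after sending new lines containing Hello to the flink_test topic, the count for Hello
# should continue from the restored value rather than restart at 1
mysql -h 172.16.252.113 -P 3306 -u root -p -e "SELECT word, cnt FROM flink_test.wc;"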