(3) Flink on K8s: Writing Checkpoint State to a PV

July 16, 2023

1. Introduction

As most readers know, Flink is a stateful distributed stream processing engine: every function or operator in Flink can carry state. To make that state fault-tolerant, Flink provides the checkpoint mechanism. Checkpoints let Flink restore a job's state and its position in the input streams, giving the job the same semantics as a failure-free execution.

Checkpoint triggering and checkpoint state management are handled mainly by the JobManager, which has to persist the state files produced by each checkpoint to external storage. In an on-YARN deployment that storage is usually HDFS; in an on-Kubernetes deployment it is usually a PV, which lets Flink on K8s avoid depending on extra third-party components such as Hadoop.

2. Checkpoint Program Development

To demonstrate a checkpointed Flink program running on K8s, I wrote a small demo job that implements a WordCount.

In this example the upstream source is Kafka: a Kafka producer program writes lines of text to the corresponding topic. In real production environments Flink's source is usually Kafka as well.

The Flink checkpoint program reads the text stream from the Kafka topic, splits it into words, counts each word, and sinks the aggregated results to a MySQL database.
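The result table wc only needs the two columns used by the sink, word and cnt. A minimal definition could look like the following sketch (the column types and primary key are just an illustration; adjust them to your needs):

# Create the result table (schema is an assumption based on the sink's word/cnt columns)
mysql -h 172.16.252.113 -P 3306 -uroot -p flink_test -e "
CREATE TABLE IF NOT EXISTS wc (
  word VARCHAR(255) NOT NULL PRIMARY KEY,
  cnt  BIGINT       NOT NULL
);"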

We want the job to keep accumulating on top of the existing counts even after it is stopped and restarted. For example, if the word Hello has been seen 5 times before the restart, then after the job resumes consuming the Kafka stream, the first record containing Hello should raise the count to 6, not reset it to 1. To achieve this, the program enables checkpointing, so the JobManager periodically triggers checkpoints and persists the current counts, i.e. the checkpoint state.

1) Flink main program

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

import java.util.Arrays;
import java.util.Properties;

public class StreamWordCountWithCP {

    private static Logger logger = Logger.getLogger(StreamWordCountWithCP.class);
    public static void main(String[] args) throws Exception {
        logger.info("******************* StreamWordCountWithCP job start *******************");

        // kafka server
        String kafkaServer = "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092";
        // kafka topic
        String kafkaTopic = "flink_test";
        // MySQL host
        String dbHost = "172.16.252.113";
        // MySQL port
        String dbPort = "3306";
        // database name
        String dbName = "flink_test";
        // result table
        String table = "wc";
        // checkpoint storage path
        String checkpointPath = "file:///Users/willwang/flink-on-k8s-demo/checkpoint";
        // checkpoint interval in milliseconds, default 10s
        long interval = 10000;
        // parallelism
        int parallelism = 1;

        // Override the defaults with command-line arguments, if provided
        if (args != null && args.length == 9) {
            kafkaServer = args[0];
            kafkaTopic = args[1];
            dbHost = args[2];
            dbPort = args[3];
            dbName = args[4];
            table = args[5];
            checkpointPath = args[6];
            interval = Long.parseLong(args[7]);
            parallelism = Integer.parseInt(args[8]);

            logger.info("******************* kafkaServer=" + args[0] + ", " +
                    "kafkaTopic=" + args[1] + ", " +
                    "dbHost=" + args[2] + ", " +
                    "dbPort=" + args[3] + ", " +
                    "dbName=" + args[4] + ", " +
                    "table=" + args[5] + ", " +
                    "checkpointPath=" + args[6] + ", " +
                    "interval=" + args[7] + ", " +
                    "parallelism=" + args[8]);
        }

        // 0. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 1. Configure checkpointing
        env.enableCheckpointing(interval);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // retain externalized checkpoints when the job is cancelled, so state can be restored later
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.enableUnalignedCheckpoints();
        // checkpoint storage location, e.g. the PV mount path when running on K8s
        checkpointConfig.setCheckpointStorage(checkpointPath);

        // 2. Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaServer);
        properties.setProperty("group.id", "wc-consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // 3. Set up the Kafka source
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<>(
                kafkaTopic,
                new SimpleStringSchema(),
                properties
        ));

        // 4. Transform: split each line into words and map to (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = stream
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 5. Key by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);

        // 6. Sum the counts per word
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);

        // 7. Write the results to MySQL through a custom sink
        result.addSink(new MySQLSink(dbHost, dbPort, dbName, table));

        env.execute("StreamWordCountWithCP");
    }
}

2) MySQL sink class

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.log4j.Logger;

import java.sql.*;

public class MySQLSink extends RichSinkFunction<Tuple2<String, Long>> {
    private static Logger logger = Logger.getLogger(MySQLSink.class);
    private Connection conn;
    private PreparedStatement insertPS;
    private PreparedStatement updatePS;
    private String dbHost;
    private String dbPort;
    private String dbName;
    private String table;

    public MySQLSink(String dbHost, String dbPort, String dbName, String table) {
        this.dbHost = dbHost;
        this.dbPort = dbPort;
        this.dbName = dbName;
        this.table = table;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        String url = "jdbc:mysql://"+dbHost+":"+dbPort+"/" + dbName + "?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC";
        // establish the MySQL connection
        conn = DriverManager.getConnection(url, "root", "111...aaa");
        String sql = "INSERT INTO "+table+" (`word`, `cnt`) VALUES(?,?);";
        insertPS = conn.prepareStatement(sql);

        String sql2 = "UPDATE "+table+" set cnt = ? where word = ?;";
        updatePS = conn.prepareStatement(sql2);

        System.out.println("自定义sink,open数据库链接 =====");
        logger.info("*** 自定义sink,open数据库链接 =====");
    }

    @Override
    public void close() throws Exception {
        if(insertPS != null){
            insertPS.close();
        }
        if(updatePS != null){
            updatePS.close();
        }
        if(conn != null){
            conn.close();
        }
        System.out.println("自定义sink,close数据库连接 =====");
        logger.info("*** 自定义sink,close数据库连接 =====");
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // check whether the word already exists, then update or insert accordingly
        String sql = "select count(*) from "+table+" where word = '" + value.f0 + "'";
        System.out.println(sql);
        logger.info("*** " + sql);
        Statement statement = conn.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);
        long cnt = 0;
        while (resultSet.next()) {
            cnt = resultSet.getLong(1);
        }
        resultSet.close();
        statement.close();

        if (cnt > 0) {
            System.out.println("update value=" + value);
            updatePS.setLong(1,value.f1);
            updatePS.setString(2,value.f0);
            updatePS.executeUpdate();
        } else {
            System.out.println("insert value=" + value);
            insertPS.setString(1,value.f0);
            insertPS.setLong(2,value.f1);
            insertPS.executeUpdate();
        }
    }
}
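Before running the K8s tests below, the job has to be packaged as a fat jar and copied onto the storage that backs the flink-jar-pvc, so it can be mounted into the pods at /opt/flink/jar. Roughly, the steps look like this (the Maven invocation and the NFS path are only a sketch for my environment):

# Build the fat jar
mvn clean package -DskipTests
# Copy it to the NFS directory backing flink-jar-pvc (path is an example; use your own export)
cp target/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar /nfs/flink/jar/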

3. K8s Test Setup

1) Create a PVC to store the checkpoint data

[root@k8s-demo001 ~]# cat flink-checkpoint-application-pvc.yaml 
# PVC for persisting Flink checkpoints
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-checkpoint-application-pvc  # checkpoint PVC name
  namespace: flink   # namespace the PVC belongs to
spec:
  storageClassName: nfs-storage   # storage class name; change to your actual SC
  accessModes:
    - ReadWriteMany   # use the ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi    # storage size; adjust as needed
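Apply the manifest and make sure the PVC is bound before moving on, for example:

kubectl apply -f flink-checkpoint-application-pvc.yaml
kubectl get pvc flink-checkpoint-application-pvc -n flink   # STATUS should be Bound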

2) Run a test in Application mode

[root@k8s-demo001 ~]# cat application-deployment-checkpoint.yaml 
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment-checkpoint  # Flink cluster name
spec:
  image: flink:1.13.6  # Flink base image
  flinkVersion: v1_13  # Flink version 1.13
  imagePullPolicy: IfNotPresent  # pull the image only if it is not present locally
  ingress:   # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: file:///opt/flink/checkpoints
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar  # mount the jar stored on NFS
              mountPath: /opt/flink/jar
            - name: flink-checkpoints  # mount the checkpoint PVC
              mountPath: /opt/flink/checkpoints
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-application-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar # the jar is mounted from the PV
    entryClass: org.fblinux.StreamWordCountWithCP
    args:   # arguments passed to the job's main method
      - "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"
      - "flink_test"
      - "172.16.252.113"
      - "3306"
      - "flink_test"
      - "wc"
      - "file:///opt/flink/checkpoints"
      - "10000"
      - "1"
    parallelism: 1
    upgradeMode: stateless
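Submit the deployment and wait until the operator reports the job as running, for example:

kubectl apply -f application-deployment-checkpoint.yaml
kubectl get flinkdeployment -n flink   # wait until the job status becomes RUNNING
kubectl get pods -n flink              # the JobManager and TaskManager pods should be Running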

3) Open the Flink UI and verify that checkpoints are written to the configured path

http://flink.k8s.io/flink/application-deployment-checkpoint/#/job/453b1bd5f90756e7afe929c3e71f747b/checkpoints
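You can also cross-check directly on the PV. The operator normally names the JobManager Deployment after the FlinkDeployment, so something like the following should list the chk-N directories (adjust the resource name if yours differs):

kubectl exec -n flink deploy/application-deployment-checkpoint -- ls -R /opt/flink/checkpoints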


4) Send data to Kafka and verify that the Flink job writes the counts to MySQL
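Any producer will do for the test; the console producer that ships with Kafka is the simplest option (exact flags depend on your Kafka version). Each line you type is sent as one record to the topic:

kafka-console-producer.sh \
  --bootstrap-server 172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092 \
  --topic flink_test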


For an Application-mode job, stopping and restarting the job tears down the whole Application cluster: the JobManager and TaskManagers are released along with it, and the checkpoint state is effectively lost to the new run, because a resubmitted job writes its checkpoints under a new job-ID path. In that case you have to look at the PV to find the path of the last checkpoint and restore from it explicitly.
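To find that path, look at the storage backing flink-checkpoint-application-pvc, for example directly on the NFS server (the export layout below is an assumption; NFS provisioners typically create one directory per PVC):

# On the NFS server backing the checkpoint PVC (path is an example)
ls -lt /nfs/data/flink-flink-checkpoint-application-pvc-*/
# Under the most recent job ID directory, pick the highest chk-N, e.g.
#   4a81f02e391f129fa647121e4014226f/chk-70
# which corresponds to /opt/flink/checkpoints/4a81f02e391f129fa647121e4014226f/chk-70/ inside the container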


1) Stop the job

kubectl delete -f application-deployment-checkpoint.yaml 

2) Write the YAML for restarting the job

The manifest is largely the same as the one used for the first submission; the key difference is that initialSavepointPath is now specified so that the restarted job restores its state from the last checkpoint.

[root@k8s-demo001 ~]# cat application-deployment-checkpoint.yaml 
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: application-deployment-checkpoint  # Flink cluster name
spec:
  image: flink:1.13.6  # Flink base image
  flinkVersion: v1_13  # Flink version 1.13
  imagePullPolicy: IfNotPresent  # pull the image only if it is not present locally
  ingress:   # ingress config for accessing the Flink web UI
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: file:///opt/flink/checkpoints
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      hostAliases:
        - ip: "172.16.252.129"
          hostnames:
            - "kafka-01"
        - ip: "172.16.252.130"
          hostnames:
            - "kafka-02"
        - ip: "172.16.252.131"
          hostnames:
            - "kafka-03"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-jar  # mount the jar stored on NFS
              mountPath: /opt/flink/jar
            - name: flink-checkpoints  # mount the checkpoint PVC
              mountPath: /opt/flink/checkpoints
      volumes:
        - name: flink-jar
          persistentVolumeClaim:
            claimName: flink-jar-pvc
        - name: flink-checkpoints
          persistentVolumeClaim:
            claimName: flink-checkpoint-application-pvc
  job:
    jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar # the jar is mounted from the PV
    entryClass: org.fblinux.StreamWordCountWithCP
    args:   # arguments passed to the job's main method
      - "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"
      - "flink_test"
      - "172.16.252.113"
      - "3306"
      - "flink_test"
      - "wc"
      - "file:///opt/flink/checkpoints"
      - "10000"
      - "1"
    initialSavepointPath: /opt/flink/checkpoints/4a81f02e391f129fa647121e4014226f/chk-70/ # absolute path (inside the container) of the checkpoint to restore from
    parallelism: 1
    upgradeMode: stateless

Resubmit the job, write some more data to Kafka, and check the MySQL table: the counts continue from the state restored from the checkpoint.
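A quick way to verify this is to query the result table directly, for example:

# Counts should continue from the values contained in the restored checkpoint
mysql -h 172.16.252.113 -P 3306 -uroot -p -e "SELECT word, cnt FROM flink_test.wc ORDER BY cnt DESC;"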

 

 
