Flink 输出算子

2023年 7月 14日 79.2k 0

摘要:本文主要介绍flink常用的输出算子,和自定义输出算子;可以输出到其他系统、DB或者文件。

简介

Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供
支持。

连接到外部系统

主要使用flink提供的sink算子,直接输出到外部系统,官方提供的nightlies.apache.org/flink/flink…

image.png

我们最常用的就是输出到消息队列或者数据库,按照官方例子输出即可。

输出到文件系统

pom.xml

新增连接器


    org.apache.flink
    flink-connector-files
    ${flink.version}

SinkFile

public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);

        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource dataGeneratorSource = new DataGeneratorSource(
                new GeneratorFunction() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // 输出到文件系统
        FileSink fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("flink-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

image.png

输出到kafka

pom.xml

新增连接器


    org.apache.flink
    flink-connector-kafka
    ${flink.version}

SinkKafkaWithKey

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());


        SingleOutputStreamOperator sensorDS = env
                .socketTextStream("192.168.13.110", 7777);


        /**
         * 如果要指定写入kafka的key
         * 可以自定义序列器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         *
         */
        KafkaSink kafkaSink = KafkaSink.builder()
                .setBootstrapServers("192.168.13.110:9092")
                .setRecordSerializer(

                        new KafkaRecordSerializationSchema() {

                            @Nullable
                            @Override
                            public ProducerRecord serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

输出到mysql

pom.xml


    mysql
    mysql-connector-java
    8.0.27




    org.apache.flink
    flink-connector-jdbc
    1.17

SinkToMysql

public class SinkToMysql {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator sensorDS = env
                .socketTextStream("192.168.137.100", 7777)
                .map(new MapFunction() {
                    @Override
                    public Student map(String value) throws Exception {
                        String[] v = value.split(",");
                        return new Student(v[0],v[1],Integer.parseInt(v[2]),Integer.parseInt(v[3]));
                    }
                });


        /**
         * 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Student waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getAge());
                        preparedStatement.setInt(3, waterSensor.getMoney());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.137.100:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("root")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );


        sensorDS.addSink(jdbcSink);


        env.execute();
    }

}

自定义输出

在复杂的业务系统中使用比较好

SinkToCustom

public class SinkToCustom {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator sensorDS = env
                .socketTextStream("192.168.137.100", 7777);


        sensorDS.addSink(new MySink());


        env.execute();
    }

    public static class MySink extends RichSinkFunction {

        Connection conn = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("初始化一次");
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("清理销毁操作");
        }

        /**
         * sink的核心逻辑,写出的逻辑就写在这个方法里
         * @param value
         * @param context
         * @throws Exception
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println("每一条的业务逻辑,输出"+value);
        }
    }
}

image.png

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论