Flink之源算子Data Source

2023年 10月 15日 74.7k 0

概述

Flink中的DataSource(数据源)用于定义数据输入的来源。数据源是Flink作业的起点,它可以从各种数据来源获取数据,例如文件系统、消息队列、数据库等。

将数据源添加到Flink执行环境中,从而创建一个数据流。然后可以对该数据流应用一系列转换和操作,例如过滤、转换、聚合、计算等。最后将结果写入其他系统,例如文件系统、数据库、消息队列等。

数据源是Flink作业中非常重要的组件,它确定了数据的来源和初始输入,是构建流处理和批处理作业的基础。

内置Data Source

Flink Data Source用于定义Flink程序的数据来源,Flink官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流

基于集合构建

可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,比如采用集合类型。一般用来进行本地调试或者验证。

fromCollection(Collection):基于集合构建,集合中的所有元素必须是同一类型

fromElements(T ...): 基于元素构建,所有元素必须是同一类型

generateSequence(from, to):基于给定的序列区间进行构建

fromCollection(Iterator, Class):基于迭代器进行构建。第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //  基于元素构建
        DataStreamSource source1 = env.fromElements(1, 2, 3, 4);
        // 基于集合构建
        DataStreamSource source2 = env.fromCollection(Arrays.asList(1, 2, 3, 4));
		// 基于给定的序列区间进行构建
		env.generateSequence(0,100);
		// 基于迭代器进行构建
		env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();

        source1.print();
        
        env.execute();
    }

自定义的迭代器CustomIterator,产生 1 到 100 区间内的数据

注意: 自定义迭代器要实现Iterator接口外,还必须要实现序列化接口Serializable ,否则会抛出序列化失败的异常

import java.io.Serializable;
import java.util.Iterator;

public class CustomIterator implements Iterator, Serializable {
    private Integer i = 0;

    @Override
    public boolean hasNext() {
        return i < 100;
    }

    @Override
    public Integer next() {
        i++;
        return i;
    }
}

基于文件构建

在本地环境进行测试时可以方便地从本地文件读取数据

readTextFile(path):按照TextInputFormat 格式读取文本文件,并将其内容以字符串的形式返回。示例如下:

readFile(fileInputFormat, path) :按照指定格式读取文件。

readFile(inputFormat, filePath, watchType, interval, typeInformation):按照指定格式周期性的读取文件。

各个参数含义:

inputFormat:数据流的输入格式

filePath:文件路径,可以是本地文件系统上的路径,也可以是HDFS上的文件路径

watchType:读取方式,两个可选值: 
	1.FileProcessingMode.PROCESS_ONCE: 表示对指定路径上的数据只读取一次,然后退出
	2.FileProcessingMode.PROCESS_CONTINUOUSLY: 表示对路径进行定期地扫描和读取。注意:当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破Flink的exactly-once语义


interval:定期扫描的时间间隔

typeInformation:输入流中元素的类型
    public static void main(String[] args) throws Exception {
        String filePath = "data/test.text";
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                // 读取文本文件,并将其内容以字符串的形式返回
                .readTextFile(filePath)
                .print();

        env.readFile(new TextInputFormat(new Path(filePath)), filePath, FileProcessingMode.PROCESS_ONCE, 1, BasicTypeInfo.STRING_TYPE_INFO).print();

        env.execute();
    }

基于Socket构建

通过监听Socket端口,可以在本地很方便地模拟一个实时计算环境。

Flink提供了socketTextStream方法可以通过host和port从一个Socket中以文本的方式读取数据,以此构建基于Socket的数据流

socketTextStream方法有以下四个主要参数:

hostname:主机名

port:端口号,设置为 0 时,表示端口号自动分配

delimiter:用于分隔每条记录的分隔符

maxRetry:当Socket临时关闭时,程序的最大重试间隔,单位为秒。设置为0时表示不进行重试;设置为负值则表示一直重试

示例如下:

env.socketTextStream("IP", 8888, "\n", 3).print();

读取socket文本流,是流处理场景,这种方式由于吞吐量小、稳定性较差,一般也是用于测试

 //  创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

// 读取socket文本流
DataStreamSource socketDS = env.socketTextStream("IP", 8888);

注意:基于Socket构建数据源,一般需要搭配Netcat使用。

Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。它被广泛用于测试网络中的端口,发送文件等操作。使用 Netcat 可以轻松地进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作。因为其功能强大而又简单易用,所以在计算机安全领域也有着广泛的应用。

安装nc命令

yum install -y nc

启动socket端口

[root@master ~]# nc -l 8080
abc bcd cde
bcd cde fgh
cde fgh hij

注意:

测试时,先启动端口,后启动程序,会报超时连接异常,最后发送测试数据即可。

自定义Data Source

可以通过实现Flink的SourceFunction、ParallelSourceFunction、RichSourceFunction、RichParallelSourceFunction等类并重写其方法以此实现自定义Data Source

ParallelSourceFunction、RichParallelSourceFunction分别与SourceFunction、RichSourceFunctio功能类似,只不过它们通过SourceContext发送的数据会自动分发到并行任务中去,也就是说具有并行度的功能。

SourceFunction

它是Flink 提供的基础接口之一,用于定义数据源的行为。它包含一个 run 方法,该方法用于启动数据源,并使用SourceContext来发送数据元素。它中的方法是生命周期很简单的基础方法。

操作步骤:

实现SourceFunction接口:创建一个实现SourceFunction接口的类,该接口定义读取数据并发出数据流的方法。这个接口中的核心方法是run()和cancel(),其中run()方法用于读取数据并发出一系列事件,cancel()方法用于取消数据源的运行

实现run()方法:可以定义从数据源读取数据的逻辑。这可以是从文件、数据库、消息队列等读取数据的逻辑。在适当的时候,使用collect()方法将读取的数据发出到数据流中

实现cancel()方法:可以编写停止或清理数据源的逻辑。例如,如果数据源使用了外部资源,在这里释放这些资源

注册数据源:将数据源注册到Flink的执行环境中,以便可以在作业中使用。通过执行环境的addSource()方法,向执行环境添加数据源
public class MySource implements SourceFunction {

    private boolean isRunning = true;

    /**
     * run() 方法是核心方法,它会不断地读取、产生数据并将数据发送到下游
     * 
     */
    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning) {
            // 产生一些数据
            String data = UUID.randomUUID().toString();
            // 将数据发送到下游
            ctx.collect(data);
            // 每秒产生一条数据
            Thread.sleep(1000);
        }
    }

    /**
     * cancel() 方法用于在取消任务时清理资源
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

将自定义的数据源传递给 env.addSource() 方法,并通过 .print() 将数据打印到控制台中。最后调用 env.execute() 方法来启动Flink程序。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// 将自定义的数据源添加到 Flink 程序中
        DataStreamSource streamSource = env.addSource(new MySource());

        streamSource.print();

        env.execute("MyApp");
    }

RichSourceFunction

如果需要更高级的功能和更丰富的生命周期控制,可以使用RichSourceFunction 类。RichSourceFunction是
SourceFunction接口的子类,它提供了额外的方法和功能,例如初始化、配置和资源管理。

操作步骤:

扩展RichSourceFunction 类:创建一个类,扩展 RichSourceFunction 类,并将 T 替换为要发出的数据类型

实现open() 方法:进行初始化操作,例如建立与外部系统的连接或加载资源等。这个方法是在数据源的生命周期开始时被调用的

实现run() 方法:实现读取数据并发出数据流的逻辑。这个方法在启动数据源时会被调用

实现cancel() 方法:添加取消数据源的逻辑。这个方法将在停止数据源时调用

实现close() 方法:进行一些资源清理操作。这个方法是在数据源生命周期结束时调用的
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CustomRichDataSource extends RichSourceFunction {

    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 初始化操作,例如建立与外部系统的连接或加载资源等
    }

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning) {
            // 读取数据的逻辑

            // 发出数据到数据流
            ctx.collect("Hello, World!");

            // 控制发送数据的速度
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // 资源清理操作
    }
}

常见连接器

第三方系统连接器

Flink内置了多种连接器,用于满足大多数的数据收集场景。连接器可以和多种多样的第三方系统进行交互。

Flink官方目前支持以下第三方系统连接器

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon DynamoDB (sink)
Amazon Kinesis Data Streams (source/sink)
Amazon Kinesis Data Firehose (sink)
DataGen (source)
Elasticsearch (sink)
Opensearch (sink)
FileSystem (sink)
RabbitMQ (source/sink)
Google PubSub (source/sink)
Hybrid Source (source)
Apache Pulsar (source)
JDBC (sink)
MongoDB (source/sink)

除Flink官方之外,还有一些其他第三方系统与Flink的连接器,通过Apache Bahir发布:

Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)

File Source连接器

从文件读取数据是一种常见方式,比如读取日志文件,这是批处理中最常见的读取方式

flink-connector-files是Apache Flink的一个连接器,用于将本地文件系统或远程文件系统中的文件作为数据源或数据接收器使用。

它提供了一种简单的方法来处理文本文件或其他格式的文件,例如CSV、JSON、Avro等,并将其转换为Flink数据流。在使用时,可以指定文件的路径、编码方式和分隔符等参数,并使用适当的转换函数将文件内容解析为Flink的数据类型,然后进行数据处理和分析。

它支持对输出流的写入操作,将Flink数据流中的结果写入到指定的文件中。可以通过配置文件路径、编码方式和文件格式等参数来控制输出文件的格式和内容

添加文件连接器依赖

        
            org.apache.flink
            flink-connector-files
            1.17.0
            provided
        
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        /**
         * 从文件流中逐条记录读取
         *
         * 文件路径参数可以是目录,具体文件、以及从HDFS目录下读取
         * 路径可以是相对路径,也可以是绝对路径;
         * 相对路径是从系统属性`user.dir`获取路径:idea下是`project的根目录`,standalone模式下是`集群节点根目录`
         */
        FileSource fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/word.txt")).build();
        /**
         * source ——用户定义的来源
         * sourceName – 数据源的名称
         */
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source").print();

        env.execute();
    }

DataGen Source连接器

Flink提供了一个内置的DataGen连接器,主要用于生成一些随机数,进行流任务的测试以及性能测试


    org.apache.flink
    flink-connector-datagen
    1.17.0

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 如果有n个并行度, 最大值设为a
        // 将数值 均分成 n份,  a/n ,比如,最大100,并行度2,每个并行度生成50个
        // 其中一个是 0-49,另一个50-99
        /**
         * DataGeneratorSource中,单个并行度生成数据个数与 与 生成的数据个数 相关
         * 公式: 生成的数据个数 / 并行度 = 每个并行度生成个数
         * 例子: 并行度设置为2,生成数据个数100,则每个并行度生成个数=100/2. 一个并行度:0-49 另一个并行度:50-99
         */
        env.setParallelism(2);

        /**
         * 数据生成器Source
         * GeneratorFunction generatorFunction : GeneratorFunction接口函数需要实现, 重写map方法, 输入类型固定是Long
         * long count : 生成的数据个数。自动生成的数字序列,从0自增。当数字数序列最大值达到或小于这个值就停止
         * RateLimiterStrategy rateLimiterStrategy :限速策略,如每秒生成几条数据
         * TypeInformation typeInfo : 返回的数据类型
         *
         */
        DataGeneratorSource dataGeneratorSource = new DataGeneratorSource(
                new GeneratorFunction() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                100,
                RateLimiterStrategy.perSecond(1),
                Types.STRING
        );

        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator").print();

        env.execute();
    }
1> Number:0
2> Number:50
2> Number:51
1> Number:1
2> Number:52
1> Number:2
2> Number:53

Kafka Source连接器

Flink-connector-kafka就是Flink的一个连接器,它提供了一个简单的方法来将Kafka作为Flink应用程序的数据源或数据接收器使用。

具体来说,flink-connector-kafka可以帮助Flink应用程序从Kafka主题中读取数据,也可以将Flink的数据流写入到Kafka主题中。在使用时,可以指定Kafka集群的地址、主题名称、消费者组名称等参数,并使用适当的序列化和反序列化工具将数据转换为Flink的数据类型。

引入Kafka连接器依赖

        
            org.apache.flink
            flink-connector-kafka
            1.17.0
        
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KafkaSource kafkaSource = KafkaSource.builder()
                // 指定kafka节点的地址和端口
                .setBootstrapServers("node01:9092,node02:9092,node03:9092")
                // 指定消费者组的id
                .setGroupId("flink_group")
                // 指定消费的 Topic
                .setTopics("flink_topic")
                // 指定反序列化器,反序列化value
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // flink消费kafka的策略
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

		// 不使用 watermark 的策略,意味着数据流不会根据事件时间进行处理
        DataStreamSource stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
        stream.print("Kafka");

		// 定义事件时间watermark策略,处理数据流中的无序事件,并设置最大延迟时间为3秒。
        DataStreamSink kafka_source = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafka_source").print("Kafka");

        env.execute();
    }

起始消费位点

Kafka source 能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费 。

如果内置的初始化器不能满足需求,也可以实现自定义的位点初始化器OffsetsInitializer

如果未指定位点初始化器,将默认使用 OffsetsInitializer.earliest()

内置的位点初始化器包括:

KafkaSource.builder()
    // 从消费组提交的位点开始消费,不指定位点重置策略
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
    // 从最早位点开始消费
    .setStartingOffsets(OffsetsInitializer.earliest())
    // 从最末尾位点开始消费
    .setStartingOffsets(OffsetsInitializer.latest());

RabbitMQ Source连接器

添加对RabbitMQ连接器的依赖


    org.apache.flink
    flink-connector-rabbitmq
    3.0.1-1.17

1.服务质量 (QoS)

服务质量是一种用于控制数据源连接器如何消费消息的策略。在Flink中,服务质量定义了消费者和消息代理之间的消息传输保证级别。通过合适的服务质量设置,可以实现以下不同的语义保证:

Exactly-once:确保消息仅被正确处理一次

At-least-once:确保消息至少被正确处理一次

None(最多一次):不提供消息处理保证,可能会出现重复处理或丢失消息的情况

1.精确一次:

保证精确一次需要以下条件

开启checkpointing: 开启之后,消息在checkpoints完成之后才会被确认,然后从RabbitMQ队列中删除

使用关联标识Correlationids: 关联标识是RabbitMQ的一个特性,消息写入RabbitMQ时在消息属性中设置。从checkpoint恢复时有些消息可能会被重复处理,source可以利用关联标识对消息进行去重。

非并发source: 为了保证精确一次的数据投递,source必须是非并发的(并行度设置为1)。这主要是由于RabbitMQ分发数据时是从单队列向多个消费者投递消息的。

2.至少一次:

在checkpointing开启的条件下,如果没有使用关联标识或者source是并发的,那么source就只能提供至少一次的保证。

3.无任何保证:

如果没有开启checkpointing,source就不能提供任何的数据投递保证。使用这种设置时,source一旦接收到并处理消息,消息就会被自动确认。

2.消费者预取Consumer Prefetch

注意:

默认情况下是不设置prefetch count的,这意味着RabbitMQ服务器将会无限制地向source发送消息。因此在生产环境中,最好要设置它。

prefetch count是对单个channel设置的,并且由于每个并发的source都持有一个connection/channel,因此这个值实际上会乘以 source 的并行度,来表示同一时间可以向这个job总共发送多少条未确认的消息。

使用setPrefetchCount()方法用于设置消费者预取值,这里将其设置为 10。这意味着每个消费者在处理完 10 条消息之前不会从 RabbitMQ 队列中获取更多的消息。

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setPrefetchCount(10) //设置消费者预取值为 10
    ...
    .build();

以下是保证exactly-once的RabbitMQ source示例

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 启用检查点(checkpointing)以实现精确一次或至少一次的一致性保证
        env.enableCheckpointing(5000); // 每 5000 毫秒执行一次检查点

        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("localhost")     // RabbitMQ 主机名
                .setPort(5672)            // RabbitMQ 端口号
                .setUserName("guest")     // RabbitMQ 用户名
                .setPassword("guest")     // RabbitMQ 密码
                .setVirtualHost("/")      // RabbitMQ 虚拟主机
                .setPrefetchCount(10) // 设置消费者预取值为 10
                .build();

        final DataStream stream = env
                .addSource(new RMQSource(
                        connectionConfig,          // RabbitMQ 连接配置
                        "queueName",               // 需要消费的 RabbitMQ 队列名
                        true,                      // 是否使用关联 ID;如果仅需要至少一次的保证,可以设置为 false
                        new SimpleStringSchema())) // 反序列化方案,将消息转换为 Java 对象
                .setParallelism(1);            // 非并行的源,仅在需要精确一次性保证时才需要设置

        stream.print();

        env.execute("RabbitMQ Source Example");
    }

MongoDB Source连接器

Flink 提供了MongoDB 连接器使用至少一次(At-least-once)的语义在 MongoDB collection中读取和写入数据。

要使用此连接器,先添加依赖到项目中:


    org.apache.flink
    flink-connector-mongodb
    1.0.1-1.17

 public static void main(String[] args) throws Exception {
        MongoSource source = MongoSource.builder()
                // MongoDB 连接 URI
                .setUri("mongodb://user:password@127.0.0.1:27017")
                // 数据库名
                .setDatabase("my_db")
                // 集合名
                .setCollection("my_coll")
                // 投影的字段
                .setProjectedFields("_id", "f0", "f1")
                // 默认值: 2048 设置每次循环读取时应该从游标中获取的行数
                .setFetchSize(2048)
                // 默认值:-1 限制每个reader最多读取文档的数量。如果设置了读取并行度大于1,那么最多读取的文档数量等于 并行度 * 限制数量。
                .setLimit(10000)
                // 默认值: true 不使用游标超时 防止cursor因为读取时间过长或者背压导致的空闲而关闭
                .setNoCursorTimeout(true)
                /**
                 * 使用分区可以利用并行读取来加速整体的读取效率。
                 *
                 * 设置分区策略,可选的分区策略有 SINGLE,SAMPLE,SPLIT_VECTOR,SHARDED 和 DEFAULT
                 *
                 * SINGLE:将整个集合作为一个分区。
                 * SAMPLE:通过随机采样的方式来生成分区,快速但可能不均匀。
                 * SPLIT_VECTOR:通过 MongoDB 计算分片的 splitVector 命令来生成分区,快速且均匀。 仅适用于未分片集合,需要 splitVector 权限。
                 * SHARDED:从 config.chunks 集合中直接读取分片集合的分片边界作为分区,不需要额外计算,快速且均匀。 仅适用于已经分片的集合,需要 config 数据库的读取权限。
                 * DEFAULT:对分片集合使用 SHARDED 策略,对未分片集合使用 SPLIT_VECTOR 策略。
                 */
                .setPartitionStrategy(PartitionStrategy.SAMPLE)      // 设置每个分区的内存大小,默认值:64mb 通过指定的分区大小,将 MongoDB 的一个集合切分成多个分区。 可以设置并行度,并行地读取这些分区,以提升整体的读取速度。
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                // 默认值:10 仅用于 SAMPLE 抽样分区策略,设置每个分区的样本数量。抽样分区器根据分区键对集合进行随机采样的方式计算分区边界。 总的样本数量 = 每个分区的样本数量 * (文档总数 / 每个分区的文档数量)
                .setSamplesPerPartition(10)
                // 设置 MongoDeserializationSchema 用于解析 MongoDB BSON 类型的文档
                .setDeserializationSchema(new MongoDeserializationSchema() {
                    @Override
                    public String deserialize(BsonDocument document) {
                        return document.toJson();
                    }

                    @Override
                    public TypeInformation getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })                                                  // 自定义的反序列化方案
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-Source")
                .setParallelism(2)                                  // 设置并行度为 2
                .print()
                .setParallelism(1);                                 // 设置并行度为 1

        env.execute("MongoDB Source Example");
    }    

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论