摘要:本文主要介绍flink
常用的输出算子,和自定义输出算子;可以输出到其他系统、DB或者文件。
简介
Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供
支持。
连接到外部系统
主要使用
flink
提供的sink
算子,直接输出到外部系统,官方提供的nightlies.apache.org/flink/flink…
我们最常用的就是输出到消息队列或者数据库,按照官方例子输出即可。
输出到文件系统
pom.xml
新增连接器
org.apache.flink
flink-connector-files
${flink.version}
SinkFile
public class SinkFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每个目录中,都有 并行度个数的 文件在写入
env.setParallelism(2);
// 必须开启checkpoint,否则一直都是 .inprogress
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataGeneratorSource dataGeneratorSource = new DataGeneratorSource(
new GeneratorFunction() {
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1000),
Types.STRING
);
DataStreamSource dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
// 输出到文件系统
FileSink fieSink = FileSink
// 输出行式存储的文件,指定路径、指定编码
.forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder("UTF-8"))
// 输出文件的一些配置: 文件名的前缀、后缀
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("flink-")
.withPartSuffix(".log")
.build()
)
// 按照目录分桶:如下,就是每个小时一个目录
.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd HH", ZoneId.systemDefault()))
// 文件滚动策略: 1分钟 或 1m
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(1))
.withMaxPartSize(new MemorySize(1024*1024))
.build()
)
.build();
dataGen.sinkTo(fieSink);
env.execute();
}
}
输出到kafka
pom.xml
新增连接器
org.apache.flink
flink-connector-kafka
${flink.version}
SinkKafkaWithKey
public class SinkKafkaWithKey {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());
SingleOutputStreamOperator sensorDS = env
.socketTextStream("192.168.13.110", 7777);
/**
* 如果要指定写入kafka的key
* 可以自定义序列器:
* 1、实现 一个接口,重写 序列化 方法
* 2、指定key,转成 字节数组
* 3、指定value,转成 字节数组
* 4、返回一个 ProducerRecord对象,把key、value放进去
*
*/
KafkaSink kafkaSink = KafkaSink.builder()
.setBootstrapServers("192.168.13.110:9092")
.setRecordSerializer(
new KafkaRecordSerializationSchema() {
@Nullable
@Override
public ProducerRecord serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord("ws", key, value);
}
}
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("atguigu-")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
sensorDS.sinkTo(kafkaSink);
env.execute();
}
}
输出到mysql
pom.xml
mysql
mysql-connector-java
8.0.27
org.apache.flink
flink-connector-jdbc
1.17
SinkToMysql
public class SinkToMysql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator sensorDS = env
.socketTextStream("192.168.137.100", 7777)
.map(new MapFunction() {
@Override
public Student map(String value) throws Exception {
String[] v = value.split(",");
return new Student(v[0],v[1],Integer.parseInt(v[2]),Integer.parseInt(v[3]));
}
});
/**
* 写入mysql
* 1、只能用老的sink写法: addsink
* 2、JDBCSink的4个参数:
* 第一个参数: 执行的sql,一般就是 insert into
* 第二个参数: 预编译sql, 对占位符填充值
* 第三个参数: 执行选项 ---》 攒批、重试
* 第四个参数: 连接选项 ---》 url、用户名、密码
*/
SinkFunction jdbcSink = JdbcSink.sink(
"insert into ws values(?,?,?)",
new JdbcStatementBuilder() {
@Override
public void accept(PreparedStatement preparedStatement, Student waterSensor) throws SQLException {
//每收到一条WaterSensor,如何去填充占位符
preparedStatement.setString(1, waterSensor.getId());
preparedStatement.setLong(2, waterSensor.getAge());
preparedStatement.setInt(3, waterSensor.getMoney());
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3) // 重试次数
.withBatchSize(100) // 批次的大小:条数
.withBatchIntervalMs(3000) // 批次的时间
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://192.168.137.100:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("root")
.withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
.build()
);
sensorDS.addSink(jdbcSink);
env.execute();
}
}
自定义输出
在复杂的业务系统中使用比较好
SinkToCustom
public class SinkToCustom {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator sensorDS = env
.socketTextStream("192.168.137.100", 7777);
sensorDS.addSink(new MySink());
env.execute();
}
public static class MySink extends RichSinkFunction {
Connection conn = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("初始化一次");
}
@Override
public void close() throws Exception {
super.close();
System.out.println("清理销毁操作");
}
/**
* sink的核心逻辑,写出的逻辑就写在这个方法里
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println("每一条的业务逻辑,输出"+value);
}
}
}