前言
大家还记得双11得时候,会有一个大屏实时刷新显示交易金额,这究竟是怎么实时统计计算的呢?Apache Flink是一个开源、流行的大数据框架和分布式处理引擎,特别是针对流式数据的处理,那么今天通过一个简单的业务场景,实时统计用户的交易金额,感受一下flink的魅力。
Flink入门例子
业务场景
监听socket请求,获取用户的流水信息,实时输出用户的交易总金额。order
信息如下所示:
public class Order {
/**
* 订单号
*/
private String orderId;
/**
* 用户号
*/
private String userId;
/**
* 交易金额
*/
private Long amount;
/**
* 交易时间
*/
private String date;
}
Flink统计任务步骤
1.17.0
org.apache.flink
flink-streaming-java
${flink.version}
provided
org.apache.flink
flink-clients
${flink.version}
provided
org.apache.flink
flink-runtime-web
${flink.version}
provided
演示
打包运行
目前flink任务在idea中运行正确,我们需要将它达成jar包,丢到linux服务器上的flink集群中运行。
org.apache.maven.plugins
maven-shade-plugin
3.2.4
package
shade
com.google.code.findbugs:jsr305
org.slf4j:*
log4j:*
org.apache.hadoop:*
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
往socket中发交易数据,可以实时看到统计结果
Flink究竟是什么?
通过上面的例子,我们能直观的感受到flink的能力,Flink是一个开源的流式处理框架,它提供了高效、可扩展和容错的数据流处理能力。它支持流式和批处理数据处理,并且具有低延迟、高吞吐量和精确一次处理语义的特性。
Flink的核心目标,是"数据流上的有状态计算"。把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态,这就是所谓的“有状态的流处理”。
总结
本文通过一个实时统计用户的交易金额的例子感受了一下Flink的作用,但是实际场景往往比这复杂的多,比如统计用户指定时间范围内(3天内)的交易金额,交易流水发生乱序了怎么办,Flink面对这些问题其实都有相应的机制处理,需要进一步去挖掘学习。
附录
完整代码如下
public class OrderStreamDemo {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 设置并行度
env.setParallelism(1);
// 2.读取数据
DataStreamSource socketDS = env.socketTextStream("10.100.1.13", 7776);
// 3.转换成对象
SingleOutputStreamOperator wordAndOneDS = socketDS
.map(new MapFunction() {
@Override
public Order map(String value) {
// socket中数据根据逗号分割,映射出Order对象
String[] orders = value.split(",");
Order order = new Order();
if(orders.length >= 4) {
String orderId = orders[0];
order.setOrderId(orderId);
String userId = orders[1];
order.setUserId(userId);
Long amt = Long.valueOf(orders[2]);
order.setAmount(amt);
String time = orders[3];
order.setDate(time);
}
return order;
}
});
// 4 分组
KeyedStream wordAndOneKS = wordAndOneDS.keyBy(
new KeySelector() {
@Override
public String getKey(Order value) {
return value.getUserId();
}
}
);
// 5 聚合
SingleOutputStreamOperator sumDS = wordAndOneKS.sum("amount");
// 6. 转换输出结果
SingleOutputStreamOperator mapRes = sumDS.map(new MapFunction() {
@Override
public String map(Order value) throws Exception {
String str = String.format("时间: %s, 用户 %s 累计交易金额:%d元", value.getDate(), value.getUserId(), value.getAmount());
return str;
}
});
// 7.输出数据
mapRes.print();
// 8.执行
env.execute();
}
}
欢迎关注个人公众号【JAVA旭阳】交流学习!