Flink小白入门,如何实时统计用户的交易金额?

2023年 8月 18日 51.3k 0

前言

大家还记得双11得时候,会有一个大屏实时刷新显示交易金额,这究竟是怎么实时统计计算的呢?Apache Flink是一个开源、流行的大数据框架和分布式处理引擎,特别是针对流式数据的处理,那么今天通过一个简单的业务场景,实时统计用户的交易金额,感受一下flink的魅力。

Flink入门例子

业务场景

监听socket请求,获取用户的流水信息,实时输出用户的交易总金额。order信息如下所示:

public class Order {
    /**
     * 订单号
     */
    private String orderId;

    /**
     * 用户号
     */
    private String userId;

    /**
     * 交易金额
     */
    private Long amount;

    /**
     * 交易时间
     */
    private String date;
}

Flink统计任务步骤

  • 引入flink相关的依赖,本案例采用flink最新的1.17.0版本
  •  
            1.17.0
        
    
        
            
                org.apache.flink
                flink-streaming-java
                ${flink.version}
                provided
            
    
            
                org.apache.flink
                flink-clients
                ${flink.version}
                provided
            
    
            
                org.apache.flink
                flink-runtime-web
                ${flink.version}
                provided
            
        
    
  • 创建flink执行环境
  • 读取数据
  • 处理数据,map方法转换成Order对象
  • 通过keyBy方法分组
  • 根据金额字段sum聚合
  • 转换输出结果
  • 打印结果
  • 开始执行任务
  • 演示

  • 通过nc命令服务器上启动socket服务
  • 直接在idea中本地方式运行main方法,启动flink任务
  • socket服务中发送数据,可以查看idea console实时输出统计结果
  • 打包运行

    目前flink任务在idea中运行正确,我们需要将它达成jar包,丢到linux服务器上的flink集群中运行。

  • maven打包插件
  • 
        org.apache.maven.plugins
        maven-shade-plugin
        3.2.4
        
            
                package
                
                    shade
                
                
                    
                        
                            com.google.code.findbugs:jsr305
                            org.slf4j:*
                            log4j:*
                            org.apache.hadoop:*
                        
                    
                    
                        
                            
                            *:*
                            
                                META-INF/*.SF
                                META-INF/*.DSA
                                META-INF/*.RSA
                            
                        
                    
                    
                        
                        
                    
                
            
        
    
    
  • flink界面提交任务
  • 查看输出结果
  • 往socket中发交易数据,可以实时看到统计结果

    Flink究竟是什么?

    通过上面的例子,我们能直观的感受到flink的能力,Flink是一个开源的流式处理框架,它提供了高效、可扩展和容错的数据流处理能力。它支持流式和批处理数据处理,并且具有低延迟、高吞吐量和精确一次处理语义的特性。

    Flink的核心目标,是"数据流上的有状态计算"。把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态,这就是所谓的“有状态的流处理”。

    总结

    本文通过一个实时统计用户的交易金额的例子感受了一下Flink的作用,但是实际场景往往比这复杂的多,比如统计用户指定时间范围内(3天内)的交易金额,交易流水发生乱序了怎么办,Flink面对这些问题其实都有相应的机制处理,需要进一步去挖掘学习。

    附录

    完整代码如下

    public class OrderStreamDemo {
    
        public static void main(String[] args) throws Exception {
            // 1.创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
            // 设置并行度
            env.setParallelism(1);
    
            // 2.读取数据
            DataStreamSource socketDS = env.socketTextStream("10.100.1.13", 7776);
            
            // 3.转换成对象
            SingleOutputStreamOperator wordAndOneDS = socketDS
                    .map(new MapFunction() {
                        @Override
                        public Order map(String value) {
                            // socket中数据根据逗号分割,映射出Order对象
                            String[] orders = value.split(",");
                            Order order = new Order();
                            if(orders.length >= 4) {
                                String orderId = orders[0];
                                order.setOrderId(orderId);
                                String userId = orders[1];
                                order.setUserId(userId);
                                Long amt = Long.valueOf(orders[2]);
                                order.setAmount(amt);
                                String time = orders[3];
                                order.setDate(time);
                            }
                            return order;
                        }
                    });
    
            // 4 分组
            KeyedStream wordAndOneKS = wordAndOneDS.keyBy(
                    new KeySelector() {
                        @Override
                        public String getKey(Order value) {
                            return value.getUserId();
                        }
                    }
            );
    
            // 5 聚合
            SingleOutputStreamOperator sumDS = wordAndOneKS.sum("amount");
    
            // 6. 转换输出结果
            SingleOutputStreamOperator mapRes = sumDS.map(new MapFunction() {
                @Override
                public String map(Order value) throws Exception {
                    String str = String.format("时间: %s, 用户 %s 累计交易金额:%d元", value.getDate(), value.getUserId(), value.getAmount());
                    return str;
                }
            });
    
            // 7.输出数据
            mapRes.print();
    
            // 8.执行
            env.execute();
        }
    
    }
    

    欢迎关注个人公众号【JAVA旭阳】交流学习!

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论