MapReduce实现TopN的效果

2023年 7月 17日 44.2k 0

1、背景

最近在学习Hadoop的MapReduce,此处记录一下如何实现 TopN 的效果,以及在MapReduce中如何实现 自定义分组

2、需求

我们有一份数据,数据中存在如下3个字段,订单编号,订单项订单项价格。 输出的数据,需求如下:

  • 订单编号与订单编号之间需要正序输出。
  • 输出每个订单价格最高的2个订单项
  • 3、分析

  • 订单编号与订单编号之间需要正序输出,那么订单编号必须要作为Key,因为只有Key才有排序操作。
  • 输出每个订单价格最高的2个订单项: 这个输出是在reduce阶段,并且是每个订单,因此需要根据订单编号进行分组操作(前后2个key比较,相同则为一组),而分组也只有Key才有,因此就需要JavaBean(订单编号、订单项、订单项价格)来作为组合Key
  • 订单编号与订单编号之间需要正序输出 && 输出每个订单价格最高的2个订单项: 可以看出在Key中的排序规则为:根据订单编号升序,然后根据订单项价格倒序排序, 并且是根据订单编号来分组。
  • 我们知道默认MapReduce中默认的分区规则是,根据key的hascode来进行分区,而 分区 下是有多个 分组,每个分组调用一次reduce方法。 而我们上方的思路是,根据订单编号来进行分组,当我们Key是JavaBean组合Key时,相同的订单编号所在的JavaBean会被分在一个分组吗,这个不一定,因为JavaBean的hashcode不一定一致,因此就需要我们自定义分区(继承Partitioner类)。此处我们job.setNumReduceTasks设置为1个,因此不考虑这个分区的问题
  • 一个分区下有多个分组,每个分组调用一次reduce方法。
  • 4、准备数据

    4.1 准备数据

    20230713000010  item-101    10
    20230713000010  item-102    30
    20230713000015  item-151    10
    20230713000015  item-152    20
    20230713000010  item-103    20
    20230713000015  item-153    30
    20230713000012  item-121    50
    20230713000012  item-122    10
    20230713000012  item-123    30
    

    4.2 每行数据格式

    订单编号          订单项      订单项价格
    20230713000012  item-123    30
    

    每行数据的分隔符为空格

    4.3 期望输出结果

    20230713000010  item-102    30
    20230713000010  item-103    20
    20230713000012  item-121    50
    20230713000012  item-123    30
    20230713000015  item-153    30
    20230713000015  item-152    20
    

    5、编码实现

    5.1 引入jar包

    
        
            org.apache.hadoop
            hadoop-client
            3.3.4
        
        
            org.projectlombok
            lombok
            1.18.22
        
    
    
    
        
            
                org.apache.maven.plugins
                maven-jar-plugin
                3.2.2
                
                    
                        
                            true
                            lib/
                            com.huan.hadoop.mr.TopNDriver
                        
                    
                
            
        
    
    

    5.2 编写实体类

    package com.huan.hadoop.mr;
    
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * 订单数据
     *
     * @author huan.fu
     * @date 2023/7/13 - 14:20
     */
    @Getter
    @Setter
    public class OrderVo implements WritableComparable {
        /**
         * 订单编号
         */
        private long orderId;
        /**
         * 订单项
         */
        private String itemId;
        /**
         * 订单项价格
         */
        private long price;
    
        @Override
        public int compareTo(OrderVo o) {
            // 排序: 根据 订单编号 升序, 如果订单编号相同,则根据 订单项价格 倒序
            int result = Long.compare(this.orderId, o.orderId);
            if (result == 0) {
                // 等于0说明 订单编号 相同,则需要根据 订单项价格 倒序
                result = -Long.compare(this.price, o.price);
            }
            return result;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            // 序列化
            out.writeLong(orderId);
            out.writeUTF(itemId);
            out.writeLong(price);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            // 反序列化
            this.orderId = in.readLong();
            this.itemId = in.readUTF();
            this.price = in.readLong();
        }
    
        @Override
        public String toString() {
            return this.getOrderId() + "t" + this.getItemId() + "t" + this.getPrice();
        }
    }
    
    
  • 此处需要实现 WritableComparable接口
  • 需要编写 排序序列化方法
  • 5.3 编写分组方法

    package com.huan.hadoop.mr;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * 分组: 订单编号相同说明是同一组,否则是不同的组
     *
     * @author huan.fu
     * @date 2023/7/13 - 14:30
     */
    public class TopNGroupingComparator extends WritableComparator {
    
        public TopNGroupingComparator() {
            // 第二个参数为true: 表示可以通过反射创建实例
            super(OrderVo.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // 订单编号 相同说明是同一个对象,否则是不同的对象
            return ((OrderVo) a).getOrderId() == ((OrderVo) b).getOrderId() ? 0 : 1;
        }
    }
    
    
  • 实现 WritableComparator接口,自定义分组规则。
  • 分组是发生在reduce阶段,前后2个key比较,相同则为一组,一组调用一次reduce方法。
  • 5.4 编写 map 方法

    package com.huan.hadoop.mr;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * map 操作: 输出的key为OrderVo, 输出的value为: price
     *
     * @author huan.fu
     * @date 2023/7/13 - 14:28
     */
    public class TopNMapper extends Mapper {
    
        private final OrderVo outKey = new OrderVo();
        private final LongWritable outValue = new LongWritable();
    
        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            // 获取一行数据 20230713000010  item-101    10
            String row = value.toString();
            // 根据 t 进行分割
            String[] cells = row.split("\s+");
            // 获取订单编号
            long orderId = Long.parseLong(cells[0]);
            // 获取订单项
            String itemId = cells[1];
            // 获取订单项价格
            long price = Long.parseLong(cells[2]);
    
            // 设置值
            outKey.setOrderId(orderId);
            outKey.setItemId(itemId);
            outKey.setPrice(price);
            outValue.set(price);
    
            // 写出
            context.write(outKey, outValue);
        }
    }
    
    
  • map 操作: 输出的key为OrderVo, 输出的value为: price
  • 5.5 编写reduce方法

    package com.huan.hadoop.mr;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reduce操作: Key(OrderVo)相同的分为一组, 此处 OrderVo 作为key, 分组是根据 TopNGroupingComparator 来实现,
     * 即 订单编号 相同的认为一组
     *
     * @author huan.fu
     * @date 2023/7/13 - 14:29
     */
    public class TopNReducer extends Reducer {
    
        @Override
        protected void reduce(OrderVo key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
            int topN = 0;
            // 随着每次遍历, key的 orderId 是相同的(因为是根据这个分组的),但是里面的itemId和price是不同的
            for (LongWritable price : values) {
                topN++;
                if (topN > 2) {
                    break;
                }
                // 注意: 此处的key每次输出都不一样
                context.write(key, NullWritable.get());
            }
        }
    }
    
    
  • reduce操作: Key(OrderVo)相同的分为一组, 此处 OrderVo 作为key, 分组是根据 TopNGroupingComparator 来实现,即 订单编号 相同的认为一组.
  • 随着每次遍历, key的 orderId 是相同的(因为是根据这个分组的),但是里面的itemId和price是不同的
  • 5.6 编写driver类

    package com.huan.hadoop.mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * @author huan.fu
     * @date 2023/7/13 - 14:29
     */
    public class TopNDriver extends Configured implements Tool {
    
        public static void main(String[] args) throws Exception {
            // 构建配置对象
            Configuration configuration = new Configuration();
            // 使用 ToolRunner 提交程序
            int status = ToolRunner.run(configuration, new TopNDriver(), args);
            // 退出程序
            System.exit(status);
        }
    
        @Override
        public int run(String[] args) throws Exception {
            // 构建Job对象实例 参数(配置对象,Job对象名称)
            Job job = Job.getInstance(getConf(), "topN");
            // 设置mr程序运行的主类
            job.setJarByClass(TopNDriver.class);
            // 设置mr程序运行的 mapper类型和reduce类型
            job.setMapperClass(TopNMapper.class);
            job.setReducerClass(TopNReducer.class);
            // 指定mapper阶段输出的kv数据类型
            job.setMapOutputKeyClass(OrderVo.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 指定reduce阶段输出的kv数据类型,业务mr程序输出的最终类型
            job.setOutputKeyClass(OrderVo.class);
            job.setOutputValueClass(NullWritable.class);
            // 配置本例子中的输入数据路径和输出数据路径,默认输入输出组件为: TextInputFormat和TextOutputFormat
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 先删除输出目录(方便本地测试)
            FileSystem.get(this.getConf()).delete(new Path(args[1]), true);
    
            // 设置分组
            job.setGroupingComparatorClass(TopNGroupingComparator.class);
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }
    
  • 需要设置分组 job.setGroupingComparatorClass(TopNGroupingComparator.class);
  • 5.7 运行结果

    运行结果

    6、完整代码

    gitee.com/huan1993/sp…

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论