上一篇文章讲解了HDFS的组成和原理,HDFS是Hadoop框架中负责分布式存储的重要组件,Map/Reduce是Hadoop框架中负责分布式计算的组件,基于它写出来的计算程序,可以运行在成千上万台机器上。本篇主要讲解一下Map/Reduce的编程模型和实现原理。
编程模型
传统的编程模型
在传统的计算编程模型,都是“输入 -> 计算 -> 输出”模型,在这种编程模型中,都是需要程序首先读取到数据,或者用户给这个程序传入一些数据,然后在进行复杂的计算逻辑处理,最后处理的结果在输出到某个地方。
在单机时代,这种编程模型被广泛应用在数据计算领域,也诞生了很多数据处理框架,比如Kettle实际上就是遵循了模型。在处理数据量较小的时候问题不大,但数据量大了就有点问题了。
假设输入的数据有PB级别,虽说类似于Kettle这种数据集成框架,都是基于生产者消费者模型来实现的,不会出现PB级别的数据一下子全部都读取到内存的情况,但这种模式会由于下游的处理效率低下(磁盘IO,网络IO,CPU,内存等),影响了上游的读取效率,进而导致整体的计算处理能力低下。
另外,Kettle是通过线程调度的,线程都是基于抢占时间片的方式调度的,所以有可能会导致某些线程很长时间获取不到执行权。
基本传统的计算模型,都是通过线程调度的,线程调度暂无法按需分配CPU,内存,IO等资源,占用的都是主进程的资源,那么某些数据量很大的任务运行会导致数据量少的任务运行受限。
多线程编程模型
上述传统编程模型,虽说有线程调度,但是这个线程调度并不能使其中一个计算逻辑并发处理,所以处理效率会低下,那如果每个计算逻辑,都能够并发处理也是可以有效提高运算效率的。
与传统的编程模型相比,整体的架构是不变的,只不过中间的计算逻辑是并发处理的。输入算子读取数据,根据拆分(split)规则,将数据写到不同的队列里,计算逻辑会根据上游拆分出来的队列个数创建相应个数的线程,来并发处理,最后再将计算结果并发写入目标表里。
与传统的编程模型相比,虽说提高了计算的并发能力,但同时也带来了新的问题:
基于多线程编程模型的框架有Datax。Datax为什么会比Kettle数据集成的速率快,实际上Datax就是多线程处理模型,将数据split到不同的channel,然后并发写入到目标库里。同时Datax还提供了failover机制。
数据驱动型编程模型
数据驱动是一种思想,数据驱动型编程是一种编程范式。数据驱动定义了data和acton之间的关系,传统的思维方式是从action开始,一个action到新的action,不同的action里面可能会触发data的修改。数据驱动则是反其道而行之,以data的变化为起点,data的变化触发新的action,action改变data之后再触发另一个action。
现在很多框架都是基于数据驱动编程来实现的,比如CDC,ELK,Kafka,Pulsar等,MySQL CDC解析Binlog日志,将数据实时同步到Kafka,消费者可以订阅topic,然后对数据进行计算。
与传统的编程模型和多线程编程模型相比:
Map/Reduce编程模型
最后一种模型就是Map/Reduce编程模型,它的思想是移动运算,数据是庞大的,而程序要比数据小得多,将数据输入给程序是不划算的,那么就反其道而行之,将程序分发到数据所在的地方进行计算,也就是所谓的移动计算比移动数据更划算。具体方案如下:
它与其他编程模型相比:
接下来我们就一步步地去学习如何使用Map-Reduce编程模型
Map/Reduce概述
Hadoop Map/Reduce是一个使用简易的软件框架,一个Map/Reduce 作业(job)通过实现合适的接口或抽象类提供map和reduce函数,再加上其他作业的参数,构成job configuration,然后把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序,然后把结果输入给reduce任务。
Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。
Hadoop的job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。
WordCount简单案例
public class WordCountMapper extends Mapper {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
public class WordCountReduce extends Reducer {
private int sum;
private IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
context.write(key, v);
}
}
public class WordCountDriver {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置 jar 加载路径
job.setJarByClass(WordCountDriver.class);
// 3 设置 map 和 reduce 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
// 4 设置 map 输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
WordCount流程分析
-
map()函数以key/value作为输入,经过map函数的处理后,产生另一系列key/value作为中间输出写入到本地磁盘,以上图为例:
- 原始字符串为
Hello World
和Bye World
- 经过map()处理后得到
(Hello, 1), (World, 1), (Bye, 1), (World, 1)
- Map/Reduce框架会将中间结果写到本地磁盘。
- 原始字符串为
-
MapReduce框架会将这些中间数据按照Key值进行聚合,且Key值相同的数据被统一交给reduce()处理。reduce()函数以key以及对应的value列表作为输入,以上图为例:
- MapReduce框架会将map()产生的中间数据,从磁盘读取出来,并进行聚合,聚合策略为group by sorting,得到
(Hello,1,0), (World, 1, 1), (Bye, 1, 0)
- 经过reduce()处理,输出
(Hello, 1), (World, 2), (Bye, 1)
- MapReduce框架会将map()产生的中间数据,从磁盘读取出来,并进行聚合,聚合策略为group by sorting,得到
在上述流程分析中,实际上还忽略了几点内容:
数据切分
数据切分的过程就是确定Map Task数量的过程,Map Reduce框架切分逻辑如下:
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
公式:
- splitSize = max{minSize, min{maxSize, blockSize}}
- minSize = max{"mapreduce.input.fileinputformat.split.minsize", 1}
- maxSize = "mapreduce.input.fileinputformat.split.maxsize"
对应的代码如下:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
protected long getFormatMinSplitSize() {
return 1;
}
public static final String SPLIT_MAXSIZE =
"mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
一旦确定splitSize值后,将文件依次切成大小为splitSize的InputSplit,最后剩下不足splitSize的数据块单独成为一个InputSplit。
整个split逻辑如下:摘录自 org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getSplits
方法。
public List getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
// 获取minSiez
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 获取maxSize
long maxSize = getMaxSplitSize(job);
// generate splits
List splits = new ArrayList();
List files = listStatus(job);
boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
for (FileStatus file: files) {
if (ignoreDirs && file.isDirectory()) {
continue;
}
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
// 获取blockSize
long blockSize = file.getBlockSize();
// 计算splitSize
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// 将文件依次切成大小为splitSize的InputSplit
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
// 剩下不足splitSize的数据块单独成为一个InputSplit
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
如何选择节点
从上述数据切分的代码里可以看到,InputSplit对象包含四个属性,分别是文件名,起始位置,Split长度和节点列表。Hadoop将数据本地性按照代价划分为三个等级:Node、Rack和Any。任务本地性指的是先让资源空闲的节点处理本节点的数据,如果节点上没有可处理的数据,则处理同一台机架上其他节点的数据,最差情况就是处理其他机架上的数据。
在上篇HDFS的组成和原理一文中,讲解了Hadoop的机架感知原理,实际上也同样适用于Map/Reduce框架,还是就近原则分配执行节点。
Sort
MapReduce的Sort分为两种:
-
Map Task中Spilt数据的排序
- 数据写入本地磁盘之前,先要对数据进行一次本地排序
- 快排算法
- 排序先按照分区编号partition进行排序,然后按照key进行排序。经过排序后,数据以分区为单位聚集在一起, 且同一分区内所有数据按照key有序
-
ReduceTask中数据排序
- Reduce Task对所有数据进行排序
- 归并排序算法
- 小顶堆
- Sort和Reduce可并行进行
Shuffle
在map输出与reduce输入之间,MapReduce计算框架处理数据合并与连接操作,这个操作有个专门的词汇叫shuffle。那到底什么是shuffle?shuffle的具体过程又是怎样的呢?请看下图。
每个Map Task在写入本地磁盘前会进行partition排序,保证partition内的key相同且有序,对Map产生的每个进行Reduce分区选择,然后通过HTTP通信发送给对应的Reduce进程。这样不管Map位于哪个服务器节点,相同的Key一定会被发送给相同的Reduce进程。
Reduce任务进程对收到的进行排序和合并,相同的Key放在一起,组成一个传递给Reduce执行。这样的过程就是Shuffle的过程。
MapReduce架构
接下来我们来看一下Map/Reduce的基本架构,同HDFS一样,也是需要分为1.0架构和2.0架构
MapReduce1.x架构
在MapReduce1.0架构中,主要有三种角色:
- Job Tracker
-
- 负责集群资源监控和作业调度
- 通过心跳监控所有TaskTracker的健康状况
- 监控Job的运行情况、执行进度、资源使用,交由任务调度器负责资源分配
- 任务调度器可插拔:FIFO Scheduler、Capacity Scheduler
- Task Tracker
-
- 具体执行Task的单元
- 以slot为单位等量划分本节点的资源,分为Map Slot和Reduce Slot
- 通过心跳周期性向JobTracker汇报本节点的资源使用情况和任务运行进度
- 接收JobTracker的命令执行相应的操作(启动新任务、杀死任务等)
- 通常与HDFS的DataNode节点分配在同一个节点。
- Client
-
- 提交用户编写的程序到集群
- 查看Job运行状态
与HDFS一样大致也是两种角色,NameNode和DataNode
Spark也是两种角色,Driver和Executor
Flink也是两种角色,JobManager和TaskManager
这些大数据框架,不管是存储还是计算,基本都是Master/Worker架构,Master一个负责元数据的管理,监控管理等,Worker负责处理存储或者计算。
MapReduce运行原理
JobTracker
作业控制
JobTracker在其内部以“三层多叉树”的方式描述和跟踪每个作业的运行状态
资源管理
JobTracker不断接收各个TaskTracker周期性发送过来的资源量和任务状态等信息,为 TaskTracker分配最合适的任务。
Hadoop引入了“slot”概念表示各个节点上的计算资源。为了简化资源管理,Hadoop将各个节 点上的资源(CPU、内存和磁盘等)等量切分成若干份,每一份用一个slot表示,同时规定一个Task 可根据实际需要占用多个slot。
三级调度模型:
容错机制
作业恢复的机制处理比较简单。每个新的作业(Job)会在JobTracker的工作目录下为该作业 创建一个以该作业的JobId为命名的目录,目录底下放该作业的Job-info和JobToken文件 。如果该作业成功运行结束,那么就会在作业的Cleanup工作中删除掉该文件夹。
所以,当某个时刻JobTracker如果突然因为故障重启了,那么该工作目录下如果JobId工作 目录,就说明重启之前还有作业未运行结束(因为运行结束的Job都会把自己的目录清除掉 ),此时就会把目录中包含的作业重新提交运行,并且JobTracker会把这些重新提交运行 的Job的Id信息通过心跳信息的回复告知TaskTracker。
那些之前就已经运行在TaskTracker上的任务就是根据TaskID和JobID来更新JobTracker中 的作业和任务的信息状态的。原本就正在运行的任务仍然能够正常的更新JobTracker。已 经运行结束的Task会把新提交的作业的Task直接更新为运行结束。
TaskTracker
心跳机制
心跳是JobTracker和TaskTracker的桥梁,它实际上是一个RPC函数,TaskTracker周 期性的调用该函数汇报节点和任务状态信息,从而形成心跳。在Hadoop中,心跳主要有三个作用:
JobTracker与TaskTracker之间采用了Pull而不是Push模型,是JobTracker不会主动向TaskTracker发送任何信息,而是由TaskTracker主动通过周期性的调用RPC函数发送心跳领取属于自己的消息,JobTracker只能通过心跳应答的形式为各个TaskTracker分配任务。
容错机制
如果一个TaskTracker故障了,那我们把该TaskTracker上所有满足以下两个条件的任务杀 掉,并将它们重新加入任务等待队列中,以便被调度到其他健康节点上重新运行。
条件1 Task所属Job处于运行或者等待状态。
条件2 未运行完成的Task或者Reduce Task数目不为零的作业中已运行完成的Map Task 。
所有运行完成的Reduce Task和无Reduce Task的Job中已运行完成的Map Task无须重新 运行,因为它们将结果直接写入HDFS中。
MapReduce2.x架构
Hadoop在2.x时代,引入了YARN,HDFS HA,Federation。
MapReduce1.x架构,JobTracker包含了作业控制和资源管理两个管理,2.0架构中,就将这两个功能模块移交给了YARN。
实际上,YARN是一个资源统一管理平台。它的目标已经不再局限于支持MapReduce一种计算框架,而是朝着对多种框架进行统一管理的方向发展。例如Flink,Spark等计算框架都可以运行在Yarn。关于Yarn我们在下一篇文章中进行学习。