基础
MapReduce
是一种用于数据处理的编程模型,其本质是并行运行,因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心,当然其优势也是处理大规模数据集。
MapReduce
任务过程分为两个处理阶段: map
阶段和 reduce
阶段。每个阶段都是以键值对作为输入和输出,其类型由开发者决定,当然 map
函数和 reduce
函数也是由开发者实现。
MapReduce
原理
MapReduce
作业(job
)是客户端执行工作的单元,包含:输入数据、 MapReduce
程序和配置信息。Hadoop
将作业分成若干个任务(task
)来执行,其中包含两类任务: map
任务和 reduce
任务,这些任务运行在集群的节点上,并通过 YARN
进行调度,其中如果有一个任务失败,则将在另一个不同的节点上重新调度运行。
MapReduce
的输入数据会被划分为等长的小数据块,称为输入分片(input split
)或者简称分片,而每一个分片会构建一个 map
任务,并由该任务来运行用户自定义的 map
函数从而处理分片中的每条记录。
有许多分片就意味着每个分片处理所需时间少于整个输入数据所需时间;另外如果分片切分得太小,那管理分片得事件和构建 map
任务的事件将组成作业的整体运行事件。
如果 map
任务运行在存储有输入数据(HDFS
中的数据)的节点上,无需使用带宽资源即可获得最佳性能,也就是数据本地化优化(data locality optimization
) 。
map
任务将其输出写入本地磁盘(此输出为中间结果),该中间结果由 reduce
任务处理后才会产生最终输出结果,随着作业完成,map
的结果也会被随之删除。
reduce
任务的输入通常来自于所有 map
任务的输出,其并不具备数据本地化的优势,且 reduce
任务的数量是独立指定的不由输入数据的大小决定。如果存在多个 reduce
任务,那每个 map
任务就会针对输出进行分区,即为每个 reduce
任务创建一个分区,每个分区都有许多键及其对应的值,但是每个键对应的键值对记录都在同一分区中。
分区可以由用户定义的分区函数控制,但通常采用默认的 partitioner
通过哈希函数来区分。
map
任务和 reduce
任务之间的排序和分组,该部分也被称为 shuffle
(混洗) ,每个 reduce
任务的输入都来自于所有 map
任务的输出,另外 shuffle
一般比图中的更加复杂,而且调整混洗参数对作业总执行时间的影响非常大。
当数据完全并行时(即无需混洗)可能会出现无 reduce
任务的情况。
总结 map
阶段的输入是 NCDC
原始数据,键是某一行起始位置相对于文件起始位置的偏移量,而值是文本文件的每一行;reduce
阶段的输入则是将 map
阶段的输出结果经由 shuffle
排序和分组之后的数据。
combiner
函数
集群上可用的带宽限制了 MapReduce
作业的数量,因此尽量避免 map
任务和 reduce
任务之间的数据传输是有利的;为此 Hadoop
允许用户指对 map
任务的输出指定一个 combiner
,该 combiner
函数的输出将作为 reduce
任务的输入。
由于 combiner
属于优化方案,因此无法确定要对一个指定的 map
任务输出记录调用多少次 combiner
。
经典示例
WordCount
单词计数是最简单也是最能体现 MapReduce
思想的示例程序之一,该程序完整的代码可以在 Hadoop
安装包的 src/examples
目录下找到。其主要功能是:统计一系列文本文件中每个单词出现的次数。
1 | import java.io.IOException; |
Job
初始化main
函数构建Configuration
对象设置系统配置信息,接着Job
自定义实例并设置启动类。设置
Job
的map
(拆分)、combiner
(中间结果处理)、reduce
(合并)
设置Job
的相关map
类、reduce
类、combiner
类。map
:Mapper
类共有四个泛型,分别是KEYIN
、VALUEIN
、KEYOUT
、VALUEOUT
,前面两个KEYIN
、VALUEIN
指的是map
函数输入参数的键值对的类型;后面两个KEYOUT
、VALUEOUT
指的是map
函数输出的键值对的类型。而这里的map
函数中通过空格符号来分割文本内容,并对其进行记录。reduce
:Reducer
类也有四个泛型,分别指的是reduce
函数输入的键值对类型(这里输入的键值对类型通常和map
的输出键值对类型保持一致)和输出的键值对类型。而这里的reduce
函数主要是将传入的键值对进行最后的合并统计,形成最后的统计结果。
设置
Job
的键值对类型
设置Job
输出结果<key,value>
的中键值对数据类型,因为结果是<单词,个数>,所以key
设置为Text
类型相当于String
类型。Value
设置为IntWritable
相当于int
类型。设置
Job
的输入输出
通过setInputPath
和setOutputPath
设置Job
的输入输出路径。
工作机制
作业运行机制
在说明作业运行机制之前,可以通过 Job
对象的 submit()
方法来运行 MapReduce
作业,且其内部封装了大量的处理细节;也可以通过调用 waitForComplete()
方法用来提交之前没有提交过的任务,并等待它完成。
整体机制流程如下:
- 客户端提交
MapReduce
作业。 YARN
资源管理器负责协调集群上计算机资源的分配。YARN
节点管理器负责启动和监视集群中机器上的计算容器(container
)。MapReduce
的application master
负责协调运行MapReduce
作业的任务。它和MapReduce
任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理。- 分布式文件系统用来与其他实体间共享作业文件。
提交作业
Job
的 submit()
方法创建一个内部的 JobSummiter
实例,并且调用其 submitJoobbInternal()
方法。提交作业后,waitForComplete()
每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功就显示作业计数器;如果失败则导致作业失败的错误被记录到控制台。JobSummiter
所实现的作业提交过程如下:
- 向资源管理器请求一个新应用
ID
,用于MapReduce
作业ID
。 - 检查作业的输出说明。
- 计算作业的输入分片。
- 将运行作业所需要的资源(包括作业
JAR
文件、配置文件和计算所得的输入分片)复制到一个以作业ID
命名的目录下的共享文件系统中。作业JAR
的副本较多(由mapreduce.client.submit.file.replication
属性控制,默认值为10
),因此在运行作业的任务时,集群中有很多副本可供节点管理器访问。 - 通过调用资源管理器的
submitApplication()
方法提交作业。
作业初始化
资源管理器收到调用它的 submitApplication()
消息后,便将请求传递给 YARN
调度器(scheduler
) 。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动 application master
的进程。
MapReduce
作业的 application master
是一个 Java
应用程序,它的主类是 MRAppMaster
。由于将接受来自任务的进度和完成报告,因此 application master
对作业的初始化是通过创建多个薄记对象以保持对作业进度的跟踪来完成的。接下来它接受来自共享文件系统、在客户端计算的输入分片,接着对每一个分片创建一个 map
任务对象以及由 mapreduce.job.reduces
属性(通过作业的 setNumReduceTasks()
方法设置)确定的多个 reduce
任务对象。任务 ID
也会在此时分配。
application master
必须决定如何运行构成 MapReduce
作业的各个任务。如果作业很小,并且 application master
判断在新的容器中分配和运行任务的开销大于并行运行的开销时,就选择和自己在同一个 JVM
上运行任务。这样的作业被称为 uberized
,或者作为 uber
任务运行 。
如何判断作何很小呢?默认情况下,小作业就是少于 10
个 mapper
且只有 1
个 reduce
且输入大小小于一个 HDFS
块的作业(设置 mapreduce.job.ubertask.maxmaps
、mapreduce.job.ubertask.maxreduces
和 mapreduce.job.ubertask.maxbytes
参数)。必须明确启用 Uber
任务(对于单个作业或者整个集群),具体方法是将 mapreduce.job.ubertask.enable
设置为 true
。
最后在运行任务前,application master
调用 setupJob()
方法设置 OutputCommitter
。默认值为 FileOutputCommiter
,表示将建立作业的最终输出目录及任务输出的临时工作空间。
分配任务
如果作业不适合作为 uber
任务运行,那么 application master
就会为该作业中的所有 map
任务和 reduce
任务向资源管理器请求容器。首先为 map
任务发出请求,该请求优先级要高于 reduce
任务的请求,这是因为所有的 map
任务必须在 reduce
的排序阶段能够启动前完成,直到有 5%
的 map
任务已经完成时为 reduce
任务的请求才会发出。
reduce
任务能够在集群中任意位置运行,但是 map
任务的请求有着数据本地化局限,在理想的情况下,任务是数据本地化(data local
)的,意味着任务在分片驻留的同一节点上运行。可选的情况下,任务可能时机架本地化(rack local
)的,即和分片在同一机架而非同一节点上运行。同时也有一些任务既不是数据本地化,也不是机架本地化,它们会从别的机架上获取自己的数据。
请求也为任务指定了内存需求和 CPU
数。在默认情况下,每个 map
任务和 reduce
任务都分配到 1024MB
的内存和一个虚拟内核,这些值可以在每个作业的基础上进行配置,分别通过 4
个属性来设置 mapreduce.map.memory.mb
、mapreduce.map.cpu.vcores
、mapreduce.reduce.memory.mb
和 mapreduce.reduce.cpu.vcoresp.memory.mb
。
执行任务
一旦资源管理器的调度器为任务分配了一个特定节点上的容器,application master
就通过与节点管理器通信来启动容器。该任务由主类为 YarnChild
的一个 Java
应用程序执行,在它运行任务之前,需要首先将任务需要的资源本地化,包括作业的配置、JAR
文件和所有来自分布式缓存的文件。最后运行 map
任务或 reduce
任务。
YarnChild
在指定的 JVM
中运行,因此用户定义的 map
或 reduce
函数中的任何缺陷不会影响到节点管理器。
每个任务都能够执行搭建(setup
)和提交(commit
)动作,它们和任务本身在同一个 JVM
中运行,并由作业的 OutputCommitter
确定。对于基于文件的作业,提交动作将任务输出由临时位置迁移到最终位置。提交协议确保当推测执行(speculative execution
)被启用时,只有一个任务副本被提交,其他的都被取消。
更新作业状态和进度
MapReduce
作业是长时间运行的批量作业,运行时间范围从数秒到数小时。一个作业和它的每个任务都有一个状态(status
),包括:作业或任务的状态、map
和 reduce
的进度、作业计数器的值、状态消息或描述(可以由用户来设置)。
上述的状态信息在作业期间不断改变,那又是如何与客户端通信呢?任务在运行时,对其进度(progress
,即任务完成百分比)保持跟踪。对 map
任务,任务进度是已处理输入所占比例。对 reduce
任务,情况有点复杂,整个过程分为三个步骤,与 shuffle
的三个阶段相对应,同时也会估计已处理 reduce
输入的比例。
MapReduce
中进度的组成如下:
- 读入一条输入记录
- 写入一条输出记录
- 设置状态描述
- 增加计数器的值
- 调用
Reporter
或TaskAttemptContext
的progress()
方法
任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么置于框架中,要么由用户自己定义。
当 map
任务或 reduce
任务运行时,子进程和自己的父 application master
通过 umbilical
接口通信。每个三秒钟,任务通过这个 umbilical
接口向自己的 application master
报告进度和状态(包括计数器),application master
会形成一个作业的汇聚视图(aggregate view
)。
在作业期间,客户端每秒钟轮询一次 application master
以接收最新状态(轮询间隔通过 mapreduce.client.progressmonitor.pollinterval
参数设置)。客户端也可以使用 Job
的 getStatus()
方法得到一个 JobStatus
对象的实例,后者会包含作业的所有状态信息。
作业完成
当 application master
收到作业最后一个任务已完成的通知后,便将作业的状态设置为“成功”。在 Job
轮询状态时,便知道任务已成功完成,于是 Job
打印一条消息告知用户,然后从 waitForComplete()
方法返回;Job
的统计信息和计数值在这个时候也会输出到控制台。
如果 application master
有相应的设置,也会发送一条 HTTP
作业通知,可以通过 mapreduce.job.end-notification.url
属性来设置在收到回调指令后通知客户端。
最后在作业完成时,application master
和任务容器清理其工作状态(中间状态会被删除),接着 OutputCommitter
的 commitJob()
方法会被调用。作业信息由作业历史服务器存档,以便日后用户需要时可以查询。
shuffle
MapReduce
确保每个 reduce
的输入都是按键排序的,系统执行排序将 map
输出作为输入传给 reducer
的过程称为 shuffle
。shuffle
属于不断被优化和改进的部分,也是 MapReduce
的“心脏”,是奇迹发生的地方。
map
map
函数开始产生输出时,并不是简单地将数据写入磁盘,而是利用缓冲的方式写到内存并处于效率的考虑进行预排序。
每个 map
任务都有一个环形内存缓冲区用于存储任务输出,一旦缓冲内容达到阈值(默认为 80%
),此时一个后台线程便开始把内容溢出到磁盘。在溢出写到磁盘的过程中,map
输出继续写到缓冲区,但如果在此期间缓冲区被填满,map
会被阻塞直到写磁盘过程完成。溢出写过程按轮询方式将缓冲区内容写到 mapreduce.cluster.local.dir
属性在作业特定子目录下指定的目录中。
默认情况下,缓冲区大小为 100MB
,此值可以通过 mapreduce.task.io.sort.mb
属性调整
如果 map
输出相当小,则输出会被直接复制到 reduce
任务的 JVM
内存中。指定用于此用途的堆空间的百分比大小可以由 mapreduce.reduce.shuffle.input.buffer.percent
属性控制,。
在写磁盘之前后台线程会根据数据最终要传递给的 reducer
将数据划分为相应的分区。在每个分区中后台线程会根据键进行排序,如果此时有一个 combiner
函数,在排序后运行此函数,此后会减少写到磁盘上的数据和传递给 reducer
的数据。
每次内存缓冲区达到溢出阈值,就会创建一个溢出文件(spill file
),因此在 map
任务写完其最后一个输出记录之后就会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。
属性 mapreduce.task.io.sort.factor
可以控制一次最多合并多少溢出文件,默认值为 10
。如果最少存在 3
个溢出文件时,则 combiner
函数会在输出文件写到磁盘之前再次运行
在将压缩 map
输出写到磁盘的过程中是否能使用压缩呢?在默认情况下,输出是不压缩的,但可以通过属性 mapreduce.map.output.compress
设置为 true
,即可启用此功能。而使用的压缩库则由 mapreduce.map.output.compress.codec
参数指定。
reducer
通过 HTTP
得到输出文件的分区。而用于文件分区的工作线程的数量由任务的 mapreduce.shuffle.max.threads
属性控制,此设置针对的是每一个节点管理器,而不是每个 map
任务。
reduce
map
输出文件位于运行 map
任务的 tasktracker
的本地磁盘,之后 tasktracker
会为分区文件运行 reduce
任务。那 reducer
如何知道从那台机器上获取 map
输出?map
任务成功后,会通过心跳机制通知他们的 application master
,reducer
中的一个线程定期询问 master
以便获取 map
输出文件主机的位置,直到获取所有输出位置。
每个 map
任务的完成不尽相同,因此每个任务完成时,reduce
任务就开始复制其输出(此为 reduce
任务的复制阶段)。reduce
任务有少量复制线程,可以并行获取 map
输出,默认为 5
个线程,此默认值可以通过 mapreduce.reduce.shuffle.parallelcopies
属性修改。
复制完成所有的 map
输出之后,reduce
任务进行合并阶段,此阶段会合并 map
输出,维持其顺序排序。
默认值为 10
,通过 mapreduce.task.io.sort.factor
属性控制。
在最后阶段,即直接将数据输入 reduce
函数,从而省略一次磁盘的往返行程,并没有将输出文件合并为一个已排序的文件作为最后一次。
在 reduce
阶段,对已排序输出中的每个键调用 reduce
函数,此阶段的输出直接写到输出文件系统,一般为 HDFS
。如果采用 HDFS
,由于节点管理器也运行着数据节点,则第一块数据副本将被写到本地磁盘。
配置调优
map
端的调优属性属性名称 类型 默认值 说明 mapreduce.task.io.sort.mb
int
100
排序 map
输出时所使用的内存缓冲区的大小,以MB
为单位mapreduce.map.sort.spill.percent
float
0.80
map
输出内存缓存和用来开始磁盘溢出写过程的记录边界索引,使用比例的阈值mapreduce.task.io.sort.factor
int
10
排序文件时,一次最多合并的流数。在 reduce
中使用。mapreduce.map.combine.minspills
int
3
运行 combiner
所需的最少溢出文件数mapreduce.map.output.compress
Boolean
false
是否压缩 map
输出mapreduce.map.output.compress.codec
Class name
org.apache. hadoop.io.compress. DefaultCodec
用于 map
输出的压缩编解码器mapreduce.shuffle.max.threads
int
0
每个节点管理器的工作线程数,用于将 map
输出到reducer
。0
表示使用Netty
默认值,两倍于可用的处理器数。总的原则是给
shuffle
过程尽可能多提供内存空间。但是map
函数和reduce
函数不能无限制使用内存,需要尽可能少用内存。
运行map
任务和reduce
任务的JVM
由mapred.child.java.opts
属性设置内存大小。
在map
端通过避免多次溢出写磁盘来获得最佳性能,一次最好;而在reduce
端,中间数据全部驻留在内存时就能获得最佳性能。reduce
端的调优属性属性名称 类型 默认值 描述 mapreduce.reduce.shuffle.parallelcopies
int
5
用于把 map
输出复制到reducer
的线程数mapreduce.reduce.shuffle.maxfetchfailures
int
10
在声明失败之前 reducer
获取一个map
输出所花的最大时间mapreduce.task.io.sort.factor
int
10
排序文件时一次最多合并的流的数量, mapreduce.reduce.shuffle.input.buffer.percent
float
0.70
shuffle
复制阶段分配给map
输出的缓冲区占堆空间的百分比mapreduce.reduce.shuffle.merge.percent
float
0.66
map
输出缓冲区的阈值使用比例,用于启动合并输出和磁盘溢出写的过程mapreduce.reduce.merge.inmen.threshold
int
1000
启动合并输出和磁盘溢出写过程的 map
输出的阈值数,0
或者更小的数意味者没有阈值限制mapreduce.reduce.input.buffer.percent
float
0.0
reduce
过程中在内存中保存map
输出的空间占整个堆空间的比例。
特性
计数器
计数器是收集作业统计信息的有效手段之一,用于质量控制和应用级统计,另外计数器还可以辅助诊断系统故障。
内置计数器
Hadoop
为每个作业维护若干个内置计数器,以描述多项指标。
组别 | 名称/类别 |
---|---|
MapReduce 计数器 |
org.apache.hadoop.mapreduce.TaskCounter |
文件系统计数器 | org.apache.hadoop.mapreduce.FileSystemCounter |
FileInputFormat 计数器 |
org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
FileOutputFormat 计数器 |
org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
作业计数器 | org.apache.hadoop.mapreduce.JobCounter |
这些计数器被划分为若干组,各组要么包含任务计数器(在任务处理过程中不断更新),要么包含作业计数器(在作业处理过程中不断更新)。
Java
自定义计数器
MapReduce
允许用户编写程序来定义计数器,计数器的值可以在 mapper
或 reducer
中增加,计数器由一个 Java
枚举类型来定义,以便对有关的计数器分组。
一个作业可以定义的枚举类型数量不限,各个枚举类型所包含的字段数量也不限。枚举类型的名称即为组的名称,枚举类型的字段就是计数器名称,计数器是全局的。换言之,MapReduce
框架将跨所有 map
和 reduce
聚集这些计数器,并在作业结束时产生一个最终结果。
1 | // 定义 |
排序
排序是 MapReduce
的核心,MapReduce
使用排序来组织数据,而排序也分为不同的数据集排序方式。
部分排序
默认情况下,MapReduce
会根据输入记录的键对数据集排序。
全排序
如何生成一个全局排序的文件呢?
最简单的方法就是,首先创建一系列排序好的文件;其次串联这些文件;最后生成一个全局排序的文件。其主要思路就是使用一个 patitioner
来描述输出的全局排序。
该方法的关键点在于如何划分各个分区。在理想情况下,各分区所包含的记录数应大致相等。使作业的总体执行时间不会受制于个别 reducer
。
辅助排序
通过特定的方法对键进行排序和分组以实现对值的排序。
通过设置一个按照键进行分区的 patitioner
,这样可以确保相同 patitioner
的记录会被发送到同一个 reducer
中,而在同一个分区中,仍然可以通过键进行分组。
按值排序的方法总结:
- 定义包括自然键和自然值的组合键。
- 根据组合键对记录进行排序,即同时用自然键和自然值进行排序。
- 针对组合键进行分区和分组时均只考虑自然键。
边数据
边数据(side data
)是作业所需的额外的只读数据,以辅助处理主数据集 。其所面临的挑战在于如何使所有 map
或 reduce
任务(散布在集群内不)都能够方便而高效地使用边数据。
Hadoop
的分布式缓存机制能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用,不过为了节约网络带宽,在每一个作业中,各个文件通常只需要复制到一个节点一次。
对于使用 GenericOptionsParser
的工具来说,用户可以使用 -files
选项指定待分发的文件,文件内包含以逗号隔开的 URI
列表,如果没有指定文件系统,则这些文件会被默认为本地文件。-archives
选项可以向自己的任务中复制存档文件(JAR
文件、ZIP
文件、tar
文件和 gzipped tar
文件),这些文件会被解档到任务节点。-libjars
选项会把 JAR
文件添加到 mapper
和 reducer
任务的类路径中。
引用
个人备注
此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!