MapReduce
MapReduce
MapReduce数据处理分为Split、Map、Shuffle和Reduce 4个步骤。应用程序实现Map和Reduce步骤的逻辑,Split和Shuffle步骤由框架自动完成。
一、 处理步骤
1.1 Split步骤
在执行MapReduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(键/值对),map会依次处理每一个记录。引入split的概念是为了解决记录溢出问题。假设一个map任务处理一个块中的所有记录,那么当一个记录跨越了块边界时怎么办呢?HDFS的块大小是严格的64MB(默认值,当然也可能是配置的其他值),而且HDFS并不关心文件块中存储的内容是什么,因此HDFS无法评估何时一个记录跨越了多个块。
为了解决此问题,Hadoop使用了一种数据块的逻辑表示,叫做input splits。当MapReduce作业客户端计算input splits时,它会计算出块中第一个和最后一个完整记录的位置。如果最后一个记录是不完整的,input split中包含下一个块的位置信息,还有完整记录所需的字节偏移量。
MapReduce数据处理是由input splits概念驱动的。为特定应用计算出的input splits数量决定了mapper任务的数量。ResourceManager尽可能把每个map任务分配到存储input split的从节点上,以此来保证input splits被本地处理。
1.2 Map步骤
一个MapReduce应用逐一处理input splits中的每一条记录。input splits在上一步骤被计算完成之后,map任务便开始处理它们,此时Resource Manager的调度器会给map任务分配它们处理数据所需的资源。
对于文本文件,默认为文件里的每一行是一条记录,一行的内容是键/值对中的值,从split的起始位置到每行的字节偏移量,是键/值对中的键。之所以不用行号当作键,是因为当一个大的文本文件被分成了许多数据块,当作很多splits处理时,行号的概念本身就是存在风险的。每个split中的行数不同,因此在处理一个split之前就计算出行数并不容易。但字节偏移量是精确的,因为每个数据块都有相同的固定的字节数。
map任务处理每一个记录时,会生成一个新的中间键/值对,这个键和值可能与输入对完全不同。map任务的输出就是这些中间键/值对的全部集合。为每个map任务生成最终的输出文件前,先会依据键进行分区,以便将同一分组的数据交给同一个reduce任务处理。在非常简单的应用场景下,可能只有一个reduce任务,此时map任务的所有输出都会被写入一个文件。但是在有多个reduce任务的情况下,每个map任务会基于分区键生成多个输出文件。框架默认的分区函数(HashPartitioner)满足大多数情况,但有时也需要定制自己的partitioner,例如需要对mapper的结果集进行二次排序时。
在应用程序中最好对map任务的输出文件进行压缩以获得更优的性能。
1.3 Shuffle步骤
Map步骤之后,开始Reduce处理之前,还有一个重要的步骤叫做Shuffle。MapReduce保证每个reduce任务的输入都是按照键排好序的。系统对map任务的输出执行排序和转换,并映射为reduce任务的输入,此过程就是Shuffle,它是MapReduce的核心处理过程。在Shuffle中,会把map任务输出的一组无规则的数据尽量转换成一组具有一定规则的数据,然后把数据传递给reduce任务运行的节点。Shuffle横跨Map端和Reduce端,在Map端包括spill过程,在Reduce端包括copy和sort过程,如图下图所示。
需要注意的是,只有当所有的map任务都结束时,reduce任务才会开始处理。如果一个map任务运行在一个配置比较差的从节点上,它的滞后会影响MapReduce作业的性能。为了避免这种情况的发生,MapReduce框架使用了一种叫做推测执行的方法。所谓的推测执行,就是当所有task都开始运行之后,MRAppMaster会统计所有任务的平均进度,如果某个task所在的task node因为硬件配置比较低或者CPU load很高等原因,导致任务执行比总体任务的平均执行慢,此时MRAppMaster会启动一个新的任务(duplicate task),原有任务和新任务哪个先执行完就把另外一个kill。另外,根据mapreduce job幂等的特点,同一个task执行多次的结果是一样的,所以task只要有一次执行成功,job就是成功的,被kill的task对job的结果没有影响。如果你监测到任务执行成功,但是总有些任务被kill,或者map任务的数量比预期的多,可能就是此原因所在。
map任务的输出不写到HDFS,而是写入map任务所在从节点的本地磁盘,这个中间结果也不会在Hadoop集群间进行复制。
1.4 Reduce步骤
Reduce步骤负责数据的计算归并,它处理Shuffle后的每个键及其对应值的列表,并将一系列键/值对返回给客户端应用。有些情况下只需要Map步骤的处理就可以为应用生成输出结果,这时就没有Reduce步骤。例如,将全部文本转换成大写这种基本的转化操作,或者从视频文件中抽取关键帧等。这些数据处理只要Map阶段就够了,因此又叫map-only作业。但在大多数情况下,到map任务输出结果只完成了一部分工作。剩下的任务是对所有中间结果进行归并、聚合等操作,最终生成一个汇总的结果。
与map任务类似,reduce任务也是逐条处理每一个键。通常reduce为每个处理的键返回单一键/值对,但这个结果键/值对可能会比原始输入的键/值对小得多。当reduce任务完成后,每个reduce任务的输出会写入一个结果文件,并将结果文件存储到HDFS中,HDFS会自动生成结果文件数据块的副本。
Resource Manager会尽量给map任务分配资源,确保input splits被本地处理,但这个策略不适用于reduce任务。Resource Manager假定map的结果集需要通过网络传输给reduce任务进行处理。这样实现的原因是,要对成百上千的map任务输出进行Shuffle,没有切实可行的方法为reduce实施相同的本地优先策略。
二、MapReduce常用调优参数
2.1 Mapping
- 减少溢写的次数
mapreduce.task.io.sort.mb Shuffle的环形缓冲区大小,默认100m,可以提高到200m
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认80% ,可以提高的90%
- 增加每次Merge合并次数
mapreduce.task.io.sort.factor默认10,可以提高到20
- mapreduce.map.memory.mb
默认MapTask内存上限1024MB。 可以根据128m数据对应1G内存原则提高该内存。如果文件快是压缩的就超过128m了
控制MapTask堆内存大小
mapreduce.map.java.opts (如果内存不够, 报:java.lang.OutOfMemoryError)
mapreduce.map.cpu.vcores 默认MapTask的CPU核数1。计算密集型任 务可以增加CPU核数
异常重试 mapreduce.map.maxattempts每个Map Task最大重试次数,一旦重试 次数超过该值,则认为Map Task运行失败,默认值:4。根据机器 性能适当提高。
2.2 Reduce
- mapreduce.reduce.shuffle.parallelcopies每个Reduce去Map 中拉取数据的并行数默认值是5。可以提高到10。
- mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占Reduce可用内存的比例,默认值0.7。可以提高到0.8\
- mapreduce.reduce.shuffle.merge.percent Buffer中的数据达到多少比例 开始写入磁盘,默认值0.66。可以提高到0.75
- mapreduce.reduce.memory.mb 默认ReduceTask内存上限1024MB, 根据128m数据对应1G内存原则,适当提高内存到4-6G
- mapreduce.reduce.java.opts:控制ReduceTask堆内存大小。(如果内 存不够,报:java.lang.OutOfMemoryError)
- mapreduce.reduce.cpu.vcores默认ReduceTask的CPU核数1个。可 以提高到2-4个
- mapreduce.reduce.maxattempts每个Reduce Task最大重试次数, 一旦重试次数超过该值,则认为Map Task运行失败,默认值:4。
- mapreduce.job.reduce.slowstart.completedmaps当MapTask完成的比 例达到该值后才会为ReduceTask申请资源
- mapreduce.task.timeout如果一个Task在一定时间内没有任何进入, 即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态, 可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住 不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000 (10分钟)。如果你的程序对每条输入数据的处理时间过长,建议将 该参数调大。