MapReduce
MapReduce是用于处理大数据的一个框架,用户通过编写map
函数,接收key/value对,输出一系列中间k/v对,redece
函数合并具有相同key的中间k/v对。
这样编写的程序可以自动的并行化,运行在普通商用计算机组成的集群上,运行时系统自动的划分输入数据,分发任务,处理错误,管理机器间通信。程序员不需要任何并行化和分布式系统的经验就可以使用大规模的分布式系统。
例子
一个统计文档中出现的单词数的例子如下(伪码)
1 | map(String key, String value): |
更一般的,mapreduce有如下形式
1 | map (k1,v1) → list(k2,v2) |
mapreduce还可以用于分布式的grep、url访问次数统计、倒排索引、分布式排序等
实现
使用的硬件:
- 双核x86处理器,运行Linux系统,2-4GB的内存。
- 商用网络硬件,在机器层面通常为100Mbps或1Gbps。
- 一个集群有上百个机器,故障非常普遍。
- 存储使用不贵的IDE硬盘,直接连接在机器上,使用GFS文件系统。
- 用户提交任务给调度系统,调度系统将细分的小任务分配给集群中可用的机器
执行过程
输入数据会被自动的划分为M片,分配到不同的机器上面Map,数据的划分也可以在不同机器上并行处理。使用Map产生的中间key,使用一个分区函数分成R片,分配至不同的机器进行Reduce。总体过程如下
- 用户程序中的MapReduce库将输入数据分成M片,每片通常16MB-64MB。然后在集群上启动许多副本
- 有一个副本比较特殊,称为master,其余副本由master分配工作。有M个map任务和R个reduce需要分配,master会自动选取空闲的worker分配任务。
- 被分配到map任务的worker读取对应的输入分片,解析k/v对并传递给用户定义的map函数,map函数生成的中间结果缓存在内存中
- 缓存的k/v对周期性的写入硬盘,通过分区函数划分为R片。结果保存的位置会传回给master,master负责把位置传递给reduce woker
- 当reduce worker被master通知文件位置时,使用RPC读取文件,当他读取了所有的中间数据,他用key对其进行排序,这样相同的key就会被合并。如果中间数据太大,使用外部排序
- reduce worker遍历排序好的中间数据,把key和对应的value set传给用户自定义的Reduce函数,Reduce函数的输出append到一个文件中
- 当所有map 和 reduce 任务都完成了,master唤醒用户程序。
结束后,mapreduce的输出是R个文件,通常不需要合并为一个文件,因为可能会继续传给其他的mapreduce或者其他可以划分数据的分布式应用使用。
master的数据结构
对于每个map任务和reduce任务,master维持他们的状态(空闲,执行中,完成)以及worker机器的身份。
master是map任务传递中间文件位置给reduce任务的管道。对于每个完成的map任务,master维护R个输出文件的位置、大小,当map任务完成时更新。这些信息会被推送给正在执行reduce任务的worker。
容错
worker failure
master周期性的ping worker,如果没有收到回应,就标记该worker 失败,该worker已完成的map任务被重置,因此可以被调度至其他worker运行。同样的,在运行的map或reduce任务也被重置。
已完成的map被再次执行,因为结果保存在本地磁盘上,如果该机器fail了,那么文件也无法访问了。已经完成的reduce任务不需要再次执行,因为结果已经保存在全局的文件系统中。
如果一个map任务由A执行后又由B执行,其他执行reduce任务的worker会得到通知,如果还没有从A读取数据,之后将会从B读取
MapReduce可以抵抗大规模的worker failures,只需要重新执行fail机器上的任务
master failure
使master周期性保存上述数据结构很容易。如果master fail了,使用检查点可以重新启动一个master的拷贝。然而,在只有一个master的情况下,fail的概率很小,因此实现时如果master fail,只是终止MapReduce,客户端可以重新运行MapReduce任务。
出现错误时的语义
当map 和 reduce 任务是确定性的时,我们的分布式实现产生与无错误顺序执行相同的结果。我们使用map和reduce原子性提交结果来实现这个性质。每个执行中的task将结果写到私有的临时文件中。reduce任务产生一个文件,map任务产生R个。当map任务完成时,worker发送一个包含R个文件名的信息给master,如果master收到已完成map任务的信息,它将忽略此信息,否则,它记录R个文件的名字。
当reduce任务完成时,reduce worker重命名它的临时文件为最终文件。如果有其他worker运行相同的reduce任务,多个重命名请求会被执行。我们利用底层文件系统的重命名原子性,确保最终文件系统状态仅包含一次执行reduce任务所产生的数据。
当map reduce操作时非确定的时,我们提供稍弱但仍合理的语义。这块暂时没懂。
局部性
网络带宽是相对比较稀有的资源。master会将map任务分配给保存了那个输入文件的机器,如果失败,尝试分配至就近的机器(例如在一个交换机上)。节省大量的网络带宽。
任务粒度
我们将map划分为M片,reduce划分为R片。理想的,M和R将远大于worker数。使每个worker执行许多不同任务提升动态负载均衡性,同样加速worker fail时的恢复。在实践中,我们通常选择M,使得每个任务处理大约16MB-64MB的数据。
备份任务
有些时候会出现straggler现象,一个机器在最后几个map或reduce任务上花费了异常大的执行时间。这可能有多个理由,例如,坏的硬盘。其他任务也可能被分配至此机器,由于cpu、内存、网络、硬盘的竞争,使得执行时间更慢了。
我们有全局的机制来缓解这个问题。当MapReduce快要完成时,master调度执行中的任务备份执行,当主或备份执行完成时,该任务完成。
细节
分区函数
通常情况下我们使用hash(key) mod R
来分区,然而在某些情况下可以使用其他分区函数。例如,我们可以使用hahs(Hostname(urlkey))mod R
来将具有相同域名的url映射到相同的文件
排序保证
我们保证在给定划分时,中间的k/v对以升序被处理。使得输出一个排序的结果文件更容易,在输出文件需要根据key随机查找时效率更高,用户可能也会觉得很方便
结合函数
在某些情况下,每个映射任务产生的中间键会大量重复。一个例子是单词统计,可能会产生许多
输入输出类型
MapReduce库提供以不同格式读取数据的支持,例如,text模式将输入每行视作k/v对,k是偏移量,v是该行的内容。用户也可以自定义reader的实现。reader不必只从文件读,也可以从数据库读,或者从内存中读
副作用
有些时候用户可能发现输出一些辅助文件很方便,但是需要用户自己来保证原子性及幂等。
跳过坏记录
有些时候用户的代码可能有bug,对于特定的records可能导致程序的崩溃。通常的做法是修复bug,然而在某些情况下却并不适用。可能bug是由于第三方库,或者有些时候也可以跳过一两条记录。worker会把错误信息发送给master,当master发现对于一条记录有多个失败时,说明该记录应该被跳过
本地运行
想debug Map或 Reduce往往比较困难。为了辅助debug、测试,MapReduce库提供了用于本地调试的选项,然后就可以使用gdb等进行调试
状态信息
master运行一个内部的HTTP服务器,提供一些信息给用户。包括进度,输入输出数据量,处理速度等,也提供指向标准错误或标准输出的连接。用户可以监控任务还需要多久,判断是否需要更多资源等。另外,最顶层的状态页展示哪些worker故障了,故障时在执行什么任务等信息
计数
MapReduce框架提供计数功能。用户可以创建一个计数器对象,在map或reduce中进行递增操作。计数器值会在master与worker心跳包之间传递。master统计成功执行的map或reduce任务的counter值,并处理重复map、reduce任务以避免两次计数。