Lab的要求:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
使用git来获取代码git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
给的代码在src/main/mrsequential.go
中提供了串行mapreduce的实现,mrapps/wc.go
中给了统计单词的实现。可以通过如下方式使用
1 | cd ~/6.824 |
要做的工作
需要做的任务是实现一个分布式的MapReduce,包括两个程序,master和worker。worker通过RPC与master交互。每个worker进程向master索取任务,读取输入数据,进行计算,然后将结果写到文件。master应该有判断worker在一定时间内是否完成工作的能力(这个lab中为10秒),如果没有完成则分配任务至其他worker
main/mrmaster.go
和 main/mrworker.go
中提供了一些初始代码,不要修改。自己的实现写在mr/master.go
, mr/worker.go
, 和 mr/rpc.go
中。
程序应该按如下方式运行:
先编译wr.go:go build -buildmode=plugin ../mrapps/wc.go
在main目录中,启动master,pg-*.txt
是输入的文件,每个文件对应一个split,被分给一个Map任务1
2 rm mr-out*
go run mrmaster.go pg-*.txt
在其他窗口中,运行worker$ go run mrworker.go wc.so
当workers和master都完成时,查看mr-out-*
,排序后结果应该和顺序执行相同。
main/test-mr.sh
为用于测试的脚本
忽略可能遇到的Go RPC包的如下错误1
2019/12/16 13:27:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three
一些规则:
- map阶段应该分割中间key至
nReduce
个reduce任务,nReduce
是main/mrmaster.go
中的参数,且传递给MakeMaster()
- worker应该将第x个reduce的输出保存至
mr-out-X
文件中 mr-out-X
文件中,每个reduce函数的输出为一行,应该是Go语言"%v %v"
格式- worker应该将map的中间输出保存在当前目录中,之后读取为reduce的输入
main/mrmaster.go
期望mr/master.go
实现一个Done()
函数,在MapReduce任务完成后返回true,然后mrmaster.go
会退出- 当任务完成时,worker退出,一个简单的实现是使用
call()
的返回值,如果worker连接master失败,可以认为是任务完成,master已退出,因此worker也可以终止。取决于设计,也可以让master给worker一个“请退出”的伪任务。
提示
- 一个开始的办法是修改
mr/worker.go
中Worker()
方法,向master发送RPC请求任务。然后修改master,回复一个还未开始的map任务的文件名,然后修改worker读取该文件然后调用Map方法。可以参考mrsequential.go
- 合理的命名中间文件的方法为
mr-X-Y
,这里X是Map的编号,Y是reduce的编号 - map需要保存中间kv对至文件,然后在reduce时读取,一个可行的办法是使用
encoding/json
包。1
2
3enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)1
2
3
4
5
6
7
8dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
} - map的部分可以用
worker.go
中的ihash(key)
方法选择对应的reduce任务 - 可以从
mrsequential.go
中拷贝部分代码 - 不要忘了在master中对共享数据加锁
- 使用
go build -race
和go run -race
来启用go的竞态检测器,test-mr.sh
中有相关的注释 - worker可能需要等待,比如reduce无法启动,直至map完成。一个办法是worker周期性询问master,使用
time.Sleep()
睡眠。另一个办法是在master中使用RPC handler,使用time.Sleep()
或sync.Cond
循环等待。Go会在新线程中处理RPC。 - master无法区分worker是崩溃了还是停滞了还是执行过慢了,只能等待10秒,未完成则假设它失败了,重新调度任务至另一个worker。
- 测试崩溃恢复,可以使用
mrapps/crash.go
,随机在map和reduce中exit - 可以使用
ioutil.TempFile
创建一个临时文件然后调用os.Rename
原子性的修改文件名 - 在
mr-tmp
中查看中间信息或输出结果
实现
因为题目中说了不要公开代码,所以简单的说说思路吧
- MapReduce论文中说的是实现Map和Reduce函数,就能方便的使用集群进行计算。但是这个Lab是给了Map和Reduce函数,实现MapReduce的调度。
- 主要的几个文件如下:
mrmaster.go
用于启动master,循环调用done()判断任务是否完成。mrworker.go
用于启动worker,将Map和Reduce的实现传递给worker。这两个文件不需要改动。 - 2020 的lab1与往年相比更自由了,因此可以按照自己的想法写,但是也容易让人无从下手。可以按照题目中的提示,修改
mr/worker.go
中Worker()
方法,向master发送RPC请求任务。然后修改master,回复一个还未开始的map任务的文件名,然后修改worker读取该文件然后调用Map方法。 - master只有一个实例,需要维护任务的各种信息,在worker请求任务时进行分配。每个任务有每个任务的数据结构
- 在我的实现里,一个任务有 编号、阶段(MAP、REDUCE、WAIT、EXIT)、文件名的list、状态(NOTASSIGNED、ASSIGNED、FINISHED)、被分配的时间、reduce任务的数量
- 一个master,维护reduce任务数、map任务的list、reduce任务的list、一个互斥锁。启动master时根据传进的文件名初始化maplist,一个map任务处理一个文件。初始化nreduce个reduce任务加入reducelist
- worker请求任务时,遍历maplist,选择一个NOTASSIGNED的任务或者已经ASSIGN但是距离上次分配时间超过10秒的任务分配出去,并且设置分配时间。然后把这个任务加入list尾部,这样下一个worker来的时候就不会重复扫描。
- worker接收到任务,打开文件,调用map方法,根据key输出到R个文件中。先输出至临时文件最后再重命名,保证在crash的时候文件的拥有合法文件名的内容是正确的。文件名的格式为mr-x-y,x为map任务编号,y为
hash(key)%nreduce
后的值。最后把这R个文件名传回给master - master接收到map任务完成的消息,根据文件名中的y,把文件名加入到对应编号的reduce的文件名列表
- 反复,直到maplist中任务全都FINISHED
- worker继续请求任务,扫描maplist列表,判断是否全部FINISHED,否则返回一个WAIT任务,crash的worker执行的任务会超时然后重新分配,因此最后会全都finish
- master分配reduce任务,逻辑上与分配map任务差不多
- worker执行reduce任务,打开文件名列表中的所有文件,收集kv对。然后进行排序,输出至mr-out-y文件,y为reduce的编号
- 当maplist和reducelist中的任务都为finished时,master的
done()
函数返回true,master退出。因为done()
函数是间隔执行的,在此期间仍可能有worker请求任务,此时返回exit任务,告诉worker可以退出 - 中间文件共有$M*R$个,最后的输出有$R$个
- 注意对master中的数据结构加互斥锁