概述
有关lab1的要求以及初始代码可以在 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 中获取。
问题与思路
首先需要理解mapreduce模型以及master和worker之间的关系,这些信息在paper中都有。
从论文中的执行流程图(Figure 1)可以看出,master 负责将 map 和 reduce 任务分配给 worker;worker 处理指定的文件后,按照约定的格式把结果写入输出文件。
1. 从何入手
- 查看lab1的要求和提示,明白作业最终需要达成什么目的。
- 尝试理解并运行 mrsequential.go,了解最终需要输出什么形式的数据。
- 利用给出的第一条提示:修改 mr/worker.go 中的 Worker(),让 worker 向 master 发送一条 RPC 请求来申请任务。
2. 任务状态
为了记录map和reduce的运行阶段,需要给任务分配状态位。
1
2
3
4
5
// Task states recorded for every map and reduce task.
const (
	TaskInit    = 0 // initial state: not yet assigned
	TaskRunning = 1 // the task is running on a worker
	TaskDone    = 2 // the task has completed
)
3. 流程管理
只有在所有 map 任务都完成之后,才能开始分配 reduce 任务。因此,只要还有 map 任务没有完成,master 就在回复中设置 Empty 字段,通知 worker 当前没有可分配的任务。
1
2
3
4
5
6
// Reduce tasks may only be assigned once every map task is TaskDone. While
// any map task is unfinished, the master sets reply.Empty and returns, so the
// worker learns there is no task for it right now.
for _, v := range m.mapTask {
if v != TaskDone {
reply.Empty = true
return nil
}
}
完整示例
由于该lab还是一个作业,并且有人呼吁大家不要将代码开源放在github上,这里我只贴出部分关键代码作为参考。 若有不明白的地方,可以联系我进行解答,邮箱:baronwithyou@gmail.com。
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Map is the task kind for a map task; it is carried in AssignReply.Type and
// CommitArgs.Type.
const Map = "map"

// Reduce is the task kind for a reduce task; it is carried in
// AssignReply.Type and CommitArgs.Type.
const Reduce = "reduce"
type (
	// AssignArgs is the request a worker sends to Master.Assign; it carries
	// no data.
	AssignArgs struct{}

	// AssignReply describes what Master.Assign hands back to a worker.
	AssignReply struct {
		Empty    bool   // no task can be handed out right now; ask again
		Filename string // input file to process (map tasks only)
		NReduce  int    // number of reduce partitions
		Index    int    // map task index or reduce partition index
		Type     string // task kind: Map or Reduce
		Finished bool   // every task is done; the worker may stop
	}
)
type (
	// CommitArgs is what a worker sends to Master.Commit after finishing a
	// task.
	CommitArgs struct {
		Filename string // NOTE(review): never set by the worker code shown here
		ID       int    // index of the finished task
		Type     string // task kind: Map or Reduce
	}

	// CommitReply is the (empty) response of Master.Commit.
	CommitReply struct{}
)
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// States of a single map or reduce task as tracked by the Master.
const (
	TaskInit    = 0 // idle: never assigned, or reset after its timeout expired
	TaskRunning = 1 // handed to a worker and not yet committed
	TaskDone    = 2 // committed by a worker via Master.Commit
)
// Master holds the bookkeeping for one MapReduce job. The task tables are
// read and written by Assign, Commit and the timeout watchers while holding
// lock.
type Master struct {
	files      []string      // input files; map task i processes files[i]
	nReduce    int           // number of reduce partitions
	mapTask    []int         // state of each map task (TaskInit/TaskRunning/TaskDone)
	reduceTask []int         // state of each reduce task
	lock       sync.Mutex    // guards the fields above and done
	timeout    time.Duration // how long an assigned task may run before it is reset
	done       bool          // set once Assign sees every reduce task done
}
// Assign is the RPC handler through which a worker asks for work.
//
// It fills reply with the first of the following that applies:
//   - an idle map task (Type == Map, Filename, Index, NReduce);
//   - Empty == true while any map task is still unfinished, because reduce
//     tasks may not start before the whole map phase is done;
//   - an idle reduce task (Type == Reduce, Index, NReduce);
//   - Empty == true while any reduce task is still running;
//   - Finished == true once every task is done, also marking the job done.
//
// args carries no information. The returned error is always nil.
func (m *Master) Assign(args *AssignArgs, reply *AssignReply) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	// ctx bounds how long a handed-out task may stay TaskRunning. At its
	// deadline, the watcher goroutine started by assignMapTask or
	// assignReduceTask puts the task back to TaskInit if it was not committed.
	// The context must therefore NOT be cancelled once a task has been handed
	// out, because cancelling would fire the watcher immediately. When nothing
	// is handed out, no watcher exists. In that case the context is cancelled
	// on return to release its timer, instead of leaving it pending for
	// m.timeout on every polling request.
	ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
	handedOut := false
	defer func() {
		if !handedOut {
			cancel()
		}
	}()

	if m.assignMapTask(ctx, args, reply) {
		handedOut = true
		return nil
	}
	// No idle map task: any map task that is not done is still running.
	for _, v := range m.mapTask {
		if v != TaskDone {
			reply.Empty = true
			return nil
		}
	}
	if m.assignReduceTask(ctx, args, reply) {
		handedOut = true
		return nil
	}
	// No idle reduce task: any reduce task that is not done is still running.
	for _, v := range m.reduceTask {
		if v != TaskDone {
			reply.Empty = true
			return nil
		}
	}
	m.done = true
	reply.Finished = true
	return nil
}
// assignMapTask hands the first idle map task to the caller.
//
// On success it does the following and returns true:
//   - fills reply with the task's input file, index, NReduce and Type == Map;
//   - marks the task TaskRunning;
//   - starts a watcher that resets the task to TaskInit if it is still
//     running when ctx is done.
//
// It returns false, leaving reply untouched, when no map task is idle.
// The caller must hold m.lock. args is unused.
func (m *Master) assignMapTask(ctx context.Context, args *AssignArgs, reply *AssignReply) bool {
	for id, v := range m.mapTask {
		if v == TaskRunning || v == TaskDone {
			continue
		}
		reply.Filename = m.files[id]
		reply.Index = id
		reply.NReduce = m.nReduce
		reply.Type = Map
		m.mapTask[id] = TaskRunning

		// The watcher receives ctx and id as arguments instead of capturing
		// them, so it never depends on loop-variable sharing semantics.
		go func(ctx context.Context, id int) {
			<-ctx.Done()
			m.lock.Lock()
			defer m.lock.Unlock()
			// A task committed in time is TaskDone and is left alone.
			if m.mapTask[id] == TaskRunning {
				m.mapTask[id] = TaskInit
				log.Printf("%v map task timeout\n", m.files[id])
			}
		}(ctx, id)
		return true
	}
	return false
}
// assignReduceTask hands the first idle reduce partition to the caller.
//
// On success it does the following and returns true:
//   - marks the task TaskRunning;
//   - fills reply with the partition index, NReduce and Type == Reduce;
//   - starts a watcher that resets the task to TaskInit if it is still
//     running when ctx is done.
//
// It returns false, leaving reply untouched, when no reduce task is idle.
// The caller must hold m.lock. args is unused.
func (m *Master) assignReduceTask(ctx context.Context, args *AssignArgs, reply *AssignReply) bool {
	for id, v := range m.reduceTask {
		if v == TaskRunning || v == TaskDone {
			continue
		}
		m.reduceTask[id] = TaskRunning
		reply.Index = id
		reply.NReduce = m.nReduce
		reply.Type = Reduce

		// The watcher receives ctx and id as arguments instead of capturing
		// them, so it never depends on loop-variable sharing semantics.
		go func(ctx context.Context, id int) {
			<-ctx.Done()
			m.lock.Lock()
			defer m.lock.Unlock()
			// A task committed in time is TaskDone and is left alone.
			if m.reduceTask[id] == TaskRunning {
				m.reduceTask[id] = TaskInit
				log.Printf("%v reduce task timeout\n", id)
			}
		}(ctx, id)
		return true
	}
	return false
}
// Commit is the RPC handler through which a worker reports a finished task.
//
// It marks task args.ID of kind args.Type (Map or Reduce) as TaskDone. The
// request comes from another process, so it is validated first. A commit
// with an unknown Type, or with an ID outside the task table, is logged and
// ignored. Without this check, an out-of-range index would panic inside the
// RPC handler and bring down the master. The returned error is always nil.
func (m *Master) Commit(args *CommitArgs, reply *CommitReply) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	var tasks []int
	switch args.Type {
	case Map:
		tasks = m.mapTask
	case Reduce:
		tasks = m.reduceTask
	default:
		log.Printf("commit ignored: unknown task type %q\n", args.Type)
		return nil
	}
	if args.ID < 0 || args.ID >= len(tasks) {
		log.Printf("commit ignored: %v task %v out of range\n", args.Type, args.ID)
		return nil
	}
	// tasks shares its backing array with the Master's table, so this
	// updates m.mapTask / m.reduceTask in place.
	tasks[args.ID] = TaskDone
	return nil
}
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// ByKey implements sort.Interface over a slice of KeyValue, ordering the
// entries by Key in ascending string order.
type ByKey []KeyValue

// Len returns the number of entries.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges entries i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less reports whether entry i's key sorts before entry j's key.
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}
// Worker is the main loop of a worker process.
//
// It repeatedly asks the master for a task, runs the task with mapf or
// reducef, and reports completion through commit. It returns when the
// master cannot be reached or replies Finished. A task that fails to execute
// terminates the process through log.Fatal.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		ok, task := GetMapTask()
		if !ok || task.Finished {
			return
		}
		if task.Empty {
			// The master has nothing to hand out yet (for example, reduce
			// tasks wait for the whole map phase), so ask again.
			// NOTE(review): this re-asks immediately and busy-polls the
			// master. A short sleep here (it needs the "time" import) would
			// avoid that.
			continue
		}

		var err error
		switch task.Type {
		case Map:
			err = mapping(mapf, task.Filename, task.NReduce, task.Index)
		case Reduce:
			err = reducing(reducef, task.NReduce, task.Index)
		}
		if err != nil {
			log.Fatal(err)
		}
		commit(&CommitArgs{Type: task.Type, ID: task.Index})
	}
}
// commit reports a finished task to the master through the Master.Commit RPC.
//
// Delivery is best-effort: if the RPC fails, the master never sees the
// commit. The task then stays TaskRunning until its timeout resets it, and it
// is handed out again.
func commit(args *CommitArgs) {
	reply := CommitReply{}
	// args is already a *CommitArgs. Pass it as-is rather than taking its
	// address, which would send a **CommitArgs.
	call("Master.Commit", args, &reply)
}
// mapping executes map task index.
//
// It applies mapf to the contents of filename and partitions the resulting
// pairs into nReduce intermediate files named mr-<index>-<r>, where
// r = ihash(key) % nReduce. Each file holds one JSON-encoded KeyValue per
// line, in key order. A file is created for every partition, even one that
// receives no pairs.
//
// Each intermediate file is first written to a temporary file in the current
// directory and then renamed into place. This way, neither a worker that
// crashes mid-task nor a slow worker racing a re-executed copy of the same
// task can leave a partially written mr-<index>-<r> for reduce tasks to read.
//
// It returns an error in these cases:
//   - nReduce is not positive, so the partitioning would divide by zero;
//   - reading the input fails;
//   - writing any output fails.
func mapping(mapf func(string, string) []KeyValue, filename string, nReduce, index int) error {
	if nReduce <= 0 {
		return fmt.Errorf("map task %d: nReduce must be positive, got %d", index, nReduce)
	}
	// ReadFile closes the input on every path, including read errors.
	content, err := ioutil.ReadFile(filename)
	if err != nil {
		return fmt.Errorf("cannot read %v: %w", filename, err)
	}
	kva := mapf(filename, string(content))
	sort.Sort(ByKey(kva))

	// Partition first. Appending in order keeps each bucket sorted by key.
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}

	// writePartition atomically replaces name with the JSON encoding of kvs.
	writePartition := func(name string, kvs []KeyValue) error {
		tmp, err := ioutil.TempFile(".", "mr-tmp-*")
		if err != nil {
			return fmt.Errorf("cannot create temp file for %v: %w", name, err)
		}
		// fail discards the temp file. Close may run twice, which is harmless.
		fail := func(err error) error {
			tmp.Close()
			os.Remove(tmp.Name())
			return err
		}
		enc := json.NewEncoder(tmp)
		for i := range kvs {
			if err := enc.Encode(&kvs[i]); err != nil {
				return fail(fmt.Errorf("cannot write %v: %w", name, err))
			}
		}
		if err := tmp.Close(); err != nil {
			return fail(fmt.Errorf("cannot close %v: %w", name, err))
		}
		if err := os.Rename(tmp.Name(), name); err != nil {
			return fail(fmt.Errorf("cannot rename to %v: %w", name, err))
		}
		return nil
	}

	for r, bucket := range buckets {
		if err := writePartition(fmt.Sprintf("mr-%d-%d", index, r), bucket); err != nil {
			return err
		}
	}
	return nil
}
// GetMapTask asks the master for work through the Master.Assign RPC.
//
// Despite its name, the reply may describe a map task, a reduce task, or no
// task at all (Empty or Finished). It returns (false, nil) when the RPC
// fails, and (true, reply) otherwise.
func GetMapTask() (bool, *AssignReply) {
	reply := &AssignReply{}
	if ok := call("Master.Assign", &AssignArgs{}, reply); !ok {
		return false, nil
	}
	return true, reply
}
// reducing executes reduce task index.
//
// It gathers every intermediate pair routed to partition index, groups the
// pairs by key and applies reducef to each group. It writes one
// "key value" line per key, sorted by key, to mr-out-<index>.
//
// Intermediate files are named mr-<mapID>-<index> (mapID = map task index).
// The task reply does not carry the number of map tasks, so map IDs are
// probed upward from 0 until a file does not exist. This relies on reduce
// tasks being assigned only after every map task has committed, by which
// time each map task has created its file for every partition. Bounding the
// scan by nReduce, as an earlier version did, silently dropped the output of
// map tasks whose ID is >= nReduce. nReduce is therefore unused and is kept
// only so the signature stays the same.
//
// The output is written to a temporary file in the current directory and
// renamed into place. Readers therefore never observe a partially written
// mr-out-<index>, even if this worker crashes or a re-executed copy of the
// task runs at the same time. Unreadable or malformed intermediate data is
// reported as an error instead of being silently truncated.
func reducing(reducef func(string, []string) string, nReduce, index int) error {
	var kva []KeyValue
	for mapID := 0; ; mapID++ {
		name := fmt.Sprintf("mr-%d-%d", mapID, index)
		file, err := os.Open(name)
		if os.IsNotExist(err) {
			break // no map task with this ID
		}
		if err != nil {
			return fmt.Errorf("cannot open %v: %w", name, err)
		}
		dec := json.NewDecoder(file)
		for dec.More() {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				file.Close()
				return fmt.Errorf("cannot decode %v: %w", name, err)
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	sort.Sort(ByKey(kva))

	outFilename := fmt.Sprintf("mr-out-%d", index)
	// The temp name deliberately does not start with "mr-out", so globs over
	// the final outputs never pick up an unfinished file.
	tmp, err := ioutil.TempFile(".", "mr-tmp-out-*")
	if err != nil {
		return fmt.Errorf("cannot create temp file for %v: %w", outFilename, err)
	}
	// fail discards the temp file. Close may run twice, which is harmless.
	fail := func(err error) error {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}

	for i := 0; i < len(kva); {
		// kva[i:j] is the run of entries sharing kva[i].Key.
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)
		// this is the correct format for each line of Reduce output.
		if _, err := fmt.Fprintf(tmp, "%v %v\n", kva[i].Key, output); err != nil {
			return fail(fmt.Errorf("cannot write %v: %w", outFilename, err))
		}
		i = j
	}
	if err := tmp.Close(); err != nil {
		return fail(fmt.Errorf("cannot close %v: %w", outFilename, err))
	}
	if err := os.Rename(tmp.Name(), outFilename); err != nil {
		return fail(fmt.Errorf("cannot rename to %v: %w", outFilename, err))
	}
	return nil
}
Makefile
# Convenience targets for running the lab from src/main.
# None of the targets produce a file of the same name, so all are phony.
.PHONY: master partial build worker reset test

# Start the master on every input file.
master:
	go run mrmaster.go pg-*.txt

# Start the master on a single input file.
partial:
	go run mrmaster.go pg-being_ernest.txt

# Build the word-count plugin.
build:
	go build -buildmode=plugin ../mrapps/wc.go

# Rebuild the plugin, then start one worker with it.
worker: build
	go run mrworker.go wc.so

# Remove intermediate/output files and built binaries. -f keeps the target
# from failing when a binary has not been built.
reset:
	rm -rf mr-*
	rm -f mrmaster mrworker mrsequential

# Run the lab's test script.
test:
	sh test-mr.sh
代码参考了Lab 1 解决方案记录,在此表示感谢。
测试用例
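运行 sh test-mr.sh 或 make test,查看是否通过了所有测试用例。下面输出中的 rpc.Register: method "Done" has 1 input parameters; needs exactly three 可以忽略。把 Master 注册为 RPC 服务时,net/rpc 会检查它的所有导出方法是否符合 RPC 的签名要求,而 Done 并不是通过 RPC 调用的。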
注:macOS 中没有 timeout 命令,运行测试脚本时会出错,解决方法可以参考 Stack Overflow 上的问题 “Timeout command on Mac OS X?”。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
*** Starting wc test.
2020/08/19 22:04:41 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- wc test: PASS
*** Starting indexer test.
2020/08/19 22:04:48 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- indexer test: PASS
*** Starting map parallelism test.
2020/08/19 22:04:51 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2020/08/19 22:04:58 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- reduce parallelism test: PASS
*** Starting crash test.
2020/08/19 22:05:06 rpc.Register: method "Done" has 1 input parameters; needs exactly three
2020/08/19 22:05:17 ../pg-being_ernest.txt map task timeout
--- crash test: PASS
*** PASSED ALL TESTS
当我看到满屏的 PASSED 和最后的 PASSED ALL TESTS,把👴高兴坏了。