6.824: Distributed Systems lab1

Posted by Baron Blog on August 19, 2020

概述

有关lab1的要求以及初始代码可以在 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 中获取。

问题与思路

首先需要理解mapreduce模型以及masterworker之间的关系,这些信息在paper中都有。

从图中可以看出master负责将mapreduce的任务分配给workerworker对指定的文件进行操作后按照格式写入到输出文件中。

1. 从何入手

  1. 查看lab1的要求和提示,明白作业最终需要达成什么目的。
  2. 尝试理解并且运行代码mrsequential.go,知道输出什么形式的数据。
  3. 利用给出的第一条提示:修改mr/master代码,让其发送一条RPC请求给master。

2. 任务状态

为了记录mapreduce的运行阶段,需要给任务分配状态位。

1
2
3
4
5
const (
	TaskInit = iota  // 初始值
	TaskRunning      // 任务运行中
	TaskDone         // 任务已完成
)

3. 流程管理

需要在所有的map任务完成之后才能进行reduce任务分配。所以map任务还未完成,使用Empty参数通知给worker

1
2
3
4
5
6
for _, v := range m.mapTask {
    if v != TaskDone {
        reply.Empty = true
        return nil
    }
}

完整示例

由于该lab还是一个作业,并且有人呼吁大家不要将代码开源放在github上,这里我只贴出部分关键代码作为参考。 若有不明白的地方,可以联系我进行解答,邮箱:baronwithyou@gmail.com。

rpc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const (
	Map    = "map"
	Reduce = "reduce"
)

type AssignArgs struct {}

type AssignReply struct {
	Empty    bool
	Filename string
	NReduce  int
	Index    int
	Type     string
	Finished bool
}

type CommitArgs struct {
	Filename string
	ID       int
	Type     string
}

type CommitReply struct {}

master.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
const (
	TaskInit = iota
	TaskRunning
	TaskDone
)

type Master struct {
	// Your definitions here.
	files      []string
	nReduce    int
	mapTask    []int
	reduceTask []int
	lock       sync.Mutex
	timeout    time.Duration
	done       bool
}

// Assign dispatch task to specific worker
func (m *Master) Assign(args *AssignArgs, reply *AssignReply) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	ctx, _ := context.WithTimeout(context.Background(), m.timeout)

	if m.assignMapTask(ctx, args, reply) {
		return nil
	}

	for _, v := range m.mapTask {
		if v != TaskDone {
			reply.Empty = true
			return nil
		}
	}

	if m.assignReduceTask(ctx, args, reply) {
		return nil
	}

	for _, v := range m.reduceTask {
		if v != TaskDone {
			reply.Empty = true
			return nil
		}
	}
	m.done = true
	reply.Finished = true

	return nil
}

func (m *Master) assignMapTask(ctx context.Context, args *AssignArgs, reply *AssignReply) bool {
	for id, v := range m.mapTask {
		if v == TaskRunning || v == TaskDone {
			continue
		}

		//fmt.Printf("A worker is requiring a map task\n")

		reply.Filename = m.files[id]
		reply.Index = id
		reply.NReduce = m.nReduce
		reply.Type = Map

		m.mapTask[id] = TaskRunning

		go func(c context.Context) {
			select {
			case <-ctx.Done():
				m.lock.Lock()
				defer m.lock.Unlock()

				if m.mapTask[id] == TaskRunning {
					m.mapTask[id] = TaskInit
					log.Printf("%v map task timeout\n", m.files[id])
				}
			}
		}(ctx)

		return true
	}

	return false
}

func (m *Master) assignReduceTask(ctx context.Context, args *AssignArgs, reply *AssignReply) bool {
	for id, v := range m.reduceTask {
		if v == TaskRunning || v == TaskDone {
			continue
		}

		//fmt.Printf("A worker has require a reduce task\n")
		m.reduceTask[id] = TaskRunning

		reply.Index = id
		reply.NReduce = m.nReduce
		reply.Type = Reduce

		go func(c context.Context) {
			select {
			case <-ctx.Done():
				m.lock.Lock()
				defer m.lock.Unlock()

				if m.reduceTask[id] == TaskRunning {
					m.reduceTask[id] = TaskInit
					log.Printf("%v reduce task timeout\n", id)
				}
			}
		}(ctx)

		return true
	}

	return false
}

// Commit ...
func (m *Master) Commit(args *CommitArgs, reply *CommitReply) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	switch args.Type {
	case Map:
		//fmt.Printf("%v has been mapped\n", m.files[args.ID])
		m.mapTask[args.ID] = TaskDone
	case Reduce:
		//fmt.Printf("mr-out-%v has been reduced\n", args.ID)
		m.reduceTask[args.ID] = TaskDone
	}

	return nil
}

worker.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {

	for {
		ok, res := GetMapTask()
		if !ok || res.Finished {
			break
		}

		// Wait if master is waiting for all files to be mapped
		if res.Empty {
			continue
		}

		args := &CommitArgs{}
		args.Type = res.Type
		args.ID = res.Index

		switch res.Type {
		case Map:
			if err := mapping(mapf, res.Filename, res.NReduce, res.Index); err != nil {
				log.Fatal(err)
			}
		case Reduce:
			if err := reducing(reducef, res.NReduce, res.Index); err != nil {
				log.Fatal(err)
			}
		}
		commit(args)
	}
}

func commit(args *CommitArgs) {
	replys := CommitReply{}

	call("Master.Commit", &args, &replys)
}

func mapping(mapf func(string, string) []KeyValue, filename string, nReduce, index int) error {
	file, err := os.Open(filename)
	if err != nil {
		return fmt.Errorf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		return fmt.Errorf("cannot read %v", filename)
	}
	file.Close()
	kva := mapf(filename, string(content))

	sort.Sort(ByKey(kva))

	var fileBucket = make(map[int]*json.Encoder)
	for i := 0; i < nReduce; i++ {
		oname := fmt.Sprintf("mr-%d-%d", index, i)
		ofile, _ := os.Create(oname)

		fileBucket[i] = json.NewEncoder(ofile)
		defer ofile.Close()
	}

	for _, v := range kva {
		reduceID := ihash(v.Key) % nReduce

		// 写入文件 mr-X-Y 中
		enc := fileBucket[reduceID]
		err := enc.Encode(&v)
		if err != nil {
			return err
		}
	}

	return nil
}

// GetMapTask ...
func GetMapTask() (bool, *AssignReply) {
	args := AssignArgs{}

	replys := AssignReply{}

	if !call("Master.Assign", &args, &replys) {
		return false, nil
	}

	return true, &replys
}

func reducing(reducef func(string, []string) string, nReduce, index int) error {

	kva := make([]KeyValue, 0)
	for i := 0; i < nReduce; i++ {
		filename := fmt.Sprintf("mr-%d-%d", i, index)

		file, _ := os.Open(filename)

		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
	}

	sort.Sort(ByKey(kva))

	outFilename := fmt.Sprintf("mr-out-%d", index)
	ofile, _ := os.Create(outFilename)

	i := 0
	for i < len(kva) {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

		i = j
	}

	return nil
}

Makefile

master:
	go run mrmaster.go pg-*.txt

partial:
	go run mrmaster.go pg-being_ernest.txt

build:
	go build -buildmode=plugin ../mrapps/wc.go

worker:
	go build -buildmode=plugin ../mrapps/wc.go
	go run mrworker.go wc.so

reset:
	rm -rf mr-*
	rm mrmaster
	rm mrworker
	rm mrsequential

test:
	sh test-mr.sh

代码参考了Lab 1 解决方案记录,在此表示感谢。

测试用例

运行 sh test-mr.sh / make test来查看是否通过了的测试用例。

注:macos中没有timeout命令,运行测试脚本的时候会出错,解决方案可以参考Timeout command on Mac OS X? - Stack Overflow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
*** Starting wc test.
2020/08/19 22:04:41 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- wc test: PASS
*** Starting indexer test.
2020/08/19 22:04:48 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- indexer test: PASS
*** Starting map parallelism test.
2020/08/19 22:04:51 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2020/08/19 22:04:58 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- reduce parallelism test: PASS
*** Starting crash test.
2020/08/19 22:05:06 rpc.Register: method "Done" has 1 input parameters; needs exactly three
2020/08/19 22:05:17 ../pg-being_ernest.txt map task timeout
--- crash test: PASS
*** PASSED ALL TESTS

当我看到满屏的 PASSED 和最后的 PASSED ALL TESTS,把👴高兴坏了。