-
Notifications
You must be signed in to change notification settings - Fork 0
/
workerpool.go
132 lines (114 loc) · 2.55 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package client
import (
"runtime"
"sort"
"sync"
"sync/atomic"
)
// ------------------------------------
// Worker pool
// ------------------------------------
// Result is the outcome of a single executed task.
type Result[T any] struct {
// Idx is the index of the task that produced this result (copied from Task.Idx).
Idx int64
// Val is the value returned by the task function.
Val T
// Err is the error returned by the task function.
Err error
}
// Task is a unit of work queued on a WorkerPool.
type Task[T any] struct {
// Idx is the index assigned to the task when it is submitted.
Idx int64
// Fnc is the function a worker calls to execute the task.
Fnc func() (T, error)
}
// WorkerPool runs submitted tasks on a fixed set of worker goroutines and
// gathers their results through a single collector goroutine.
type WorkerPool[T any] struct {
// results accumulates the task results; collectResult sorts it by Idx once
// resultChan has been closed.
results []*Result[T]
// isDone is set to true by collectResult after results has been sorted.
isDone bool
// taskCount is incremented atomically by Submit; the new value becomes the
// task's Idx.
taskCount int64
// taskChan carries tasks from Submit to the workers.
taskChan chan *Task[T]
// resultChan carries results from the workers to collectResult.
resultChan chan *Result[T]
// waitTask tracks the worker goroutines.
waitTask sync.WaitGroup
// waitResult tracks the collector goroutine.
waitResult sync.WaitGroup
}
// NewWorkerPool creates a pool and starts its goroutines: count workers that
// execute tasks and one collector that gathers their results.
//
// A count of 0 selects runtime.NumCPU() workers. A negative count is rejected
// and nil is returned.
func NewWorkerPool[T any](count int) *WorkerPool[T] {
	switch {
	case count < 0:
		return nil
	case count == 0:
		count = runtime.NumCPU()
	}

	// Both channels are unbuffered; the WaitGroups are usable as zero values.
	p := &WorkerPool[T]{
		results:    make([]*Result[T], 0),
		taskChan:   make(chan *Task[T]),
		resultChan: make(chan *Result[T]),
	}

	// Register every worker before starting any of them.
	p.waitTask.Add(count)
	for worker := 0; worker < count; worker++ {
		go p.doTask(worker)
	}

	p.waitResult.Add(1)
	go p.collectResult()

	return p
}
// doTask is the worker loop: it receives tasks from taskChan until that
// channel is closed, runs each task function, and sends the outcome on
// resultChan.
//
// wn is the worker number assigned by the caller; it is currently unused.
func (pool *WorkerPool[T]) doTask(wn int) {
	// Deferred so the worker is always accounted for, even when a task ends
	// the goroutine through runtime.Goexit (for example testing.T.FailNow).
	// Otherwise waitTask would never reach zero and waiters would block
	// forever. In that case the interrupted task produces no result.
	defer pool.waitTask.Done()

	for task := range pool.taskChan {
		val, err := task.Fnc()
		pool.resultChan <- &Result[T]{Idx: task.Idx, Val: val, Err: err}
	}
}
// collectResult is the collector loop: it appends every result received on
// resultChan to pool.results until the channel is closed, then sorts the
// results by ascending Idx and marks the pool as done.
func (pool *WorkerPool[T]) collectResult() {
	// Runs last, after isDone has been set.
	defer pool.waitResult.Done()

	for {
		res, open := <-pool.resultChan
		if !open {
			break
		}
		pool.results = append(pool.results, res)
	}

	byIdx := func(a, b int) bool {
		return pool.results[a].Idx < pool.results[b].Idx
	}
	sort.Slice(pool.results, byIdx)

	pool.isDone = true
}
// Submit queues f for execution by one of the workers.
//
// The task index is taken from an atomically incremented counter, so the
// first submitted task gets index 1. The task channel is unbuffered, so
// Submit blocks until a worker receives the task. Submitting after Close
// panics, because the task channel is closed.
func (pool *WorkerPool[T]) Submit(f func() (T, error)) {
	idx := atomic.AddInt64(&pool.taskCount, 1)
	pool.taskChan <- &Task[T]{Fnc: f, Idx: idx}
}
// Close closes the task channel: tasks can no longer be submitted, and each
// worker leaves its receive loop once the channel is drained.
// It must be called explicitly before Wait.
// Calling Close more than once panics (close of a closed channel).
func (pool *WorkerPool[T]) Close() {
close(pool.taskChan)
}
// Wait blocks until all workers have finished and the collector has gathered
// and sorted the results, then returns the results ordered by Idx.
// Close must have been called beforehand: the workers only exit once the task
// channel is closed, so Wait does not return until that has happened.
func (pool *WorkerPool[T]) Wait() []*Result[T] {
// Returns once every worker goroutine has signalled completion.
pool.waitTask.Wait()
// No worker is left to send, so the result channel can be closed; this ends
// the collector's receive loop.
close(pool.resultChan)
// Wait for the collector to finish sorting before exposing the slice.
pool.waitResult.Wait()
return pool.results
}
// Results returns the collected results, ordered by task index, or nil while
// collection has not finished. It also returns nil for a nil pool, which is
// what NewWorkerPool returns for a negative worker count.
//
// The returned slice is the pool's own slice, not a copy. The done flag is
// read without synchronisation, so call Results after Wait has returned.
//
// The receiver is a pointer so that the pool, which embeds sync.WaitGroup
// values that must not be copied, is not duplicated on every call.
func (pool *WorkerPool[T]) Results() []*Result[T] {
	if pool == nil || !pool.isDone {
		return nil
	}
	return pool.results
}
// ResultValues returns the Val of every collected result, in the order of the
// pool's results slice, or nil while collection has not finished. It also
// returns nil for a nil pool, which is what NewWorkerPool returns for a
// negative worker count.
//
// The done flag is read without synchronisation, so call ResultValues after
// Wait has returned.
//
// The receiver is a pointer so that the pool, which embeds sync.WaitGroup
// values that must not be copied, is not duplicated on every call.
func (pool *WorkerPool[T]) ResultValues() []T {
	if pool == nil || !pool.isDone {
		return nil
	}

	values := make([]T, len(pool.results))
	for i, res := range pool.results {
		values[i] = res.Val
	}
	return values
}
// ResultErrors returns the Err of every collected result, in the order of the
// pool's results slice, or nil while collection has not finished. Entries are
// copied as-is, so an entry may itself be nil. It also returns nil for a nil
// pool, which is what NewWorkerPool returns for a negative worker count.
//
// The done flag is read without synchronisation, so call ResultErrors after
// Wait has returned.
//
// The receiver is a pointer so that the pool, which embeds sync.WaitGroup
// values that must not be copied, is not duplicated on every call.
func (pool *WorkerPool[T]) ResultErrors() []error {
	if pool == nil || !pool.isDone {
		return nil
	}

	errs := make([]error, len(pool.results))
	for i, res := range pool.results {
		errs[i] = res.Err
	}
	return errs
}