-
Notifications
You must be signed in to change notification settings - Fork 45
/
stoppable_workers.go
128 lines (114 loc) · 4.09 KB
/
stoppable_workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package utils
import (
"context"
"sync"
"time"
)
// StoppableWorkers is a collection of goroutines that can be stopped at a
// later time. All workers share a single cancelable context; `Stop` cancels
// that context and waits on the `workers` WaitGroup.
type StoppableWorkers struct {
// Use a `sync.RWMutex` instead of a `sync.Mutex` so that additions of new
// workers do not lock with each other in any way. We want
// as-fast-as-possible worker addition.
//
// `Add` holds the read lock while it checks the context and increments
// `workers`; `Stop` holds the write lock while it cancels the context.
mu sync.RWMutex
// ctx is the context handed to every worker. It is created with
// `context.WithCancel` by the constructors in this file.
ctx context.Context
// cancelFunc cancels `ctx`. It is the cancel function returned alongside
// `ctx` by `context.WithCancel`.
cancelFunc func()
// workers counts the worker goroutines that have been started and have not
// yet returned. Each worker calls `Done` on it when it exits.
workers sync.WaitGroup
}
// NewStoppableWorkers builds an empty StoppableWorkers group whose worker
// context is a cancelable child of the supplied `ctx`. Canceling the supplied
// context therefore also cancels the group's context.
func NewStoppableWorkers(ctx context.Context) *StoppableWorkers {
	derived, cancel := context.WithCancel(ctx)

	group := new(StoppableWorkers)
	group.ctx = derived
	group.cancelFunc = cancel
	return group
}
// NewBackgroundStoppableWorkers builds a StoppableWorkers group whose context
// is a cancelable child of `context.Background()` and immediately `Add`s each
// of the given workers to it, in argument order. Workers:
//
//   - MUST respond appropriately to errors on the context parameter.
//   - MUST NOT add more workers to the `StoppableWorkers` group to which
//     they belong.
//
// Any `panic`s from workers will be `recover`ed and logged.
func NewBackgroundStoppableWorkers(workers ...func(context.Context)) *StoppableWorkers {
	bgCtx, cancel := context.WithCancel(context.Background())
	group := &StoppableWorkers{ctx: bgCtx, cancelFunc: cancel}

	for i := range workers {
		group.Add(workers[i])
	}
	return group
}
// NewStoppableWorkerWithTicker returns a `StoppableWorkers` group that runs exactly one worker:
// a loop that invokes `workFn` once per tick of a `tickRate` ticker until the group's context is
// canceled. Invocations never overlap -- the loop does not look at the ticker again until `workFn`
// has returned, so a slow iteration simply delays the next one.
//
// The group's context is derived from `context.Background()` and is the context passed to
// `workFn`.
//
// NOTE(review): `time.NewTicker` panics for a non-positive duration; here that would happen inside
// the goroutine handed to `PanicCapturingGo` rather than in the caller -- confirm callers always
// pass a positive `tickRate`.
func NewStoppableWorkerWithTicker(tickRate time.Duration, workFn func(context.Context)) *StoppableWorkers {
	workerCtx, cancel := context.WithCancel(context.Background())
	group := &StoppableWorkers{ctx: workerCtx, cancelFunc: cancel}

	// Register the worker before its goroutine starts so that `Stop` always waits for it.
	group.workers.Add(1)
	PanicCapturingGo(func() {
		defer group.workers.Done()

		ticker := time.NewTicker(tickRate)
		defer ticker.Stop()

		// Re-check cancellation at the top of every iteration so that a canceled context wins
		// even when a tick is already pending.
		for workerCtx.Err() == nil {
			select {
			case <-workerCtx.Done():
				return
			case <-ticker.C:
				workFn(workerCtx)
			}
		}
	})

	return group
}
// Add launches `worker` on its own goroutine, passing it the group's context. Workers must
// respond appropriately to errors on the context parameter.
//
// If the group's context is already canceled, Add is a no-op: the worker is never run. Any
// `panic`s from workers will be `recover`ed and logged.
func (sw *StoppableWorkers) Add(worker func(context.Context)) {
	// The registration step runs under the read lock so that concurrent `Add` calls do not block
	// one another, while `Stop` (which takes the write lock) is excluded. The outcome is one of:
	//   - the context is seen as canceled: the WaitGroup is left untouched and the worker is
	//     dropped;
	//   - the context is seen as live and the WaitGroup is incremented in the same critical
	//     section, so a later `Stop` will wait for this worker.
	registered := func() bool {
		sw.mu.RLock()
		defer sw.mu.RUnlock()

		if sw.ctx.Err() != nil {
			return false
		}
		sw.workers.Add(1)
		return true
	}()
	if !registered {
		return
	}

	// Start the goroutine only after the lock has been released.
	PanicCapturingGo(func() {
		defer sw.workers.Done()
		worker(sw.ctx)
	})
}
// Stop idempotently shuts down all the goroutines we started up: it cancels the group's context
// and then blocks until every worker that was successfully added has returned.
//
// Stop waits for outstanding workers even when the context was already canceled before the call,
// e.g. because the parent context passed to `NewStoppableWorkers` was canceled, or because another
// `Stop` call has canceled the context but is still waiting. Returning early in those cases would
// let workers outlive `Stop`. Calling Stop again after the workers have finished returns
// immediately.
//
// Stop must not be called from one of this group's own workers: it would wait on itself and never
// return.
func (sw *StoppableWorkers) Stop() {
	// Call `cancelFunc` with the write lock that competes with "readers" that can add workers. This
	// guarantees `Add` worker calls that start a goroutine have incremented the `workers` WaitGroup
	// prior to `Stop` calling `Wait`. Canceling an already-canceled context is a no-op, so no
	// "already stopped" check is needed.
	sw.mu.Lock()
	sw.cancelFunc()
	// Make sure to unlock the mutex before waiting for background goroutines to shut down! That
	// way, any goroutine that was waiting on this lock (e.g., it was trying to spawn another
	// background worker) won't deadlock, and we'll shut down properly.
	sw.mu.Unlock()

	// Once the context is canceled no `Add` call can increment the WaitGroup any more, so this
	// waits for a fixed set of workers and is safe to run from several `Stop` calls at once.
	sw.workers.Wait()
}
// Context returns the context that this StoppableWorkers instance passes to
// its workers. Reaching for it should be uncommon: in most cases callers have
// no need to touch the context directly.
func (sw *StoppableWorkers) Context() context.Context {
	workerCtx := sw.ctx
	return workerCtx
}