-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
worker_stack.go
81 lines (67 loc) · 1.43 KB
/
worker_stack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package ants
import "time"
// workerStack keeps workers in a slice that is used as a LIFO stack:
// insert appends to the end of items and detach removes from the end.
type workerStack struct {
// items holds the stacked workers; the most recently inserted one is last.
items []worker
// expiry is a scratch buffer that refresh truncates, refills and returns
// on every call that finds the stack non-empty.
expiry []worker
}
// newWorkerStack builds an empty workerStack whose items slice is allocated
// up front with capacity for size workers. The expiry buffer is left nil.
func newWorkerStack(size int) *workerStack {
	ws := new(workerStack)
	ws.items = make([]worker, 0, size)
	return ws
}
// len reports how many workers are currently held in the stack.
func (wq *workerStack) len() int {
	held := wq.items
	return len(held)
}
// isEmpty reports whether the stack currently holds no workers.
func (wq *workerStack) isEmpty() bool {
	return len(wq.items) < 1
}
// insert pushes w onto the top of the stack by appending it to items.
// It cannot fail; the returned error is always nil.
func (wq *workerStack) insert(w worker) error {
	grown := append(wq.items, w)
	wq.items = grown
	return nil
}
// detach pops and returns the worker on top of the stack (the last element
// of items). It returns nil when the stack is empty.
func (wq *workerStack) detach() worker {
	top := len(wq.items) - 1
	if top < 0 {
		return nil
	}

	popped := wq.items[top]
	// Clear the vacated slot so the backing array stops referencing the worker.
	wq.items[top] = nil
	wq.items = wq.items[:top]
	return popped
}
// refresh removes from the bottom of the stack every worker whose
// lastUsedTime is not after time.Now() minus duration, and returns them.
//
// The returned slice is wq.expiry itself, which is truncated and refilled on
// each call, so its contents are overwritten by the next refresh. For an
// empty stack refresh returns nil without touching wq.expiry.
//
// The cut-off position comes from binarySearch, so this presumably relies on
// items being ordered by ascending lastUsedTime — verify against callers.
func (wq *workerStack) refresh(duration time.Duration) []worker {
	total := len(wq.items)
	if total == 0 {
		return nil
	}

	cutoff := time.Now().Add(-duration)
	lastExpired := wq.binarySearch(0, total-1, cutoff)

	wq.expiry = wq.expiry[:0]
	if lastExpired == -1 {
		// Nothing has expired; hand back the emptied buffer.
		return wq.expiry
	}

	wq.expiry = append(wq.expiry, wq.items[:lastExpired+1]...)

	// Shift the surviving workers to the front, then clear the vacated tail
	// so the backing array no longer references the moved or expired workers.
	kept := copy(wq.items, wq.items[lastExpired+1:])
	tail := wq.items[kept:total]
	for i := range tail {
		tail[i] = nil
	}
	wq.items = wq.items[:kept]

	return wq.expiry
}
// binarySearch returns the index of the last worker in items[l..r] whose
// lastUsedTime is not after expiryTime. If no worker in the range qualifies
// it returns l-1, which is -1 when l is 0.
//
// The result is only meaningful when items[l..r] is ordered by ascending
// lastUsedTime; that ordering is assumed here, not checked.
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
	lo, hi := l, r
	for lo <= hi {
		// hi-lo is non-negative here, so this midpoint cannot overflow.
		mid := lo + (hi-lo)/2
		if expiryTime.Before(wq.items[mid].lastUsedTime()) {
			// items[mid] was used after the cut-off: look further left.
			hi = mid - 1
			continue
		}
		lo = mid + 1
	}
	return hi
}
// reset calls finish on every worker in the stack, from bottom to top,
// clearing each slot as it goes, and then truncates items to zero length
// while keeping its backing array. The expiry buffer is not touched.
func (wq *workerStack) reset() {
	for idx := 0; idx < len(wq.items); idx++ {
		w := wq.items[idx]
		w.finish()
		wq.items[idx] = nil
	}
	wq.items = wq.items[:0]
}