forked from go-zookeeper/zk
-
Notifications
You must be signed in to change notification settings - Fork 6
/
watcher.go
157 lines (130 loc) · 4.55 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package zk
import (
"context"
"sync"
"time"
)
const (
defaultReservoirLimit = 2048
)
// WatcherOption represents an option for a watcher.
type WatcherOption func(*watcherOptions)
// WithWatcherInvalidateOnDisconnect returns a WatcherOption that configures the watcher to be invalidated on disconnect.
func WithWatcherInvalidateOnDisconnect() WatcherOption {
return func(opts *watcherOptions) {
opts.invalidateOnDisconnect = true
}
}
// WithWatcherReservoirLimit returns a WatcherOption that configures the reservoir limit for a persistent watcher.
// The reservoir limit is the absolute maximum number of events that can be queued before the watcher is forcefully closed.
// If the given reservoir lLimit is <= 0, the default value of 2048 will be used.
func WithWatcherReservoirLimit(reservoirLimit int) WatcherOption {
return func(opts *watcherOptions) {
if reservoirLimit <= 0 {
reservoirLimit = defaultReservoirLimit
}
opts.reservoirLimit = reservoirLimit
}
}
// WithStallCallback returns a WatcherOption that configures a callback function for when we hit the reservoir limit.
func WithStallCallback(stallCallback func()) WatcherOption {
return func(opts *watcherOptions) {
opts.stallCallback = stallCallback
}
}
type watcherOptions struct {
// If true, the watcher will be invalidated if the connection is lost.
invalidateOnDisconnect bool
// The pump reservoir limit for persistent watchers. Defaults to defaultReservoirLimit.
reservoirLimit int
// Called when the pump reservoir limit is hit and we stop processing Events
stallCallback func()
}
type watcherKey struct {
path string
kind watcherKind
}
type watcher interface {
// options returns the options for the watcher.
options() watcherOptions
// eventChan returns the channel to consume events from.
eventChan() <-chan Event
// notify is called by the connection when an event is received.
// Returns true if the watcher accepted it; false otherwise (ie: watcher is dead).
// The caller should discard the watcher if false is returned.
notify(ev Event) bool
// close closes the watcher.
// Once closed, notify() will no longer accept events, and eventChan() will eventually be closed.
close()
}
func newFireOnceWatcher(opts watcherOptions) *fireOnceWatcher {
return &fireOnceWatcher{
opts: opts,
ch: make(chan Event, 2), // Buffer to hold 1 watch event + 1 close event.
}
}
// fireOnceWatcher is an implementation of watcher that fires a single watch event (ie: for GetW, ExistsW, ChildrenW).
type fireOnceWatcher struct {
opts watcherOptions
ch chan Event
closeOnce sync.Once
}
func (w *fireOnceWatcher) eventChan() <-chan Event {
return w.ch
}
func (w *fireOnceWatcher) notify(ev Event) (ok bool) {
// This is a bit ugly, but it's not impossible for a watcher to be notified after it's been closed.
// It's a compromise that allows us to have finer-grained locking in the connection's receive loop.
// It's also not worth synchronizing notify() and close(), since this is a very rare case.
defer func() {
_ = recover() // Ignore panics from closed channel.
}()
w.ch <- ev
return true
}
func (w *fireOnceWatcher) options() watcherOptions {
return w.opts
}
func (w *fireOnceWatcher) close() {
w.closeOnce.Do(func() {
close(w.ch)
})
}
func newPersistentWatcher(opts watcherOptions) *persistentWatcher {
if opts.reservoirLimit == 0 {
opts.reservoirLimit = defaultReservoirLimit
}
return &persistentWatcher{
opts: opts,
pump: newPump[Event](uint32(opts.reservoirLimit), opts.stallCallback),
}
}
// persistentWatcher is an implementation of watcher for persistent watches.
type persistentWatcher struct {
opts watcherOptions
pump *pump[Event]
}
func (w *persistentWatcher) eventChan() <-chan Event {
return w.pump.outChan()
}
func (w *persistentWatcher) notify(ev Event) bool {
return w.pump.give(context.Background(), ev)
}
func (w *persistentWatcher) options() watcherOptions {
return w.opts
}
func (w *persistentWatcher) close() {
// Closing input will allow the pump to drain and stop naturally, as long as output is consumed.
w.pump.closeInput() // Idempotent.
// If output is not consumed, then the pump may not be able to stop, causing a goroutine leak.
// To protect against this, we'll wait up to 5 minutes for the pump to stop, after which we force it.
if !w.pump.isStopRequested() {
go func(p *pump[Event]) { // Monitor the pump in a new goroutine.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
if p.waitUntilStopped(ctx) == context.DeadlineExceeded {
p.stop() // Force stop; idempotent.
}
}(w.pump)
}
}