forked from go-zookeeper/zk
-
Notifications
You must be signed in to change notification settings - Fork 6
/
lock.go
194 lines (173 loc) · 4.48 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package zk
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
)
var (
// ErrDeadlock is returned by Lock when trying to lock twice without unlocking first
ErrDeadlock = errors.New("zk: trying to acquire a lock twice")
// ErrNotLocked is returned by Unlock when trying to release a lock that has not first be acquired.
ErrNotLocked = errors.New("zk: not locked")
)
// Lock is a mutual exclusion lock.
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
}
// NewLock creates a new lock instance using the provided connection, path, and acl.
// The path must be a node that is only used by this lock. A lock instances starts
// unlocked until Lock() is called.
func NewLock(c *Conn, path string, acl []ACL) *Lock {
return &Lock{
c: c,
path: path,
acl: acl,
}
}
func parseSeq(path string) (int, error) {
parts := strings.Split(path, "lock-")
// python client uses a __LOCK__ prefix
if len(parts) == 1 {
parts = strings.Split(path, "__")
}
return strconv.Atoi(parts[len(parts)-1])
}
// Lock attempts to acquire the lock. It works like LockWithData, but it doesn't
// write any data to the lock node.
func (l *Lock) Lock() error {
return l.LockWithDataCtx(context.Background(), []byte{})
}
// LockCtx attempts to acquire the lock. It works like LockWithData, but it doesn't
// write any data to the lock node.
func (l *Lock) LockCtx(ctx context.Context) error {
return l.LockWithDataCtx(ctx, []byte{})
}
// LockWithData attempts to acquire the lock, writing data into the lock node.
// It will wait to return until the lock is acquired or an error occurs. If
// this instance already has the lock then ErrDeadlock is returned.
func (l *Lock) LockWithData(data []byte) error {
return l.LockWithDataCtx(context.Background(), data)
}
// LockWithDataCtx attempts to acquire the lock, writing data into the lock node.
// It will wait to return until the lock is acquired or an error occurs. If
// this instance already has the lock then ErrDeadlock is returned.
func (l *Lock) LockWithDataCtx(ctx context.Context, data []byte) error {
if l.lockPath != "" {
return ErrDeadlock
}
prefix := fmt.Sprintf("%s/lock-", l.path)
path := ""
var err error
for i := 0; i < 3; i++ {
path, err = l.c.CreateProtectedEphemeralSequentialCtx(ctx, prefix, data, l.acl)
if err == ErrNoNode {
// Create parent node.
parts := strings.Split(l.path, "/")
pth := ""
for _, p := range parts[1:] {
var exists bool
pth += "/" + p
exists, _, err = l.c.ExistsCtx(ctx, pth)
if err != nil {
return err
}
if exists {
continue
}
_, err = l.c.CreateCtx(ctx, pth, []byte{}, 0, l.acl)
if err != nil && err != ErrNodeExists {
return err
}
}
} else if err == nil {
break
} else {
return err
}
}
if err != nil {
return err
}
seq, err := parseSeq(path)
if err != nil {
return err
}
acquired := false
defer func() {
if !acquired {
// Cleanup our ephemeral node, if we return before acquiring the lock.
// Otherwise, we risk leaking the lock and blocking other participants.
_ = l.c.Delete(path, -1)
}
}()
for {
children, _, err := l.c.ChildrenCtx(ctx, l.path)
if err != nil {
return err
}
lowestSeq := seq
prevSeq := -1
prevSeqPath := ""
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return err
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
prevSeqPath = p
}
}
if seq == lowestSeq {
// Acquired the lock
acquired = true
break
}
// Wait on the node next in line for the lock
_, _, ch, err := l.c.GetWCtx(ctx, l.path+"/"+prevSeqPath)
if err != nil && err != ErrNoNode {
return err
} else if err != nil && err == ErrNoNode {
// try again
continue
}
select {
case ev := <-ch:
if ev.Err != nil {
return ev.Err
}
case <-ctx.Done():
return ctx.Err()
}
}
l.seq = seq
l.lockPath = path
return nil
}
// Unlock releases an acquired lock. If the lock is not currently acquired by
// this Lock instance than ErrNotLocked is returned.
func (l *Lock) Unlock() error {
return l.UnlockCtx(context.Background())
}
// UnlockCtx releases an acquired lock. If the lock is not currently acquired by
// this Lock instance than ErrNotLocked is returned.
func (l *Lock) UnlockCtx(ctx context.Context) error {
if l.lockPath == "" {
return ErrNotLocked
}
if err := l.c.DeleteCtx(ctx, l.lockPath, -1); err != nil {
return err
}
l.lockPath = ""
l.seq = 0
return nil
}