forked from ethereum/go-ethereum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscription.go
375 lines (327 loc) · 10.7 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"container/list"
"context"
crand "crypto/rand"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"math/rand"
"reflect"
"strings"
"sync"
"time"
)
var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
ErrNotificationsUnsupported = errors.New("notifications not supported")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrSubscriptionNotFound = errors.New("subscription not found")
)
var globalGen = randomIDGenerator()
// ID defines a pseudo random number that is used to identify RPC subscriptions.
type ID string
// NewID returns a new, random ID.
func NewID() ID {
return globalGen()
}
// randomIDGenerator returns a function generates a random IDs.
func randomIDGenerator() func() ID {
var buf = make([]byte, 8)
var seed int64
if _, err := crand.Read(buf); err == nil {
seed = int64(binary.BigEndian.Uint64(buf))
} else {
seed = int64(time.Now().Nanosecond())
}
var (
mu sync.Mutex
rng = rand.New(rand.NewSource(seed))
)
return func() ID {
mu.Lock()
defer mu.Unlock()
id := make([]byte, 16)
rng.Read(id)
return encodeID(id)
}
}
func encodeID(b []byte) ID {
id := hex.EncodeToString(b)
id = strings.TrimLeft(id, "0")
if id == "" {
id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
}
return ID("0x" + id)
}
type notifierKey struct{}
// NotifierFromContext returns the Notifier value stored in ctx, if any.
func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
n, ok := ctx.Value(notifierKey{}).(*Notifier)
return n, ok
}
// Notifier is tied to a RPC connection that supports subscriptions.
// Server callbacks use the notifier to send notifications.
type Notifier struct {
h *handler
namespace string
mu sync.Mutex
sub *Subscription
buffer []json.RawMessage
callReturned bool
activated bool
}
// CreateSubscription returns a new subscription that is coupled to the
// RPC connection. By default subscriptions are inactive and notifications
// are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client.
func (n *Notifier) CreateSubscription() *Subscription {
n.mu.Lock()
defer n.mu.Unlock()
if n.sub != nil {
panic("can't create multiple subscriptions with Notifier")
} else if n.callReturned {
panic("can't create subscription after subscribe call has returned")
}
n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
return n.sub
}
// Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error {
enc, err := json.Marshal(data)
if err != nil {
return err
}
n.mu.Lock()
defer n.mu.Unlock()
if n.sub == nil {
panic("can't Notify before subscription is created")
} else if n.sub.ID != id {
panic("Notify with wrong ID")
}
if n.activated {
return n.send(n.sub, enc)
}
n.buffer = append(n.buffer, enc)
return nil
}
// Closed returns a channel that is closed when the RPC connection is closed.
// Deprecated: use subscription error channel
func (n *Notifier) Closed() <-chan interface{} {
return n.h.conn.closed()
}
// takeSubscription returns the subscription (if one has been created). No subscription can
// be created after this call.
func (n *Notifier) takeSubscription() *Subscription {
n.mu.Lock()
defer n.mu.Unlock()
n.callReturned = true
return n.sub
}
// activate is called after the subscription ID was sent to client. Notifications are
// buffered before activation. This prevents notifications being sent to the client before
// the subscription ID is sent to the client.
func (n *Notifier) activate() error {
n.mu.Lock()
defer n.mu.Unlock()
for _, data := range n.buffer {
if err := n.send(n.sub, data); err != nil {
return err
}
}
n.activated = true
return nil
}
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
ctx := context.Background()
return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
Params: params,
})
}
// A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
ID ID
namespace string
err chan error // closed on unsubscribe
}
// Err returns a channel that is closed when the client send an unsubscribe request.
func (s *Subscription) Err() <-chan error {
return s.err
}
// MarshalJSON marshals a subscription as its ID.
func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.ID)
}
// ClientSubscription is a subscription established through the Client's Subscribe or
// EthSubscribe methods.
type ClientSubscription struct {
client *Client
etype reflect.Type
channel reflect.Value
namespace string
subid string
// The in channel receives notification values from client dispatcher.
in chan json.RawMessage
// The error channel receives the error from the forwarding loop.
// It is closed by Unsubscribe.
err chan error
errOnce sync.Once
// Closing of the subscription is requested by sending on 'quit'. This is handled by
// the forwarding loop, which closes 'forwardDone' when it has stopped sending to
// sub.channel. Finally, 'unsubDone' is closed after unsubscribing on the server side.
quit chan error
forwardDone chan struct{}
unsubDone chan struct{}
}
// This is the sentinel value sent on sub.quit when Unsubscribe is called.
var errUnsubscribed = errors.New("unsubscribed")
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{
client: c,
namespace: namespace,
etype: channel.Type().Elem(),
channel: channel,
in: make(chan json.RawMessage),
quit: make(chan error),
forwardDone: make(chan struct{}),
unsubDone: make(chan struct{}),
err: make(chan error, 1),
}
return sub
}
// Err returns the subscription error channel. The intended use of Err is to schedule
// resubscription when the client connection is closed unexpectedly.
//
// The error channel receives a value when the subscription has ended due to an error. The
// received error is nil if Close has been called on the underlying client and no other
// error has occurred.
//
// The error channel is closed when Unsubscribe is called on the subscription.
func (sub *ClientSubscription) Err() <-chan error {
return sub.err
}
// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() {
sub.errOnce.Do(func() {
select {
case sub.quit <- errUnsubscribed:
<-sub.unsubDone
case <-sub.unsubDone:
}
close(sub.err)
})
}
// deliver is called by the client's message dispatcher to send a notification value.
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
select {
case sub.in <- result:
return true
case <-sub.forwardDone:
return false
}
}
// close is called by the client's message dispatcher when the connection is closed.
func (sub *ClientSubscription) close(err error) {
select {
case sub.quit <- err:
case <-sub.forwardDone:
}
}
// run is the forwarding loop of the subscription. It runs in its own goroutine and
// is launched by the client's handler after the subscription has been created.
func (sub *ClientSubscription) run() {
defer close(sub.unsubDone)
unsubscribe, err := sub.forward()
// The client's dispatch loop won't be able to execute the unsubscribe call if it is
// blocked in sub.deliver() or sub.close(). Closing forwardDone unblocks them.
close(sub.forwardDone)
// Call the unsubscribe method on the server.
if unsubscribe {
sub.requestUnsubscribe()
}
// Send the error.
if err != nil {
if err == ErrClientQuit {
// ErrClientQuit gets here when Client.Close is called. This is reported as a
// nil error because it's not an error, but we can't close sub.err here.
err = nil
}
sub.err <- err
}
}
// forward is the forwarding loop. It takes in RPC notifications and sends them
// on the subscription channel.
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
{Dir: reflect.SelectSend, Chan: sub.channel},
}
buffer := list.New()
for {
var chosen int
var recv reflect.Value
if buffer.Len() == 0 {
// Idle, omit send case.
chosen, recv, _ = reflect.Select(cases[:2])
} else {
// Non-empty buffer, send the first queued item.
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(cases)
}
switch chosen {
case 0: // <-sub.quit
if !recv.IsNil() {
err = recv.Interface().(error)
}
if err == errUnsubscribed {
// Exiting because Unsubscribe was called, unsubscribe on server.
return true, nil
}
return false, err
case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
if err != nil {
return true, err
}
if buffer.Len() == maxClientSubscriptionBuffer {
return true, ErrSubscriptionQueueOverflow
}
buffer.PushBack(val)
case 2: // sub.channel<-
cases[2].Send = reflect.Value{} // Don't hold onto the value.
buffer.Remove(buffer.Front())
}
}
}
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
val := reflect.New(sub.etype)
err := json.Unmarshal(result, val.Interface())
return val.Elem().Interface(), err
}
func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{}
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
}