-
Notifications
You must be signed in to change notification settings - Fork 107
/
surrogate_gossiper.go
99 lines (85 loc) · 2.66 KB
/
surrogate_gossiper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package mesh
import (
"bytes"
"hash/fnv"
"sync"
"time"
)
// surrogateGossiper ignores unicasts and relays broadcasts and gossips.
// It never interprets message payloads; the only state it keeps is a short
// history of gossip updates it has already passed on, which OnGossip uses to
// drop exact duplicates.
type surrogateGossiper struct {
// Embedded lock; OnGossip holds it while reading and rewriting prevUpdates.
sync.Mutex
// prevUpdates is the history of updates already seen, in insertion order.
// OnGossip appends new entries at the end and trims expired ones from the front.
prevUpdates []prevUpdate
// router, when non-nil, supplies the gossip interval used as the retention
// window for prevUpdates; when nil, OnGossip uses defaultGossipInterval.
router *Router
}
// prevUpdate records one gossip update that OnGossip has already passed on.
// NOTE: OnGossip constructs this type positionally, so field order matters.
type prevUpdate struct {
// update is the message payload as received (stored without copying).
update []byte
// hash is the 64-bit FNV-1a hash of update; it is compared first as a cheap
// pre-check before the full byte-wise comparison.
hash uint64
// t is the value of now() at the moment the update was recorded; entries
// older than the gossip interval are dropped when a new update arrives.
t time.Time
}
// Compile-time check that *surrogateGossiper satisfies Gossiper.
var _ Gossiper = (*surrogateGossiper)(nil)
// now returns the current time. It is a package-level variable rather than a
// direct call to time.Now so that tests can substitute a fake clock.
var now = time.Now
// OnGossipUnicast implements Gossiper.
// Unicasts are ignored: both the sender and the message are discarded and the
// returned error is always nil.
func (*surrogateGossiper) OnGossipUnicast(sender PeerName, msg []byte) error {
return nil
}
// OnGossipBroadcast implements Gossiper.
// The payload is not interpreted: it is wrapped unchanged in a
// surrogateGossipData and returned. The sender is ignored and the returned
// error is always nil.
func (*surrogateGossiper) OnGossipBroadcast(_ PeerName, update []byte) (GossipData, error) {
	relayed := newSurrogateGossipData(update)
	return relayed, nil
}
// Gossip implements Gossiper.
// It always returns nil: surrogateGossiper has no state of its own to offer.
func (*surrogateGossiper) Gossip() GossipData {
return nil
}
// OnGossip implements Gossiper. It should return "everything new I've just
// learnt". surrogateGossiper doesn't understand the content of messages, but
// it can eliminate simple duplicates: an update that is byte-for-byte equal to
// one still held in the history yields (nil, nil); any other update is
// recorded and returned wrapped in a surrogateGossipData. The returned error
// is always nil.
//
// Expired history entries are only trimmed when a new (non-duplicate) update
// is recorded, so a duplicate is suppressed for as long as its entry remains.
func (s *surrogateGossiper) OnGossip(update []byte) (GossipData, error) {
	hasher := fnv.New64a()
	// The error is deliberately discarded: hash.Hash documents that Write
	// never returns an error.
	_, _ = hasher.Write(update)
	digest := hasher.Sum64()

	s.Lock()
	defer s.Unlock()

	// Compare hashes first so the full byte comparison only runs on a likely match.
	for i := range s.prevUpdates {
		seen := &s.prevUpdates[i]
		if seen.hash == digest && bytes.Equal(update, seen.update) {
			return nil, nil
		}
	}

	// Drop anything older than the gossip interval so the history cannot grow
	// forever. The limit is arbitrary: surrogateGossiper passes new gossip on
	// immediately, so there should be no reason for a duplicate to show up
	// after a long time.
	receivedAt := now()
	retention := defaultGossipInterval
	if s.router != nil {
		retention = s.router.gossipInterval()
	}
	cutoff := receivedAt.Add(-retention)

	// Skip the leading run of entries that are not newer than the cutoff;
	// everything from the first newer entry onwards is kept.
	firstKept := 0
	for firstKept < len(s.prevUpdates) && !s.prevUpdates[firstKept].t.After(cutoff) {
		firstKept++
	}

	entry := prevUpdate{update: update, hash: digest, t: receivedAt}
	s.prevUpdates = append(s.prevUpdates[firstKept:], entry)
	return newSurrogateGossipData(update), nil
}
// surrogateGossipData is a simple in-memory GossipData.
// It holds opaque message payloads and never inspects their content.
type surrogateGossipData struct {
// messages holds the raw payloads; Encode returns this slice as-is and
// Merge concatenates it with another instance's messages.
messages [][]byte
}
// Compile-time check that *surrogateGossipData satisfies GossipData.
var _ GossipData = (*surrogateGossipData)(nil)
// newSurrogateGossipData returns a surrogateGossipData holding exactly one
// message. msg is stored as given, without copying.
func newSurrogateGossipData(msg []byte) *surrogateGossipData {
	data := new(surrogateGossipData)
	data.messages = [][]byte{msg}
	return data
}
// Encode implements GossipData.
// It returns the stored messages directly; the returned slice shares its
// backing storage with the receiver (no copy is made).
func (d *surrogateGossipData) Encode() [][]byte {
return d.messages
}
// Merge implements GossipData.
// It returns a new surrogateGossipData whose messages are the receiver's
// followed by other's; neither operand is modified. other must hold a
// *surrogateGossipData: any other dynamic type (including a nil interface)
// makes the unchecked type assertion panic.
func (d *surrogateGossipData) Merge(other GossipData) GossipData {
	theirs := other.(*surrogateGossipData)
	combined := make([][]byte, len(d.messages)+len(theirs.messages))
	n := copy(combined, d.messages)
	copy(combined[n:], theirs.messages)
	return &surrogateGossipData{messages: combined}
}