forked from libp2p/go-libp2p-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 2
/
notify.go
97 lines (78 loc) · 1.95 KB
/
notify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package pubsub
import (
"fmt"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)
type PubSubNotif PubSub
func (p *PubSubNotif) startMonitoring() error {
sub, err := p.host.EventBus().Subscribe([]interface{}{
new(event.EvtPeerConnectednessChanged),
new(event.EvtPeerProtocolsUpdated),
}, eventbus.Name("pubsub/peers-notify")) // @NOTE(gfanton): is the naming is correct ?
if err != nil {
return fmt.Errorf("unable to subscribe to EventBus: %w", err)
}
// add current peers
p.addPeers(p.host.Network().Peers()...)
go func() {
defer sub.Close()
for {
var e interface{}
select {
case <-p.ctx.Done():
return
case e = <-sub.Out():
}
switch evt := e.(type) {
case event.EvtPeerConnectednessChanged:
// send record to connected peer only
if evt.Connectedness == network.Connected {
go p.addPeers(evt.Peer)
}
case event.EvtPeerProtocolsUpdated:
supportedProtocols := p.rt.Protocols()
protocol_loop:
for _, addedProtocol := range evt.Added {
for _, wantedProtocol := range supportedProtocols {
if wantedProtocol == addedProtocol {
go p.addPeers(evt.Peer)
break protocol_loop
}
}
}
}
}
}()
return nil
}
func (p *PubSubNotif) isTransient(pid peer.ID) bool {
for _, c := range p.host.Network().ConnsToPeer(pid) {
if !c.Stat().Transient {
return false
}
}
return true
}
func (p *PubSubNotif) addPeers(peers ...peer.ID) {
p.newPeersPrioLk.RLock()
p.newPeersMx.Lock()
for _, pid := range peers {
if p.host.Network().Connectedness(pid) != network.Connected || p.isTransient(pid) {
continue
}
p.newPeersPend[pid] = struct{}{}
}
// do we need to update ?
haveNewPeer := len(p.newPeersPend) > 0
p.newPeersMx.Unlock()
p.newPeersPrioLk.RUnlock()
if haveNewPeer {
select {
case p.newPeers <- struct{}{}:
default:
}
}
}