Skip to content

Commit

Permalink
server/gossip: reconnect to dropped peers
Browse files Browse the repository at this point in the history
  • Loading branch information
zllovesuki committed Feb 4, 2022
1 parent 16b7490 commit a8caae9
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 27 deletions.
83 changes: 57 additions & 26 deletions server/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,62 @@ func (s *Server) Gossip() error {
s.gossip.Shutdown()
}()

go func() {
s.logger.Info("gossip: reconcilate peers every 1 minute")

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-s.parentCtx.Done():
return
case <-ticker.C:
s.peerReconciliation()
}
}
}()

return nil
}

// shouldReachOut reports whether this server should be the side that
// initiates a connection to the peer described by m.
//
// It returns false (this server acts as responder) when:
//   - this server's own meta (s.meta) is marked RespondOnly, or
//   - the peer is not RespondOnly and its PeerID is lower than ours.
//
// In every other case it returns true. Each "responder" decision is logged
// at info level with the peer's meta attached.
func (s *Server) shouldReachOut(m Meta) bool {
	peerLog := s.logger.With(zap.Object("meta", m))

	switch {
	case s.meta.RespondOnly:
		peerLog.Info("gossip: peer is configured to respond to connections only, acting as responder")
		return false
	case !m.RespondOnly && m.PeerID < s.PeerID():
		peerLog.Info("gossip: peer has a lower PeerID, acting as responder")
		return false
	default:
		return true
	}
}

// peerReconciliation walks the current gossip membership and starts a
// reconnect for every alive node that has no entry in s.peers, provided
// shouldReachOut decides this server is the initiating side.
//
// Nodes are skipped when they are not in StateAlive, when their metadata
// cannot be decoded, or when they carry this server's own PeerID. Each
// reconnect is started in its own goroutine via connectPeer, so this method
// does not wait for the connection attempts to finish.
func (s *Server) peerReconciliation() {
	for _, node := range s.gossip.Members() {
		if node.State != memberlist.StateAlive {
			continue
		}

		// Decode into a fresh Meta for every node. Reusing one value across
		// iterations could carry stale fields over from the previous node and
		// could alias state already handed to a connectPeer goroutine.
		var m Meta
		if err := m.UnmarshalBinary(node.Meta); err != nil {
			s.logger.Debug("gossip: skipping node with undecodable meta during reconciliation",
				zap.String("node", node.Name),
				zap.Error(err),
			)
			continue
		}

		if m.PeerID == s.PeerID() {
			// Our own entry in the member list.
			continue
		}
		if s.peers.Get(m.PeerID) != nil {
			// Already tracked in s.peers; nothing to reconcile.
			continue
		}
		if !s.shouldReachOut(m) {
			continue
		}

		s.logger.Info("gossip: dropped peer detected, reconnecting", zap.Object("meta", m))
		go s.connectPeer(m)
	}
}

// ======== Membership Changes ========

// Compile-time assertion that *Server satisfies memberlist.EventDelegate.
var _ memberlist.EventDelegate = (*Server)(nil)
Expand All @@ -53,43 +106,21 @@ func (s *Server) NotifyJoin(node *memberlist.Node) {
s.logger.Debug("gossip: ignore new node join on ourself")
return
}
if node.Meta == nil {
return
}

var m Meta
if err := m.UnmarshalBinary(node.Meta); err != nil {
return
}

logger := s.logger.With(zap.Object("meta", m))
s.logger.Info("gossip: new peer discovered via gossip", zap.Object("meta", m))

if m.PeerID == s.PeerID() {
logger.Fatal("gossip: new peer has the same ID as current node", zap.Uint64("self", s.PeerID()))
s.logger.Fatal("gossip: new peer has the same ID as current node", zap.Uint64("self", s.PeerID()))
}

logger.Info("gossip: new peer discovered via gossip")

if s.meta.RespondOnly {
logger.Info("gossip: current peer is configured to respond connections only, acting as responder")
return
if s.shouldReachOut(m) {
go s.connectPeer(m)
}

if !m.RespondOnly && m.PeerID < s.PeerID() {
logger.Info("gossip: new peer has a lower PeerID, acting as responder")
go func(m Meta) {
time.Sleep(time.Second * 10)

if s.peers.Get(m.PeerID) != nil {
return
}
logger.Warn("gossip: new peer not connected after 10 seconds, attempt to reach out")
go s.connectPeer(m)
}(m)
return
}

go s.connectPeer(m)
}

func (s *Server) NotifyLeave(node *memberlist.Node) {
Expand Down
2 changes: 1 addition & 1 deletion server/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *Server) handlePeerEvents() {
// but only one side of the session can
go func(p multiplexer.Peer) {
if p.Initiator() {
s.logger.Debug("skip openning message stream as initiator", zap.Uint64("peer", p.Peer()))
s.logger.Debug("skip opening message stream as initiator", zap.Uint64("peer", p.Peer()))
return
}
c, err := p.Messaging(s.parentCtx)
Expand Down

0 comments on commit a8caae9

Please sign in to comment.