diff --git a/p2p/discovery.go b/p2p/discovery.go index 271f53e..fda5892 100644 --- a/p2p/discovery.go +++ b/p2p/discovery.go @@ -94,8 +94,8 @@ func (pd *PeerDiscovery) addPeer(pinfo peer.AddrInfo) { pd.knownPeers[pinfo.ID] = oldPinfo } -// GetPeers returns all known peers -func (pd *PeerDiscovery) GetPeers() []peer.AddrInfo { +// GetKnownPeers returns all known peers +func (pd *PeerDiscovery) GetKnownPeers() []peer.AddrInfo { pd.mu.RLock() defer pd.mu.RUnlock() @@ -106,6 +106,39 @@ func (pd *PeerDiscovery) GetPeers() []peer.AddrInfo { return peers } +// GetConnectedPeers returns all currently connected peers +func (pd *PeerDiscovery) GetConnectedPeers() []peer.AddrInfo { + conns := pd.host.Network().Conns() + peerMap := make(map[peer.ID]peer.AddrInfo) + + for _, conn := range conns { + remotePeer := conn.RemotePeer() + remoteAddr := conn.RemoteMultiaddr() + + if peerInfo, exists := peerMap[remotePeer]; exists { + // peer already in map, add the new address if it's not already there + if !multiaddr.Contains(peerInfo.Addrs, remoteAddr) { + peerInfo.Addrs = append(peerInfo.Addrs, remoteAddr) + peerMap[remotePeer] = peerInfo + } + } else { + // new peer, add to map + peerMap[remotePeer] = peer.AddrInfo{ + ID: remotePeer, + Addrs: []multiaddr.Multiaddr{remoteAddr}, + } + } + } + + // flatten map + peers := make([]peer.AddrInfo, 0, len(peerMap)) + for _, peerInfo := range peerMap { + peers = append(peers, peerInfo) + } + + return peers +} + // handleDiscovery handles incoming discovery streams func (pd *PeerDiscovery) handleDiscovery(s network.Stream) { pd.logger.Debug(). @@ -121,8 +154,9 @@ func (pd *PeerDiscovery) handleDiscovery(s network.Stream) { } pd.addPeer(ai) - // Share our known peers - peers := pd.GetPeers() + // Share our connected peers + // we shouldn't share peers that are not actually connectable + peers := pd.GetConnectedPeers() data, err := json.Marshal(peers) if err != nil { pd.logger.Error().Err(err).Msgf("Failed to marshal peers") @@ -157,7 +191,7 @@ func (pd *PeerDiscovery) startGossip(ctx context.Context) { func (pd *PeerDiscovery) gossipPeers(ctx context.Context) { pd.logger.Debug().Msgf("Gossiping known peers") - peers := pd.GetPeers() + peers := pd.GetKnownPeers() pd.logger.Debug(). Array("peers", zerolog.Arr().Interface(peers)). Msgf("current peers") @@ -178,16 +212,20 @@ func (pd *PeerDiscovery) gossipPeers(ctx context.Context) { defer wg.Done() defer func() { <-sem }() - err := pd.host.Connect(ctx, p) - if err != nil { - pd.logger.Error().Err(err). - Stringer("to", p.ID). - Msg("Failed to connect to peer") - return + // only connect to peer if no active connections + // to prevent interruptions if address is changing + if len(pd.host.Network().ConnsToPeer(p.ID)) == 0 { + err := pd.host.Connect(ctx, p) + if err != nil { + pd.logger.Error().Err(err). + Stringer("to", p.ID). + Msg("Failed to connect to peer") + return + } + pd.logger.Debug(). + Stringer("to", p). + Msg("Connected to peer") } - pd.logger.Debug(). - Stringer("to", p). - Msg("Connected to peer") // Open discovery stream s, err := pd.host.NewStream(ctx, p.ID, DiscoveryProtocol)