From 838e3309c8f0465d5714984f7ddf0032aaac13d1 Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Mon, 16 Sep 2024 13:18:34 -0700 Subject: [PATCH] also add a retry on getPeersGroupWithRetry --- p2p/communication_test.go | 2 ++ p2p/party_coordinator.go | 35 +++++++++++++++++++++++------------ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/p2p/communication_test.go b/p2p/communication_test.go index 9d77c40..61c40a4 100644 --- a/p2p/communication_test.go +++ b/p2p/communication_test.go @@ -24,6 +24,8 @@ func (CommunicationTestSuite) TestBasicCommunication(c *C) { comm.SetSubscribe(messages.TSSKeyGenMsg, "hello", make(chan *Message)) c.Assert(comm.getSubscriber(messages.TSSKeySignMsg, "hello"), IsNil) c.Assert(comm.getSubscriber(messages.TSSKeyGenMsg, "hello"), NotNil) + c.Assert(comm.getSubscriberWithRetry(messages.TSSKeySignMsg, "hello"), IsNil) + c.Assert(comm.getSubscriberWithRetry(messages.TSSKeyGenMsg, "hello"), NotNil) comm.CancelSubscribe(messages.TSSKeyGenMsg, "hello") comm.CancelSubscribe(messages.TSSKeyGenMsg, "whatever") comm.CancelSubscribe(messages.TSSKeySignMsg, "asdsdf") diff --git a/p2p/party_coordinator.go b/p2p/party_coordinator.go index 414d6a8..889b85b 100644 --- a/p2p/party_coordinator.go +++ b/p2p/party_coordinator.go @@ -65,12 +65,27 @@ func (pc *PartyCoordinator) Stop() { close(pc.stopChan) } +// getPeersGroupWithRetry gets the peer group to reduce failures when desynchronized +func (pc *PartyCoordinator) getPeersGroupWithRetry(id string) *peerStatus { + var peerGroup *peerStatus + var ok bool + for i := 0; i < 3; i++ { + pc.joinPartyGroupLock.Lock() + peerGroup, ok = pc.peersGroup[id] + pc.joinPartyGroupLock.Unlock() + if ok { + return peerGroup + } + time.Sleep(time.Second * 50) + } + return peerGroup +} + func (pc *PartyCoordinator) processRespMsg(respMsg *messages.JoinPartyLeaderComm, stream network.Stream) { remotePeer := stream.Conn().RemotePeer() - pc.joinPartyGroupLock.Lock() - peerGroup, ok := pc.peersGroup[respMsg.ID] - pc.joinPartyGroupLock.Unlock() - if !ok { + + peerGroup := pc.getPeersGroupWithRetry(respMsg.ID) + if peerGroup == nil { pc.logger.Info().Msgf("message ID from peer(%s) can not be found", remotePeer) _ = stream.Reset() return @@ -91,10 +106,8 @@ func (pc *PartyCoordinator) processRespMsg(respMsg *messages.JoinPartyLeaderComm func (pc *PartyCoordinator) processReqMsg(requestMsg *messages.JoinPartyLeaderComm, stream network.Stream) { pc.streamMgr.AddStream(requestMsg.ID, stream) - pc.joinPartyGroupLock.Lock() - peerGroup, ok := pc.peersGroup[requestMsg.ID] - pc.joinPartyGroupLock.Unlock() - if !ok { + peerGroup := pc.getPeersGroupWithRetry(requestMsg.ID) + if peerGroup == nil { _ = stream.Reset() pc.logger.Info().Msg("this party is not ready") return @@ -127,10 +140,8 @@ func (pc *PartyCoordinator) HandleStream(stream network.Stream) { return } pc.streamMgr.AddStream(msg.ID, stream) - pc.joinPartyGroupLock.Lock() - peerGroup, ok := pc.peersGroup[msg.ID] - pc.joinPartyGroupLock.Unlock() - if !ok { + peerGroup := pc.getPeersGroupWithRetry(msg.ID) + if peerGroup == nil { _ = stream.Reset() pc.logger.Info().Msg("this party is not ready") return