Skip to content

Commit

Permalink
also add a retry on getPeersGroupWithRetry
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Sep 16, 2024
1 parent 77fc258 commit 838e330
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
2 changes: 2 additions & 0 deletions p2p/communication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func (CommunicationTestSuite) TestBasicCommunication(c *C) {
comm.SetSubscribe(messages.TSSKeyGenMsg, "hello", make(chan *Message))
c.Assert(comm.getSubscriber(messages.TSSKeySignMsg, "hello"), IsNil)
c.Assert(comm.getSubscriber(messages.TSSKeyGenMsg, "hello"), NotNil)
c.Assert(comm.getSubscriberWithRetry(messages.TSSKeySignMsg, "hello"), IsNil)
c.Assert(comm.getSubscriberWithRetry(messages.TSSKeyGenMsg, "hello"), NotNil)
comm.CancelSubscribe(messages.TSSKeyGenMsg, "hello")
comm.CancelSubscribe(messages.TSSKeyGenMsg, "whatever")
comm.CancelSubscribe(messages.TSSKeySignMsg, "asdsdf")
Expand Down
35 changes: 23 additions & 12 deletions p2p/party_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,27 @@ func (pc *PartyCoordinator) Stop() {
close(pc.stopChan)
}

// getPeersGroupWithRetry looks up the peer group registered under id, retrying
// a few times to reduce failures when this node is briefly desynchronized from
// its peers (a message for a party can arrive just before the party is
// registered locally).
//
// The map lookup is performed under joinPartyGroupLock; the lock is released
// before sleeping so that a concurrent registration can make progress between
// attempts. It returns the stored *peerStatus as soon as the id is found, or
// nil if the id is still unknown after the last attempt.
func (pc *PartyCoordinator) getPeersGroupWithRetry(id string) *peerStatus {
	const (
		maxAttempts = 3
		// Short back-off: this runs inside stream handlers, so it must not
		// hold a stream open for long. The previous value (50 seconds per
		// retry) stalled the handler for minutes when the id was unknown.
		retryDelay = 50 * time.Millisecond
	)

	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Only wait between attempts; sleeping after the final lookup would
		// delay the caller without any further retry to benefit from it.
		if attempt > 0 {
			time.Sleep(retryDelay)
		}

		pc.joinPartyGroupLock.Lock()
		peerGroup, ok := pc.peersGroup[id]
		pc.joinPartyGroupLock.Unlock()
		if ok {
			return peerGroup
		}
	}
	return nil
}

func (pc *PartyCoordinator) processRespMsg(respMsg *messages.JoinPartyLeaderComm, stream network.Stream) {
remotePeer := stream.Conn().RemotePeer()
pc.joinPartyGroupLock.Lock()
peerGroup, ok := pc.peersGroup[respMsg.ID]
pc.joinPartyGroupLock.Unlock()
if !ok {

peerGroup := pc.getPeersGroupWithRetry(respMsg.ID)
if peerGroup == nil {
pc.logger.Info().Msgf("message ID from peer(%s) can not be found", remotePeer)
_ = stream.Reset()
return
Expand All @@ -91,10 +106,8 @@ func (pc *PartyCoordinator) processRespMsg(respMsg *messages.JoinPartyLeaderComm

func (pc *PartyCoordinator) processReqMsg(requestMsg *messages.JoinPartyLeaderComm, stream network.Stream) {
pc.streamMgr.AddStream(requestMsg.ID, stream)
pc.joinPartyGroupLock.Lock()
peerGroup, ok := pc.peersGroup[requestMsg.ID]
pc.joinPartyGroupLock.Unlock()
if !ok {
peerGroup := pc.getPeersGroupWithRetry(requestMsg.ID)
if peerGroup == nil {
_ = stream.Reset()
pc.logger.Info().Msg("this party is not ready")
return
Expand Down Expand Up @@ -127,10 +140,8 @@ func (pc *PartyCoordinator) HandleStream(stream network.Stream) {
return
}
pc.streamMgr.AddStream(msg.ID, stream)
pc.joinPartyGroupLock.Lock()
peerGroup, ok := pc.peersGroup[msg.ID]
pc.joinPartyGroupLock.Unlock()
if !ok {
peerGroup := pc.getPeersGroupWithRetry(msg.ID)
if peerGroup == nil {
_ = stream.Reset()
pc.logger.Info().Msg("this party is not ready")
return
Expand Down

0 comments on commit 838e330

Please sign in to comment.