Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/chrislusf/raft
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Jun 23, 2022
2 parents 0b95ac8 + 2be29d9 commit 3936262
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
18 changes: 9 additions & 9 deletions grpc_transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ var (
// An GrpcTransporter is a default transport layer used to communicate between
// multiple servers.
type GrpcTransporter struct {
grpcDialOption grpc.DialOption
grpcDialOptions []grpc.DialOption
}

// Creates a new HTTP transporter with the given path prefix.
func NewGrpcTransporter(grpcDialOption grpc.DialOption) *GrpcTransporter {
func NewGrpcTransporter(grpcDialOptions ...grpc.DialOption) *GrpcTransporter {
t := &GrpcTransporter{
grpcDialOption: grpcDialOption,
grpcDialOptions: grpcDialOptions,
}
return t
}
Expand All @@ -49,7 +49,7 @@ func NewGrpcServer(server Server) *GrpcServer {
// Sends an AppendEntries RPC to a peer.
func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) (ret *AppendEntriesResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -83,7 +83,7 @@ func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re
// Sends a RequestVote RPC to a peer.
func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) (ret *RequestVoteResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -117,7 +117,7 @@ func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques
// Sends a SnapshotRequest RPC to a peer.
func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) (ret *SnapshotResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -149,7 +149,7 @@ func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *Sn
// Sends a SnapshotRequest RPC to a peer.
func (t *GrpcTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) (ret *SnapshotRecoveryResponse) {

err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
defer cancel()

Expand Down Expand Up @@ -285,12 +285,12 @@ func (t *GrpcServer) OnSendSnapshotRecoveryRequest(ctx context2.Context, pbReq *
}, nil
}

func withRaftServerClient(raftServer string, grpcDialOption grpc.DialOption, fn func(protobuf.RaftClient) error) error {
func withRaftServerClient(raftServer string, grpcDialOptions []grpc.DialOption, fn func(protobuf.RaftClient) error) error {

return withCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := protobuf.NewRaftClient(grpcConnection)
return fn(client)
}, raftServer, grpcDialOption)
}, raftServer, grpcDialOptions...)

}

Expand Down
6 changes: 4 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ func (s *server) sendAsync(value interface{}) {
}

func (s *server) checkQuorumActive(timeout time.Duration) bool {
s.debugln("check.quorum.active")
s.debugln("check.quorum.active with quorum size: ", s.QuorumSize(), " member count: ", s.MemberCount())
act := 1
now := time.Now()
for _, peer := range s.peers {
Expand Down Expand Up @@ -1267,7 +1267,9 @@ func (s *server) TakeSnapshot() error {
// Attach snapshot to pending snapshot and save it to disk.
s.pendingSnapshot.Peers = peers
s.pendingSnapshot.State = state
s.saveSnapshot()
if err := s.saveSnapshot(); err != nil {
return err
}

// We keep some log entries after the snapshot.
// We do not want to send the whole snapshot to the slightly slow machines
Expand Down

0 comments on commit 3936262

Please sign in to comment.