Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the update timing of LastCompactedIdx #441

Open
wants to merge 1 commit into
base: course
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions kv/raftstore/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ type peer struct {
// (Used in 2B)
proposals []*proposal

// Index of last scheduled compacted raft log.
// (Used in 2C)
LastCompactedIdx uint64

// Cache the peers information from other stores
// when sending raft messages to other peers, it's used to get the store id of target peer
// (Used in 3B conf change)
Expand Down
2 changes: 0 additions & 2 deletions kv/raftstore/peer_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,8 @@ func (d *peerMsgHandler) ScheduleCompactLog(truncatedIndex uint64) {
raftLogGCTask := &runner.RaftLogGCTask{
RaftEngine: d.ctx.engine.Raft,
RegionID: d.regionId,
StartIdx: d.LastCompactedIdx,
EndIdx: truncatedIndex + 1,
}
d.LastCompactedIdx = raftLogGCTask.EndIdx
d.ctx.raftLogGCTaskSender <- raftLogGCTask
}

Expand Down
7 changes: 4 additions & 3 deletions kv/raftstore/runner/raftlog_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
type RaftLogGCTask struct {
RaftEngine *badger.DB
RegionID uint64
StartIdx uint64
EndIdx uint64
}

type raftLogGcTaskRes uint64

type raftLogGCTaskHandler struct {
taskResCh chan<- raftLogGcTaskRes
taskResCh chan<- raftLogGcTaskRes
LastCompactedIdx uint64
}

func NewRaftLogGCTaskHandler() *raftLogGCTaskHandler {
Expand Down Expand Up @@ -80,7 +80,8 @@ func (r *raftLogGCTaskHandler) Handle(t worker.Task) {
return
}
log.Debugf("execute gc log. [regionId: %d, endIndex: %d]", logGcTask.RegionID, logGcTask.EndIdx)
collected, err := r.gcRaftLog(logGcTask.RaftEngine, logGcTask.RegionID, logGcTask.StartIdx, logGcTask.EndIdx)
collected, err := r.gcRaftLog(logGcTask.RaftEngine, logGcTask.RegionID, r.LastCompactedIdx, logGcTask.EndIdx)
r.LastCompactedIdx += collected
if err != nil {
log.Errorf("failed to gc. [regionId: %d, collected: %d, err: %v]", logGcTask.RegionID, collected, err)
} else {
Expand Down
4 changes: 0 additions & 4 deletions kv/raftstore/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func TestGcRaftLog(t *testing.T) {
raftLogGcTask: &RaftLogGCTask{
RaftEngine: raftDb,
RegionID: regionId,
StartIdx: uint64(0),
EndIdx: uint64(10),
},
expectedCollected: uint64(10),
Expand All @@ -147,7 +146,6 @@ func TestGcRaftLog(t *testing.T) {
raftLogGcTask: &RaftLogGCTask{
RaftEngine: raftDb,
RegionID: regionId,
StartIdx: uint64(0),
EndIdx: uint64(50),
},
expectedCollected: uint64(40),
Expand All @@ -159,7 +157,6 @@ func TestGcRaftLog(t *testing.T) {
raftLogGcTask: &RaftLogGCTask{
RaftEngine: raftDb,
RegionID: regionId,
StartIdx: uint64(50),
EndIdx: uint64(50),
},
expectedCollected: uint64(0),
Expand All @@ -171,7 +168,6 @@ func TestGcRaftLog(t *testing.T) {
raftLogGcTask: &RaftLogGCTask{
RaftEngine: raftDb,
RegionID: regionId,
StartIdx: uint64(50),
EndIdx: uint64(60),
},
expectedCollected: uint64(10),
Expand Down