raftLog: compact applied entries only
compact MUST happen on entries that have been applied, or 1. it may screw up the log by setting wrong commitIndex 2. discard unapplied entries
This commit is contained in:
11
raft/log.go
11
raft/log.go
@ -152,8 +152,8 @@ func (l *raftLog) maybeCommit(maxIndex, term int64) bool {
|
|||||||
// and not greater than the index of the last entry.
|
// and not greater than the index of the last entry.
|
||||||
// the number of entries after compaction will be returned.
|
// the number of entries after compaction will be returned.
|
||||||
func (l *raftLog) compact(i int64) int64 {
|
func (l *raftLog) compact(i int64) int64 {
|
||||||
if l.isOutOfBounds(i) {
|
if l.isOutOfAppliedBounds(i) {
|
||||||
panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.lastIndex()))
|
panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.applied))
|
||||||
}
|
}
|
||||||
l.ents = l.slice(i, l.lastIndex()+1)
|
l.ents = l.slice(i, l.lastIndex()+1)
|
||||||
l.unstable = max(i+1, l.unstable)
|
l.unstable = max(i+1, l.unstable)
|
||||||
@ -208,6 +208,13 @@ func (l *raftLog) isOutOfBounds(i int64) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *raftLog) isOutOfAppliedBounds(i int64) bool {
|
||||||
|
if i < l.offset || i > l.applied {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func min(a, b int64) int64 {
|
func min(a, b int64) int64 {
|
||||||
if a > b {
|
if a > b {
|
||||||
return b
|
return b
|
||||||
|
@ -76,11 +76,14 @@ func TestAppend(t *testing.T) {
|
|||||||
func TestCompactionSideEffects(t *testing.T) {
|
func TestCompactionSideEffects(t *testing.T) {
|
||||||
var i int64
|
var i int64
|
||||||
lastIndex := int64(1000)
|
lastIndex := int64(1000)
|
||||||
|
lastTerm := lastIndex
|
||||||
raftLog := newLog()
|
raftLog := newLog()
|
||||||
|
|
||||||
for i = 0; i < lastIndex; i++ {
|
for i = 0; i < lastIndex; i++ {
|
||||||
raftLog.append(int64(i), pb.Entry{Term: int64(i + 1), Index: int64(i + 1)})
|
raftLog.append(int64(i), pb.Entry{Term: int64(i + 1), Index: int64(i + 1)})
|
||||||
}
|
}
|
||||||
|
raftLog.maybeCommit(lastIndex, lastTerm)
|
||||||
|
raftLog.resetNextEnts()
|
||||||
|
|
||||||
raftLog.compact(500)
|
raftLog.compact(500)
|
||||||
|
|
||||||
@ -149,16 +152,18 @@ func TestUnstableEnts(t *testing.T) {
|
|||||||
//TestCompaction ensures that the number of log entreis is correct after compactions.
|
//TestCompaction ensures that the number of log entreis is correct after compactions.
|
||||||
func TestCompaction(t *testing.T) {
|
func TestCompaction(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
app int
|
applied int64
|
||||||
compact []int64
|
lastIndex int64
|
||||||
wleft []int
|
compact []int64
|
||||||
wallow bool
|
wleft []int
|
||||||
|
wallow bool
|
||||||
}{
|
}{
|
||||||
// out of upper bound
|
// out of upper bound
|
||||||
{1000, []int64{1001}, []int{-1}, false},
|
{1000, 1000, []int64{1001}, []int{-1}, false},
|
||||||
{1000, []int64{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
|
{1000, 1000, []int64{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
|
||||||
// out of lower bound
|
// out of lower bound
|
||||||
{1000, []int64{300, 299}, []int{701, -1}, false},
|
{1000, 1000, []int64{300, 299}, []int{701, -1}, false},
|
||||||
|
{0, 1000, []int64{1}, []int{-1}, false},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
@ -172,9 +177,11 @@ func TestCompaction(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
raftLog := newLog()
|
raftLog := newLog()
|
||||||
for i := 0; i < tt.app; i++ {
|
for i := int64(0); i < tt.lastIndex; i++ {
|
||||||
raftLog.append(int64(i), pb.Entry{})
|
raftLog.append(int64(i), pb.Entry{})
|
||||||
}
|
}
|
||||||
|
raftLog.maybeCommit(tt.applied, 0)
|
||||||
|
raftLog.resetNextEnts()
|
||||||
|
|
||||||
for j := 0; j < len(tt.compact); j++ {
|
for j := 0; j < len(tt.compact); j++ {
|
||||||
raftLog.compact(tt.compact[j])
|
raftLog.compact(tt.compact[j])
|
||||||
|
Reference in New Issue
Block a user