Merge pull request #15985 from CaojiamingAlan/check_revision_before_write_hash
Check ScheduledCompactKeyName and FinishedCompactKeyName before writing hash
Commit 6c5fde5138
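In short: a compaction's hash is only trustworthy if it covers every revision since the last finished compaction. This change therefore reads the scheduled and finished compact revisions from the meta bucket before scheduling a new compaction, and only stores the resulting hash when the two agree, i.e. the previous compaction was not interrupted. Below is a minimal, self-contained sketch of that decision rule; the helper name and revision values are illustrative and not the etcd implementation:

package main

import "fmt"

// prevCompactionCompleted mirrors the rule introduced by this PR: the previous
// scheduled compaction is considered complete when the scheduled and finished
// compact revisions recorded in the meta bucket agree (both keys being absent
// also counts as complete, hence the comparison of the "found" flags as well).
func prevCompactionCompleted(scheduledRev int64, scheduledFound bool, finishedRev int64, finishedFound bool) bool {
	return scheduledRev == finishedRev && scheduledFound == finishedFound
}

func main() {
	// Previous compaction finished at rev 100: safe to store the next compaction hash.
	fmt.Println(prevCompactionCompleted(100, true, 100, true)) // true

	// A compaction was scheduled at rev 200 but interrupted before finishing (see #15919):
	// the resumed run does not hash every revision since the last compaction, so its hash is skipped.
	fmt.Println(prevCompactionCompleted(200, true, 100, true)) // false
}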
@@ -268,12 +268,13 @@ func (cm *corruptionChecker) CompactHashCheck() {
 	)
 	hashes := cm.uncheckedRevisions()
 	// Assume that revisions are ordered from largest to smallest
-	for _, hash := range hashes {
+	for i, hash := range hashes {
 		peers := cm.hasher.PeerHashByRev(hash.Revision)
 		if len(peers) == 0 {
 			continue
 		}
 		if cm.checkPeerHashes(hash, peers) {
+			cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
 			return
 		}
 	}
@@ -225,7 +225,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
 	return nil, compactMainRev, nil
 }
 
-func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
+// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
+func (s *store) checkPrevCompactionCompleted() bool {
+	tx := s.b.ReadTx()
+	tx.Lock()
+	defer tx.Unlock()
+	scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
+	finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
+	return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
+}
+
+func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
 	ch := make(chan struct{})
 	j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
 		if ctx.Err() != nil {
@@ -238,7 +248,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
 			s.compactBarrier(context.TODO(), ch)
 			return
 		}
-		s.hashes.Store(hash)
+		// Only store the hash value if the previous hash is completed, i.e. this compaction
+		// hashes every revision from last compaction. For more details, see #15919.
+		if prevCompactionCompleted {
+			s.hashes.Store(hash)
+		} else {
+			s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
+		}
 		close(ch)
 	})
 
@@ -248,17 +264,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
 }
 
 func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
+	prevCompactionCompleted := s.checkPrevCompactionCompleted()
 	ch, prevCompactRev, err := s.updateCompactRev(rev)
 	if err != nil {
 		return ch, err
 	}
 
-	return s.compact(traceutil.TODO(), rev, prevCompactRev)
+	return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
 }
 
 func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
 	s.mu.Lock()
-
+	prevCompactionCompleted := s.checkPrevCompactionCompleted()
 	ch, prevCompactRev, err := s.updateCompactRev(rev)
 	trace.Step("check and update compact revision")
 	if err != nil {
@@ -267,7 +284,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
 	}
 	s.mu.Unlock()
 
-	return s.compact(trace, rev, prevCompactRev)
+	return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
 }
 
 func (s *store) Commit() {
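Why skipping the store is the right call: when a member restarts mid-compaction, the resumed compaction only walks the revisions the interrupted run had not yet processed, so its hash does not cover the full range since the last finished compaction and would spuriously differ from peers' hashes, raising a false CORRUPT alarm. The guard above drops exactly that hash. A toy illustration of the guard follows; the types and helper are hypothetical, not etcd code:

package main

import "fmt"

type compactionHash struct {
	Revision int64
	Hash     uint32
}

// maybeStoreHash mimics the guard added to the compaction job: the hash is only
// recorded for the compact-hash checker when the previous compaction completed;
// otherwise it is dropped so peers are never compared against a partial hash.
func maybeStoreHash(stored *[]compactionHash, h compactionHash, prevCompactionCompleted bool) {
	if prevCompactionCompleted {
		*stored = append(*stored, h)
		return
	}
	fmt.Printf("previous compaction was interrupted, skip storing hash for revision %d\n", h.Revision)
}

func main() {
	var stored []compactionHash
	maybeStoreHash(&stored, compactionHash{Revision: 5, Hash: 0x1a2b3c4d}, true)  // stored
	maybeStoreHash(&stored, compactionHash{Revision: 9, Hash: 0x5e6f7a8b}, false) // skipped
	fmt.Println("stored hashes:", stored)
}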
@@ -338,6 +338,8 @@ func TestStoreCompact(t *testing.T) {
 	fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
 	key1 := newTestKeyBytes(lg, revision{1, 0}, false)
 	key2 := newTestKeyBytes(lg, revision{2, 0}, false)
+	b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
+	b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
 	b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
 
 	s.Compact(traceutil.TODO(), 3)
@@ -349,6 +351,8 @@ func TestStoreCompact(t *testing.T) {
 	end := make([]byte, 8)
 	binary.BigEndian.PutUint64(end, uint64(4))
 	wact := []testutil.Action{
+		{Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
+		{Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
 		{Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
 		{Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}},
 		{Name: "delete", Params: []interface{}{schema.Key, key2}},
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"go.etcd.io/etcd/api/v3/etcdserverpb"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -193,3 +194,81 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
 	assert.NoError(t, err, "error on alarm list")
 	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
 }
+
+func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
+	checkTime := time.Second
+	e2e.BeforeTest(t)
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	slowCompactionNodeIndex := 1
+
+	// Start a new cluster, with compact hash check enabled.
+	t.Log("creating a new cluster with 3 nodes...")
+
+	dataDirPath := t.TempDir()
+	cfg := e2e.NewConfig(
+		e2e.WithKeepDataDir(true),
+		e2e.WithCompactHashCheckEnabled(true),
+		e2e.WithCompactHashCheckTime(checkTime),
+		e2e.WithClusterSize(3),
+		e2e.WithDataDirPath(dataDirPath),
+		e2e.WithLogLevel("info"),
+	)
+	epc, err := e2e.InitEtcdProcessCluster(t, cfg)
+	require.NoError(t, err)
+
+	// Assign a node a very slow compaction speed, so that its compaction can be interrupted.
+	err = epc.UpdateProcOptions(slowCompactionNodeIndex, t,
+		e2e.WithCompactionBatchLimit(1),
+		e2e.WithCompactionSleepInterval(1*time.Hour),
+	)
+	require.NoError(t, err)
+
+	epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		if errC := epc.Close(); errC != nil {
+			t.Fatalf("error closing etcd processes (%v)", errC)
+		}
+	})
+
+	// Put 10 identical keys to the cluster, so that the compaction will drop some stale values.
+	t.Log("putting 10 values to the identical key...")
+	cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
+	require.NoError(t, err)
+	for i := 0; i < 10; i++ {
+		err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{})
+		require.NoError(t, err, "error on put")
+	}
+
+	t.Log("compaction started...")
+	_, err = cc.Compact(ctx, 5, config.CompactOption{})
+
+	err = epc.Procs[slowCompactionNodeIndex].Close()
+	require.NoError(t, err)
+
+	err = epc.UpdateProcOptions(slowCompactionNodeIndex, t)
+	require.NoError(t, err)
+
+	t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
+	err = epc.Procs[slowCompactionNodeIndex].Restart(ctx)
+
+	// Wait until the node finished compaction and the leader finished compaction hash check
+	_, err = epc.Procs[slowCompactionNodeIndex].Logs().ExpectWithContext(ctx, "finished scheduled compaction")
+	require.NoError(t, err, "can't get log indicating finished scheduled compaction")
+
+	leaderIndex := epc.WaitLeader(t)
+	_, err = epc.Procs[leaderIndex].Logs().ExpectWithContext(ctx, "finished compaction hash check")
+	require.NoError(t, err, "can't get log indicating finished compaction hash check")
+
+	alarmResponse, err := cc.AlarmList(ctx)
+	require.NoError(t, err, "error on alarm list")
+	for _, alarm := range alarmResponse.Alarms {
+		if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
+			t.Fatal("there should be no corruption after resuming the compaction, but corruption detected")
+		}
+	}
+	t.Log("no corruption detected.")
+}
@@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct {
 	CompactHashCheckTime time.Duration
 	GoFailEnabled bool
 	CompactionBatchLimit int
+	CompactionSleepInterval time.Duration
 
 	WarningUnaryRequestDuration time.Duration
 	ExperimentalWarningUnaryRequestDuration time.Duration
@@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
 }
 
+func WithCompactionSleepInterval(time time.Duration) EPClusterOption {
+	return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = time }
+}
+
 func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval }
 }
@@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 	if cfg.CompactionBatchLimit != 0 {
 		args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
 	}
+	if cfg.CompactionSleepInterval != 0 {
+		args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String())
+	}
 	if cfg.WarningUnaryRequestDuration != 0 {
 		args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String())
 	}
@@ -813,6 +821,32 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
 	return proc.Start(ctx)
 }
 
+// UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical
+// to the cluster.
+func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...EPClusterOption) error {
+	if epc.Procs[i].IsRunning() {
+		return fmt.Errorf("process %d is still running, please close it before updating its options", i)
+	}
+	cfg := *epc.Cfg
+	for _, opt := range opts {
+		opt(&cfg)
+	}
+	serverCfg := cfg.EtcdServerProcessConfig(tb, i)
+
+	var initialCluster []string
+	for _, p := range epc.Procs {
+		initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String()))
+	}
+	epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new")
+
+	proc, err := NewEtcdProcess(serverCfg)
+	if err != nil {
+		return err
+	}
+	epc.Procs[i] = proc
+	return nil
+}
+
 func (epc *EtcdProcessCluster) Start(ctx context.Context) error {
 	return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) })
 }
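As a usage note for the new test-framework helper: UpdateProcOptions rebuilds one stopped member's server config from the cluster config plus any extra options, which is how the e2e test above gives a single member a deliberately slow compaction. The sketch below shows the intended call pattern; the test name, package, and import path "go.etcd.io/etcd/tests/v3/framework/e2e" are assumptions for illustration, not part of the PR:

package e2e_test

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

// Illustrative only: demonstrates UpdateProcOptions, mirroring how the e2e test
// in this PR slows down one member's compaction.
func TestSlowCompactionOnOneMember(t *testing.T) {
	ctx := context.Background()
	cfg := e2e.NewConfig(e2e.WithClusterSize(3))
	epc, err := e2e.InitEtcdProcessCluster(t, cfg)
	require.NoError(t, err)

	// Member 1 is not running yet, so its options can be rewritten before start:
	// a tiny batch limit plus a long sleep interval makes its compaction very slow.
	require.NoError(t, epc.UpdateProcOptions(1, t,
		e2e.WithCompactionBatchLimit(1),
		e2e.WithCompactionSleepInterval(time.Hour),
	))

	epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, epc.Close()) })

	// ... exercise the cluster, trigger a compaction, etc. ...

	// To change a running member's flags, stop it first; UpdateProcOptions refuses
	// to touch a running process. Calling it with no options restores cluster defaults.
	require.NoError(t, epc.Procs[1].Close())
	require.NoError(t, epc.UpdateProcOptions(1, t))
	require.NoError(t, epc.Procs[1].Restart(ctx))
}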