diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index 3aa5c78f1..0b4596823 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -268,12 +268,13 @@ func (cm *corruptionChecker) CompactHashCheck() {
 	)
 	hashes := cm.uncheckedRevisions()
 	// Assume that revisions are ordered from largest to smallest
-	for _, hash := range hashes {
+	for i, hash := range hashes {
 		peers := cm.hasher.PeerHashByRev(hash.Revision)
 		if len(peers) == 0 {
 			continue
 		}
 		if cm.checkPeerHashes(hash, peers) {
+			cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
 			return
 		}
 	}
diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go
index 8bc1b07d9..eeb82c68b 100644
--- a/server/storage/mvcc/kvstore.go
+++ b/server/storage/mvcc/kvstore.go
@@ -225,7 +225,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
 	return nil, compactMainRev, nil
 }
 
-func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
+// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
+func (s *store) checkPrevCompactionCompleted() bool {
+	tx := s.b.ReadTx()
+	tx.Lock()
+	defer tx.Unlock()
+	scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
+	finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
+	return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
+}
+
+func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
 	ch := make(chan struct{})
 	j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
 		if ctx.Err() != nil {
@@ -238,7 +248,16 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
 			s.compactBarrier(context.TODO(), ch)
 			return
 		}
-		s.hashes.Store(hash)
+		// Only store the hash value if the previous compaction completed, i.e. this
+		// compaction hashed every revision since the last compaction. For more details, see #15919.
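+		// An interrupted compaction may have already removed part of the revision
+		// range, so the hash computed by the resumed run would not cover those
+		// revisions and could differ across members even without real corruption.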
+		if prevCompactionCompleted {
+			s.hashes.Store(hash)
+		} else {
+			s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
+		}
 		close(ch)
 	})
 
@@ -248,17 +264,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
 }
 
 func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
+	prevCompactionCompleted := s.checkPrevCompactionCompleted()
 	ch, prevCompactRev, err := s.updateCompactRev(rev)
 	if err != nil {
 		return ch, err
 	}
 
-	return s.compact(traceutil.TODO(), rev, prevCompactRev)
+	return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
 }
 
 func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
 	s.mu.Lock()
-
+	prevCompactionCompleted := s.checkPrevCompactionCompleted()
 	ch, prevCompactRev, err := s.updateCompactRev(rev)
 	trace.Step("check and update compact revision")
 	if err != nil {
@@ -267,7 +284,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
 	}
 	s.mu.Unlock()
 
-	return s.compact(trace, rev, prevCompactRev)
+	return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
 }
 
 func (s *store) Commit() {
diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go
index a3bdaac77..af4c3846c 100644
--- a/server/storage/mvcc/kvstore_test.go
+++ b/server/storage/mvcc/kvstore_test.go
@@ -338,6 +338,8 @@ func TestStoreCompact(t *testing.T) {
 	fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
 	key1 := newTestKeyBytes(lg, revision{1, 0}, false)
 	key2 := newTestKeyBytes(lg, revision{2, 0}, false)
+	b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
+	b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
 	b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
 
 	s.Compact(traceutil.TODO(), 3)
@@ -349,6 +351,8 @@ func TestStoreCompact(t *testing.T) {
 	end := make([]byte, 8)
 	binary.BigEndian.PutUint64(end, uint64(4))
 	wact := []testutil.Action{
+		{Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
+		{Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
 		{Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
 		{Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}},
 		{Name: "delete", Params: []interface{}{schema.Key, key2}},
diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go
index e9f7a66c4..94ee2084c 100644
--- a/tests/e2e/corrupt_test.go
+++ b/tests/e2e/corrupt_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"go.etcd.io/etcd/api/v3/etcdserverpb"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -193,3 +194,85 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
 	assert.NoError(t, err, "error on alarm list")
 	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
 }
+
+func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
+	checkTime := time.Second
+	e2e.BeforeTest(t)
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	slowCompactionNodeIndex := 1
+
+	// Start a new cluster, with compact hash check enabled.
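+	// One member is then made to compact very slowly so that a restart can
+	// interrupt its compaction; no CORRUPT alarm must be raised once it resumes.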
+ t.Log("creating a new cluster with 3 nodes...") + + dataDirPath := t.TempDir() + cfg := e2e.NewConfig( + e2e.WithKeepDataDir(true), + e2e.WithCompactHashCheckEnabled(true), + e2e.WithCompactHashCheckTime(checkTime), + e2e.WithClusterSize(3), + e2e.WithDataDirPath(dataDirPath), + e2e.WithLogLevel("info"), + ) + epc, err := e2e.InitEtcdProcessCluster(t, cfg) + require.NoError(t, err) + + // Assign a node a very slow compaction speed, so that its compaction can be interrupted. + err = epc.UpdateProcOptions(slowCompactionNodeIndex, t, + e2e.WithCompactionBatchLimit(1), + e2e.WithCompactionSleepInterval(1*time.Hour), + ) + require.NoError(t, err) + + epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg) + require.NoError(t, err) + + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + // Put 10 identical keys to the cluster, so that the compaction will drop some stale values. + t.Log("putting 10 values to the identical key...") + cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC()) + require.NoError(t, err) + for i := 0; i < 10; i++ { + err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{}) + require.NoError(t, err, "error on put") + } + + t.Log("compaction started...") + _, err = cc.Compact(ctx, 5, config.CompactOption{}) + + err = epc.Procs[slowCompactionNodeIndex].Close() + require.NoError(t, err) + + err = epc.UpdateProcOptions(slowCompactionNodeIndex, t) + require.NoError(t, err) + + t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex) + err = epc.Procs[slowCompactionNodeIndex].Restart(ctx) + + // Wait until the node finished compaction and the leader finished compaction hash check + _, err = epc.Procs[slowCompactionNodeIndex].Logs().ExpectWithContext(ctx, "finished scheduled compaction") + require.NoError(t, err, "can't get log indicating finished scheduled compaction") + + leaderIndex := epc.WaitLeader(t) + _, err = epc.Procs[leaderIndex].Logs().ExpectWithContext(ctx, "finished compaction hash check") + require.NoError(t, err, "can't get log indicating finished compaction hash check") + + alarmResponse, err := cc.AlarmList(ctx) + require.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatal("there should be no corruption after resuming the compaction, but corruption detected") + } + } + t.Log("no corruption detected.") +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 0eafc4579..c611ed448 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct { CompactHashCheckTime time.Duration GoFailEnabled bool CompactionBatchLimit int + CompactionSleepInterval time.Duration WarningUnaryRequestDuration time.Duration ExperimentalWarningUnaryRequestDuration time.Duration @@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit } } +func WithCompactionSleepInterval(time time.Duration) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = time } +} + func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval } } @@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.CompactionBatchLimit != 0 { 
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) } + if cfg.CompactionSleepInterval != 0 { + args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String()) + } if cfg.WarningUnaryRequestDuration != 0 { args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String()) } @@ -813,6 +821,32 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces return proc.Start(ctx) } +// UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical +// to the cluster. +func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...EPClusterOption) error { + if epc.Procs[i].IsRunning() { + return fmt.Errorf("process %d is still running, please close it before updating its options", i) + } + cfg := *epc.Cfg + for _, opt := range opts { + opt(&cfg) + } + serverCfg := cfg.EtcdServerProcessConfig(tb, i) + + var initialCluster []string + for _, p := range epc.Procs { + initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String())) + } + epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new") + + proc, err := NewEtcdProcess(serverCfg) + if err != nil { + return err + } + epc.Procs[i] = proc + return nil +} + func (epc *EtcdProcessCluster) Start(ctx context.Context) error { return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) }) }
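Reviewer note: a minimal usage sketch of the new `UpdateProcOptions` helper, mirroring the stop → update → restart sequence used by `TestCompactHashCheckDetectCorruptionInterrupt` above. The `epc`, `ctx`, `t`, and member index `i` are assumed to come from the surrounding test; this is an illustration, not part of the change.

```go
// Stop the member first: UpdateProcOptions rejects running processes.
if err := epc.Procs[i].Close(); err != nil {
	t.Fatal(err)
}
// With no opts, the rebuilt process reverts to the cluster-wide config,
// dropping per-process overrides such as the slow-compaction flags above.
if err := epc.UpdateProcOptions(i, t); err != nil {
	t.Fatal(err)
}
// UpdateProcOptions replaced epc.Procs[i] with a fresh process handle,
// so re-read it from the cluster before starting it again.
if err := epc.Procs[i].Restart(ctx); err != nil {
	t.Fatal(err)
}
```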