From b9e30bf878b78886ea20610e3eef9eaa10c11c90 Mon Sep 17 00:00:00 2001 From: caojiamingalan Date: Mon, 29 May 2023 15:47:05 -0500 Subject: [PATCH] etcdserver: add e2e test to reproduce the incorrect hash issue when resuming scheduled compaction. check ScheduledCompactKeyName and FinishedCompactKeyName before writing hash to hashstore. If they do not match, then it means this compaction has once been interrupted and its hash value is invalid. In such cases, we won't write the hash values to the hashstore, and avoids the incorrect corruption alarm. Signed-off-by: caojiamingalan --- server/etcdserver/corrupt.go | 3 +- server/storage/mvcc/kvstore.go | 27 ++++++++-- server/storage/mvcc/kvstore_test.go | 4 ++ tests/e2e/corrupt_test.go | 79 +++++++++++++++++++++++++++++ tests/framework/e2e/cluster.go | 34 +++++++++++++ 5 files changed, 141 insertions(+), 6 deletions(-) diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 3aa5c78f1..0b4596823 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -268,12 +268,13 @@ func (cm *corruptionChecker) CompactHashCheck() { ) hashes := cm.uncheckedRevisions() // Assume that revisions are ordered from largest to smallest - for _, hash := range hashes { + for i, hash := range hashes { peers := cm.hasher.PeerHashByRev(hash.Revision) if len(peers) == 0 { continue } if cm.checkPeerHashes(hash, peers) { + cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1)) return } } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 8bc1b07d9..eeb82c68b 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -225,7 +225,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { return nil, compactMainRev, nil } -func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) { +// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed. +func (s *store) checkPrevCompactionCompleted() bool { + tx := s.b.ReadTx() + tx.Lock() + defer tx.Unlock() + scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx) + finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx) + return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound +} + +func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) { ch := make(chan struct{}) j := schedule.NewJob("kvstore_compact", func(ctx context.Context) { if ctx.Err() != nil { @@ -238,7 +248,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch s.compactBarrier(context.TODO(), ch) return } - s.hashes.Store(hash) + // Only store the hash value if the previous hash is completed, i.e. this compaction + // hashes every revision from last compaction. For more details, see #15919. + if prevCompactionCompleted { + s.hashes.Store(hash) + } else { + s.lg.Info("previous compaction was interrupted, skip storing compaction hash value") + } close(ch) }) @@ -248,17 +264,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch } func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { + prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) if err != nil { return ch, err } - return s.compact(traceutil.TODO(), rev, prevCompactRev) + return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted) } func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() - + prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) trace.Step("check and update compact revision") if err != nil { @@ -267,7 +284,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } s.mu.Unlock() - return s.compact(trace, rev, prevCompactRev) + return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted) } func (s *store) Commit() { diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index a3bdaac77..af4c3846c 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -338,6 +338,8 @@ func TestStoreCompact(t *testing.T) { fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} key1 := newTestKeyBytes(lg, revision{1, 0}, false) key2 := newTestKeyBytes(lg, revision{2, 0}, false) + b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} + b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}} s.Compact(traceutil.TODO(), 3) @@ -349,6 +351,8 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ + {Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}}, + {Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}}, {Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, {Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}}, {Name: "delete", Params: []interface{}{schema.Key, key2}}, diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index e9f7a66c4..94ee2084c 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -193,3 +194,81 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { assert.NoError(t, err, "error on alarm list") assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms) } + +func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) { + checkTime := time.Second + e2e.BeforeTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + slowCompactionNodeIndex := 1 + + // Start a new cluster, with compact hash check enabled. + t.Log("creating a new cluster with 3 nodes...") + + dataDirPath := t.TempDir() + cfg := e2e.NewConfig( + e2e.WithKeepDataDir(true), + e2e.WithCompactHashCheckEnabled(true), + e2e.WithCompactHashCheckTime(checkTime), + e2e.WithClusterSize(3), + e2e.WithDataDirPath(dataDirPath), + e2e.WithLogLevel("info"), + ) + epc, err := e2e.InitEtcdProcessCluster(t, cfg) + require.NoError(t, err) + + // Assign a node a very slow compaction speed, so that its compaction can be interrupted. + err = epc.UpdateProcOptions(slowCompactionNodeIndex, t, + e2e.WithCompactionBatchLimit(1), + e2e.WithCompactionSleepInterval(1*time.Hour), + ) + require.NoError(t, err) + + epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg) + require.NoError(t, err) + + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + // Put 10 identical keys to the cluster, so that the compaction will drop some stale values. + t.Log("putting 10 values to the identical key...") + cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC()) + require.NoError(t, err) + for i := 0; i < 10; i++ { + err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{}) + require.NoError(t, err, "error on put") + } + + t.Log("compaction started...") + _, err = cc.Compact(ctx, 5, config.CompactOption{}) + + err = epc.Procs[slowCompactionNodeIndex].Close() + require.NoError(t, err) + + err = epc.UpdateProcOptions(slowCompactionNodeIndex, t) + require.NoError(t, err) + + t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex) + err = epc.Procs[slowCompactionNodeIndex].Restart(ctx) + + // Wait until the node finished compaction and the leader finished compaction hash check + _, err = epc.Procs[slowCompactionNodeIndex].Logs().ExpectWithContext(ctx, "finished scheduled compaction") + require.NoError(t, err, "can't get log indicating finished scheduled compaction") + + leaderIndex := epc.WaitLeader(t) + _, err = epc.Procs[leaderIndex].Logs().ExpectWithContext(ctx, "finished compaction hash check") + require.NoError(t, err, "can't get log indicating finished compaction hash check") + + alarmResponse, err := cc.AlarmList(ctx) + require.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatal("there should be no corruption after resuming the compaction, but corruption detected") + } + } + t.Log("no corruption detected.") +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 0eafc4579..c611ed448 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct { CompactHashCheckTime time.Duration GoFailEnabled bool CompactionBatchLimit int + CompactionSleepInterval time.Duration WarningUnaryRequestDuration time.Duration ExperimentalWarningUnaryRequestDuration time.Duration @@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit } } +func WithCompactionSleepInterval(time time.Duration) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = time } +} + func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval } } @@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.CompactionBatchLimit != 0 { args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) } + if cfg.CompactionSleepInterval != 0 { + args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String()) + } if cfg.WarningUnaryRequestDuration != 0 { args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String()) } @@ -813,6 +821,32 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces return proc.Start(ctx) } +// UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical +// to the cluster. +func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...EPClusterOption) error { + if epc.Procs[i].IsRunning() { + return fmt.Errorf("process %d is still running, please close it before updating its options", i) + } + cfg := *epc.Cfg + for _, opt := range opts { + opt(&cfg) + } + serverCfg := cfg.EtcdServerProcessConfig(tb, i) + + var initialCluster []string + for _, p := range epc.Procs { + initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String())) + } + epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new") + + proc, err := NewEtcdProcess(serverCfg) + if err != nil { + return err + } + epc.Procs[i] = proc + return nil +} + func (epc *EtcdProcessCluster) Start(ctx context.Context) error { return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) }) }