etcdserver: drain leaked goroutines before test completes

If pending changes are not committed before the test completes, the background
goroutines are still running and can cause a data race, or log through the
already-finished test logger, as in the failure below. We need to drain all
background goroutines before the test returns.

```bash
$ cd server
$ go test -race -v -run TestApplyRepeat ./etcdserver
...
panic: Log in goroutine after TestApplyRepeat has completed: 2024-02-03T17:06:13.262+0800       DEBUG   bbolt   Committing transaction 2

goroutine 81 [running]:
testing.(*common).logDepth(0xc000502820, {0xc0001b0460, 0x41}, 0x3)
        /usr/local/go/src/testing/testing.go:1022 +0x6d4
testing.(*common).log(...)
        /usr/local/go/src/testing/testing.go:1004
testing.(*common).Logf(0xc000502820, {0x1421ad7, 0x2}, {0xc000603520, 0x1, 0x1})
        /usr/local/go/src/testing/testing.go:1055 +0xa5
go.uber.org/zap/zaptest.testingWriter.Write({{0x15f1f90?, 0xc000502820?}, 0xda?}, {0xc000119800, 0x42, 0x400})
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zaptest/logger.go:130 +0x11e
go.uber.org/zap/zapcore.(*ioCore).Write(0xc0000b55c0, {0xff, {0xc1679e614f9fd7a4, 0x73a3657, 0x1cc2400}, {0x1422b2d, 0x5}, {0xc0001a0330, 0x18}, {0x0, ...}, ...}, ...)
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/core.go:99 +0x193
go.uber.org/zap/zapcore.(*CheckedEntry).Write(0xc000115930, {0x0, 0x0, 0x0})
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/entry.go:253 +0x2f0
go.uber.org/zap.(*SugaredLogger).log(0xc0001960f8, 0xff, {0x1437885, 0x19}, {0xc0006034e0, 0x1, 0x1}, {0x0, 0x0, 0x0})
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:316 +0x130
go.uber.org/zap.(*SugaredLogger).Debugf(...)
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:171
go.etcd.io/bbolt.(*Tx).Commit(0xc0001aa9a0)
        /home/fuwei/go/pkg/mod/go.etcd.io/bbolt@v1.4.0-alpha.0/tx.go:173 +0x206
go.etcd.io/etcd/server/v3/storage/backend.(*batchTx).commit(0xc00019b180, 0x0)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:269 +0xdf
go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).unsafeCommit(0xc00019b180, 0x0)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:378 +0x425
go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).commit(0xc00019b180, 0x80?)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:355 +0x78
go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).Commit(0xc00019b180)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:342 +0x35
go.etcd.io/etcd/server/v3/storage/backend.(*backend).run(0xc000478180)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/backend.go:426 +0x2c7
created by go.etcd.io/etcd/server/v3/storage/backend.newBackend in goroutine 80
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/backend.go:227 +0xbfd
FAIL    go.etcd.io/etcd/server/v3/etcdserver    0.129s
FAIL
```

This patch also drains the goroutines started by the raftNode and the watch store.
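
For context, here is a minimal, self-contained sketch (not part of this patch) of the failure mode and of the drain pattern the diffs below apply: a background goroutine that logs through a zaptest logger must be signalled to stop and waited for before the test returns, otherwise it hits the same "Log in goroutine after ... has completed" panic shown above. The test name and channel names here are illustrative only.

```go
package example

import (
	"testing"
	"time"

	"go.uber.org/zap/zaptest"
)

func TestDrainBackgroundGoroutine(t *testing.T) {
	// zaptest loggers write through t.Logf, which must not be called
	// after the test has returned.
	lg := zaptest.NewLogger(t)

	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Would panic if it runs after the test has completed.
				lg.Debug("committing batched transaction")
			case <-stop:
				return
			}
		}
	}()

	// The fix: signal the goroutine to stop and wait for it to exit
	// before the test completes, mirroring the defer/Close calls added below.
	t.Cleanup(func() {
		close(stop)
		<-done
	})
}
```

The per-test changes below apply the same idea with the existing helpers: closing the backend stops its batch-commit goroutine, closing the mvcc store stops the watcher sync goroutines, and stopping the raft node stops its run loop.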

Signed-off-by: Wei Fu <fuweid89@gmail.com>
Author: Wei Fu <fuweid89@gmail.com>
Date:   2024-02-03 17:06:55 +08:00
Commit: 32ee8b877a (parent 43d6759977)
4 changed files with 20 additions and 1 deletion


@@ -198,6 +198,9 @@ func TestBootstrapBackend(t *testing.T) {
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
ss := snap.New(cfg.Logger, cfg.SnapDir())
backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
+defer t.Cleanup(func() {
+	backend.Close()
+})
hasError := err != nil
expectedHasError := tt.expectedError != nil


@@ -517,6 +517,9 @@ func TestHashKVHandler(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+defer func() {
+	assert.NoError(t, etcdSrv.kv.Close())
+}()
ph := &hashKVHandler{
lg: zap.NewNop(),
server: etcdSrv,


@@ -207,7 +207,7 @@ func TestConfigChangeBlocksApply(t *testing.T) {
updateLead: func(uint64) {},
updateLeadership: func(bool) {},
})
-defer srv.r.Stop()
+defer srv.r.stop()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},


@@ -77,6 +77,7 @@ func TestApplyRepeat(t *testing.T) {
st := v2store.New()
cl.SetStore(v2store.New())
be, _ := betesting.NewDefaultTmpBackend(t)
+defer betesting.Close(t, be)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.AddMember(&membership.Member{ID: 1234}, true)
@@ -201,6 +202,7 @@ func TestV2SetClusterVersion(t *testing.T) {
func TestApplyConfStateWithRestart(t *testing.T) {
n := newNodeRecorder()
srv := newServer(t, n)
+defer srv.Cleanup()
assert.Equal(t, srv.consistIndex.ConsistentIndex(), uint64(0))
@@ -476,6 +478,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
consistIndex: ci,
beHooks: serverstorage.NewBackendHooks(lg, ci),
}
+defer srv.r.raftNodeConfig.Stop()
// create EntryConfChange entry
now := time.Now()
@@ -547,6 +550,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
cl := membership.NewCluster(lg)
cl.SetStore(v2store.New())
be, _ := betesting.NewDefaultTmpBackend(t)
+defer betesting.Close(t, be)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
for i := 1; i <= 5; i++ {
@@ -596,6 +600,7 @@ func TestSnapshot(t *testing.T) {
defer revertFunc()
be, _ := betesting.NewDefaultTmpBackend(t)
+defer betesting.Close(t, be)
s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}})
@@ -615,6 +620,9 @@ func TestSnapshot(t *testing.T) {
consistIndex: cindex.NewConsistentIndex(be),
}
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+defer func() {
+	assert.NoError(t, srv.kv.Close())
+}()
srv.be = be
cl := membership.NewCluster(zaptest.NewLogger(t))
@@ -921,6 +929,7 @@ func TestProcessIgnoreMismatchMessage(t *testing.T) {
},
},
})
+defer r.raftNodeConfig.Stop()
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
@@ -961,6 +970,7 @@ func TestRemoveMember(t *testing.T) {
st := v2store.New()
cl.SetStore(v2store.New())
be, _ := betesting.NewDefaultTmpBackend(t)
+defer betesting.Close(t, be)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.AddMember(&membership.Member{ID: 1234}, true)
@@ -1060,6 +1070,7 @@ func TestPublishV3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
+defer betesting.Close(t, be)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
@@ -1130,6 +1141,7 @@ func TestPublishV3Retry(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
+defer betesting.Close(t, be)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
@@ -1181,6 +1193,7 @@ func TestUpdateVersionV3(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
+defer betesting.Close(t, be)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),