Use constructor in tests
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) {
|
||||
|
||||
func TestWatchableKVWatch(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
|
@ -70,12 +70,18 @@ type watchableStore struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
var _ WatchableKV = (*watchableStore)(nil)
|
||||
|
||||
// cancelFunc updates unsynced and synced maps when running
|
||||
// cancel operations.
|
||||
type cancelFunc func()
|
||||
|
||||
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
|
||||
return newWatchableStore(lg, b, le, cfg)
|
||||
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
|
||||
s := newWatchableStore(lg, b, le, cfg)
|
||||
s.wg.Add(2)
|
||||
go s.syncWatchersLoop()
|
||||
go s.syncVictimsLoop()
|
||||
return s
|
||||
}
|
||||
|
||||
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
|
||||
@ -95,9 +101,6 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
|
||||
// use this store as the deleter so revokes trigger watch events
|
||||
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
|
||||
}
|
||||
s.wg.Add(2)
|
||||
go s.syncWatchersLoop()
|
||||
go s.syncVictimsLoop()
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
|
||||
|
||||
func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
|
||||
be, _ := betesting.NewDefaultTmpBackend(b)
|
||||
s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, be)
|
||||
|
||||
k := []byte("testkey")
|
||||
@ -122,21 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
|
||||
// we should put to simulate the real-world use cases.
|
||||
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
||||
be, _ := betesting.NewDefaultTmpBackend(b)
|
||||
s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
// manually create watchableStore instead of newWatchableStore
|
||||
// because newWatchableStore periodically calls syncWatchersLoop
|
||||
// method to sync watchers in unsynced map. We want to keep watchers
|
||||
// in unsynced for this benchmark.
|
||||
ws := &watchableStore{
|
||||
store: s,
|
||||
unsynced: newWatcherGroup(),
|
||||
|
||||
// to make the test not crash from assigning to nil map.
|
||||
// 'synced' doesn't get populated in this test.
|
||||
synced: newWatcherGroup(),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer cleanup(ws, be)
|
||||
|
||||
@ -146,7 +132,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
||||
// and force watchers to be in unsynced.
|
||||
testKey := []byte("foo")
|
||||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue, lease.NoLease)
|
||||
ws.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
w := ws.NewWatchStream()
|
||||
defer w.Close()
|
||||
@ -178,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
||||
|
||||
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
|
||||
be, _ := betesting.NewDefaultTmpBackend(b)
|
||||
s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer cleanup(s, be)
|
||||
|
||||
|
@ -33,7 +33,7 @@ import (
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
testKey := []byte("foo")
|
||||
@ -52,7 +52,7 @@ func TestWatch(t *testing.T) {
|
||||
|
||||
func TestNewWatcherCancel(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
testKey := []byte("foo")
|
||||
@ -81,16 +81,7 @@ func TestCancelUnsynced(t *testing.T) {
|
||||
// because newWatchableStore automatically calls syncWatchers
|
||||
// method to sync watchers in unsynced map. We want to keep watchers
|
||||
// in unsynced to test if syncWatchers works as expected.
|
||||
s := &watchableStore{
|
||||
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
|
||||
unsynced: newWatcherGroup(),
|
||||
|
||||
// to make the test not crash from assigning to nil map.
|
||||
// 'synced' doesn't get populated in this test.
|
||||
synced: newWatcherGroup(),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
// Put a key so that we can spawn watchers on that key.
|
||||
@ -134,13 +125,7 @@ func TestCancelUnsynced(t *testing.T) {
|
||||
// and moves these watchers to synced.
|
||||
func TestSyncWatchers(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
|
||||
s := &watchableStore{
|
||||
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
|
||||
unsynced: newWatcherGroup(),
|
||||
synced: newWatcherGroup(),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer cleanup(s, b)
|
||||
|
||||
@ -218,7 +203,7 @@ func TestSyncWatchers(t *testing.T) {
|
||||
// TestWatchCompacted tests a watcher that watches on a compacted revision.
|
||||
func TestWatchCompacted(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
testKey := []byte("foo")
|
||||
@ -256,7 +241,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
|
||||
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
lg := zaptest.NewLogger(t)
|
||||
s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(lg, b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer func() {
|
||||
cleanup(s, b)
|
||||
@ -310,7 +295,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
|
||||
|
||||
func TestWatchFutureRev(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
testKey := []byte("foo")
|
||||
@ -349,7 +334,7 @@ func TestWatchRestore(t *testing.T) {
|
||||
test := func(delay time.Duration) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
testKey := []byte("foo")
|
||||
@ -395,11 +380,11 @@ func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
|
||||
// 5. choose the watcher from step 1, without panic
|
||||
func TestWatchRestoreSyncedWatcher(t *testing.T) {
|
||||
b1, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s1 := newWatchableStore(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{})
|
||||
s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s1, b1)
|
||||
|
||||
b2, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s2 := newWatchableStore(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{})
|
||||
s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s2, b2)
|
||||
|
||||
testKey, testValue := []byte("foo"), []byte("bar")
|
||||
@ -448,8 +433,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
|
||||
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
||||
func TestWatchBatchUnsynced(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
oldMaxRevs := watchBatchMaxRevs
|
||||
defer func() {
|
||||
watchBatchMaxRevs = oldMaxRevs
|
||||
@ -583,7 +567,7 @@ func TestWatchVictims(t *testing.T) {
|
||||
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
|
||||
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer func() {
|
||||
cleanup(s, b)
|
||||
@ -660,7 +644,7 @@ func TestWatchVictims(t *testing.T) {
|
||||
// canceling its watches.
|
||||
func TestStressWatchCancelClose(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
testKey, testValue := []byte("foo"), []byte("bar")
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
|
||||
func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
|
||||
be, _ := betesting.NewDefaultTmpBackend(b)
|
||||
watchable := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer cleanup(watchable, be)
|
||||
|
||||
|
@ -35,7 +35,7 @@ import (
|
||||
// and the watched event attaches the correct watchID.
|
||||
func TestWatcherWatchID(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
@ -85,7 +85,7 @@ func TestWatcherWatchID(t *testing.T) {
|
||||
|
||||
func TestWatcherRequestsCustomID(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
@ -122,7 +122,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
|
||||
// and returns events with matching prefixes.
|
||||
func TestWatcherWatchPrefix(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
@ -196,7 +196,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
|
||||
// does not create watcher, which panics when canceling in range tree.
|
||||
func TestWatcherWatchWrongRange(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
@ -216,7 +216,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
|
||||
|
||||
func TestWatchDeleteRange(t *testing.T) {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer func() {
|
||||
b.Close()
|
||||
@ -256,7 +256,7 @@ func TestWatchDeleteRange(t *testing.T) {
|
||||
// with given id inside watchStream.
|
||||
func TestWatchStreamCancelWatcherByID(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
@ -293,17 +293,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
|
||||
// report its correct progress.
|
||||
func TestWatcherRequestProgress(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
|
||||
// manually create watchableStore instead of newWatchableStore
|
||||
// because newWatchableStore automatically calls syncWatchers
|
||||
// method to sync watchers in unsynced map. We want to keep watchers
|
||||
// in unsynced to test if syncWatchers works as expected.
|
||||
s := &watchableStore{
|
||||
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
|
||||
unsynced: newWatcherGroup(),
|
||||
synced: newWatcherGroup(),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer cleanup(s, b)
|
||||
|
||||
@ -346,17 +336,7 @@ func TestWatcherRequestProgress(t *testing.T) {
|
||||
|
||||
func TestWatcherRequestProgressAll(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
|
||||
// manually create watchableStore instead of newWatchableStore
|
||||
// because newWatchableStore automatically calls syncWatchers
|
||||
// method to sync watchers in unsynced map. We want to keep watchers
|
||||
// in unsynced to test if syncWatchers works as expected.
|
||||
s := &watchableStore{
|
||||
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
|
||||
unsynced: newWatcherGroup(),
|
||||
synced: newWatcherGroup(),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer cleanup(s, b)
|
||||
|
||||
@ -395,7 +375,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
|
||||
|
||||
func TestWatcherWatchWithFilter(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
|
||||
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
|
Reference in New Issue
Block a user