Remove syncing the v2 store TTLs
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
@ -478,143 +478,6 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
|
||||
func TestSync(t *testing.T) {
|
||||
n := newNodeRecorder()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: zaptest.NewLogger(t),
|
||||
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
|
||||
|
||||
// check that sync is non-blocking
|
||||
done := make(chan struct{}, 1)
|
||||
go func() {
|
||||
srv.sync(10 * time.Second)
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("sync should be non-blocking but did not return after 1s!")
|
||||
}
|
||||
|
||||
action, _ := n.Wait(1)
|
||||
if len(action) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(action))
|
||||
}
|
||||
if action[0].Name != "Propose" {
|
||||
t.Fatalf("action = %s, want Propose", action[0].Name)
|
||||
}
|
||||
data := action[0].Params[0].([]byte)
|
||||
var r pb.Request
|
||||
if err := r.Unmarshal(data); err != nil {
|
||||
t.Fatalf("unmarshal request error: %v", err)
|
||||
}
|
||||
if r.Method != "SYNC" {
|
||||
t.Errorf("method = %s, want SYNC", r.Method)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
|
||||
// after timeout
|
||||
func TestSyncTimeout(t *testing.T) {
|
||||
n := newProposalBlockerRecorder()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: zaptest.NewLogger(t),
|
||||
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
|
||||
|
||||
// check that sync is non-blocking
|
||||
done := make(chan struct{}, 1)
|
||||
go func() {
|
||||
srv.sync(0)
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("sync should be non-blocking but did not return after 1s!")
|
||||
}
|
||||
|
||||
w := []testutil.Action{{Name: "Propose blocked"}}
|
||||
if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("action = %v, want %v", g, w)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: TestNoSyncWhenNoLeader
|
||||
|
||||
// TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
|
||||
func TestSyncTrigger(t *testing.T) {
|
||||
n := newReadyNode()
|
||||
st := make(chan time.Time, 1)
|
||||
tk := &time.Ticker{C: st}
|
||||
r := newRaftNode(raftNodeConfig{
|
||||
lg: zaptest.NewLogger(t),
|
||||
Node: n,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
transport: newNopTransporter(),
|
||||
storage: mockstorage.NewStorageRecorder(""),
|
||||
})
|
||||
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: zaptest.NewLogger(t),
|
||||
Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *r,
|
||||
v2store: mockstore.NewNop(),
|
||||
SyncTicker: tk,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
|
||||
// trigger the server to become a leader and accept sync requests
|
||||
go func() {
|
||||
srv.start()
|
||||
n.readyc <- raft.Ready{
|
||||
SoftState: &raft.SoftState{
|
||||
RaftState: raft.StateLeader,
|
||||
},
|
||||
}
|
||||
// trigger a sync request
|
||||
st <- time.Time{}
|
||||
}()
|
||||
|
||||
action, _ := n.Wait(1)
|
||||
go srv.Stop()
|
||||
|
||||
if len(action) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(action))
|
||||
}
|
||||
if action[0].Name != "Propose" {
|
||||
t.Fatalf("action = %s, want Propose", action[0].Name)
|
||||
}
|
||||
data := action[0].Params[0].([]byte)
|
||||
var req pb.Request
|
||||
if err := req.Unmarshal(data); err != nil {
|
||||
t.Fatalf("error unmarshalling data: %v", err)
|
||||
}
|
||||
if req.Method != "SYNC" {
|
||||
t.Fatalf("unexpected proposed request: %#v", req.Method)
|
||||
}
|
||||
|
||||
// wait on stop message
|
||||
<-n.Chan()
|
||||
}
|
||||
|
||||
// TestSnapshot should snapshot the store and cut the persistent
|
||||
func TestSnapshot(t *testing.T) {
|
||||
revertFunc := verify.DisableVerifications()
|
||||
@ -1302,20 +1165,6 @@ func (n *nodeRecorder) ForgetLeader(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type nodeProposalBlockerRecorder struct {
|
||||
nodeRecorder
|
||||
}
|
||||
|
||||
func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
|
||||
return &nodeProposalBlockerRecorder{*newNodeRecorderStream()}
|
||||
}
|
||||
|
||||
func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
|
||||
<-ctx.Done()
|
||||
n.Record(testutil.Action{Name: "Propose blocked"})
|
||||
return nil
|
||||
}
|
||||
|
||||
// readyNode is a nodeRecorder with a user-writeable ready channel
|
||||
type readyNode struct {
|
||||
nodeRecorder
|
||||
|
Reference in New Issue
Block a user