etcdserver: don't try to apply empty message list
If all messages have been applied, don't apply an empty messages list; otherwise appliedi will update to 0 and etcd will panic. Fixes #4278
This commit is contained in:
@ -667,6 +667,9 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
|
|||||||
if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
|
if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
|
||||||
ents = apply.entries[ep.appliedi+1-firsti:]
|
ents = apply.entries[ep.appliedi+1-firsti:]
|
||||||
}
|
}
|
||||||
|
if len(ents) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
var shouldstop bool
|
var shouldstop bool
|
||||||
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
|
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
|
||||||
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
||||||
|
@ -154,6 +154,56 @@ func TestDoBadLocalAction(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestApplyRepeat tests that server handles repeat raft messages gracefully
|
||||||
|
func TestApplyRepeat(t *testing.T) {
|
||||||
|
n := newNodeConfChangeCommitterRecorder()
|
||||||
|
n.readyc <- raft.Ready{
|
||||||
|
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
|
||||||
|
}
|
||||||
|
cl := newTestCluster(nil)
|
||||||
|
st := store.New()
|
||||||
|
cl.SetStore(store.New())
|
||||||
|
cl.AddMember(&Member{ID: 1234})
|
||||||
|
s := &EtcdServer{
|
||||||
|
r: raftNode{
|
||||||
|
Node: n,
|
||||||
|
raftStorage: raft.NewMemoryStorage(),
|
||||||
|
storage: &storageRecorder{},
|
||||||
|
transport: rafthttp.NewNopTransporter(),
|
||||||
|
},
|
||||||
|
cfg: &ServerConfig{},
|
||||||
|
store: st,
|
||||||
|
cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
|
}
|
||||||
|
s.start()
|
||||||
|
req := &pb.Request{Method: "QGET", ID: uint64(1)}
|
||||||
|
ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
|
||||||
|
n.readyc <- raft.Ready{CommittedEntries: ents}
|
||||||
|
// dup msg
|
||||||
|
n.readyc <- raft.Ready{CommittedEntries: ents}
|
||||||
|
|
||||||
|
// use a conf change to block until dup msgs are all processed
|
||||||
|
cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
|
||||||
|
ents = []raftpb.Entry{{
|
||||||
|
Index: 2,
|
||||||
|
Type: raftpb.EntryConfChange,
|
||||||
|
Data: pbutil.MustMarshal(cc),
|
||||||
|
}}
|
||||||
|
n.readyc <- raft.Ready{CommittedEntries: ents}
|
||||||
|
act, err := n.Wait(1)
|
||||||
|
s.Stop()
|
||||||
|
|
||||||
|
// only want to confirm etcdserver won't panic; no data to check
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(act) == 0 {
|
||||||
|
t.Fatalf("expected len(act)=0, got %d", len(act))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestApplyRequest(t *testing.T) {
|
func TestApplyRequest(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
req pb.Request
|
req pb.Request
|
||||||
|
Reference in New Issue
Block a user