diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 754aea6b0..c1b9ad6b3 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -24,7 +24,6 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" - "github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -106,11 +105,6 @@ type raftNode struct { // a chan to send out readState readStateC chan raft.ReadState - // TODO: remove the etcdserver related logic from raftNode - // TODO: add a state machine interface to apply the commit entries - // and do snapshot/recover - s *EtcdServer - // utility ticker <-chan time.Time raftStorage *raft.MemoryStorage @@ -121,32 +115,18 @@ type raftNode struct { // If transport is nil, server will panic. transport rafthttp.Transporter - td *contention.TimeoutDetector - stopped chan struct{} done chan struct{} } // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. -// TODO: Ideally raftNode should get rid of the passed in server structure. -func (r *raftNode) start(s *EtcdServer) { - r.s = s +func (r *raftNode) start(rh *raftReadyHandler) { r.applyc = make(chan apply) r.stopped = make(chan struct{}) r.done = make(chan struct{}) - heartbeat := 200 * time.Millisecond - if s.Cfg != nil { - heartbeat = time.Duration(s.Cfg.TickMs) * time.Millisecond - } - // set up contention detectors for raft heartbeat message. - // expect to send a heartbeat within 2 heartbeat intervals. - r.td = contention.NewTimeoutDetector(2 * heartbeat) - go func() { - var syncC <-chan time.Time - defer r.onStop() islead := false @@ -170,29 +150,8 @@ func (r *raftNode) start(s *EtcdServer) { } atomic.StoreUint64(&r.lead, rd.SoftState.Lead) - if rd.RaftState == raft.StateLeader { - islead = true - syncC = r.s.SyncTicker - - // TODO: remove the nil checking - // current test utility does not provide the stats - if r.s.stats != nil { - r.s.stats.BecomeLeader() - } - if r.s.compactor != nil { - r.s.compactor.Resume() - } - r.td.Reset() - } else { - islead = false - if r.s.lessor != nil { - r.s.lessor.Demote() - } - if r.s.compactor != nil { - r.s.compactor.Pause() - } - syncC = nil - } + islead = rd.RaftState == raft.StateLeader + rh.leadershipUpdate() } if len(rd.ReadStates) != 0 { @@ -221,7 +180,7 @@ func (r *raftNode) start(s *EtcdServer) { // For more details, check raft thesis 10.2.1 if islead { // gofail: var raftBeforeLeaderSend struct{} - r.s.send(rd.Messages) + rh.sendMessage(rd.Messages) } // gofail: var raftBeforeSave struct{} @@ -248,12 +207,10 @@ func (r *raftNode) start(s *EtcdServer) { if !islead { // gofail: var raftBeforeFollowerSend struct{} - r.s.send(rd.Messages) + rh.sendMessage(rd.Messages) } raftDone <- struct{}{} r.Advance() - case <-syncC: - r.s.sync(r.s.Cfg.ReqTimeout()) case <-r.stopped: return } diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 6df79a3e5..a93de4483 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -159,7 +159,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), }} - srv.r.start(srv) + srv.r.start(&raftReadyHandler{sendMessage: func(msgs []raftpb.Message) { srv.send(msgs) }}) n.readyc <- raft.Ready{} select { case <-srv.r.applyc: diff --git a/etcdserver/server.go b/etcdserver/server.go index fa273a0ae..002e91e1b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -40,6 +40,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/pbutil" @@ -176,7 +177,8 @@ type EtcdServer struct { snapCount uint64 - w wait.Wait + w wait.Wait + td *contention.TimeoutDetector readMu sync.RWMutex // read routine notifies etcd server that it waits for reading by sending an empty struct to @@ -390,15 +392,19 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { sstats.Initialize() lstats := stats.NewLeaderStats(id.String()) + heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ readych: make(chan struct{}), Cfg: cfg, snapCount: cfg.SnapCount, - errorc: make(chan error, 1), - store: st, + // set up contention detectors for raft heartbeat message. + // expect to send a heartbeat within 2 heartbeat intervals. + td: contention.NewTimeoutDetector(2 * heartbeat), + errorc: make(chan error, 1), + store: st, r: raftNode{ Node: n, - ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), + ticker: time.Tick(heartbeat), raftStorage: s, storage: NewStorage(w, ss), readStateC: make(chan raft.ReadState, 1), @@ -418,7 +424,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} srv.be = be - minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond + minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. @@ -570,12 +576,64 @@ type etcdProgress struct { appliedi uint64 } +// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode, +// and helps decouple state machine logic from Raft algorithms. +// TODO: add a state machine interface to apply the commit entries and do snapshot/recover +type raftReadyHandler struct { + leadershipUpdate func() + sendMessage func(msgs []raftpb.Message) +} + func (s *EtcdServer) run() { snap, err := s.r.raftStorage.Snapshot() if err != nil { plog.Panicf("get snapshot from raft storage error: %v", err) } - s.r.start(s) + + var ( + smu sync.RWMutex + syncC <-chan time.Time + ) + setSyncC := func(ch <-chan time.Time) { + smu.Lock() + syncC = ch + smu.Unlock() + } + getSyncC := func() (ch <-chan time.Time) { + smu.RLock() + ch = syncC + smu.RUnlock() + return + } + rh := &raftReadyHandler{ + leadershipUpdate: func() { + if !s.isLeader() { + if s.lessor != nil { + s.lessor.Demote() + } + if s.compactor != nil { + s.compactor.Pause() + } + setSyncC(nil) + } else { + setSyncC(s.SyncTicker) + } + + // TODO: remove the nil checking + // current test utility does not provide the stats + if s.stats != nil { + s.stats.BecomeLeader() + } + if s.compactor != nil { + s.compactor.Resume() + } + if s.td != nil { + s.td.Reset() + } + }, + sendMessage: func(msgs []raftpb.Message) { s.send(msgs) }, + } + s.r.start(rh) // asynchronously accept apply packets, dispatch progress in-order sched := schedule.NewFIFOScheduler() @@ -655,6 +713,8 @@ func (s *EtcdServer) run() { plog.Errorf("%s", err) plog.Infof("the data-dir used by this member must be removed.") return + case <-getSyncC(): + s.sync(s.Cfg.ReqTimeout()) case <-s.stop: return } @@ -1130,7 +1190,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) { ms[i].To = 0 } if ms[i].Type == raftpb.MsgHeartbeat { - ok, exceed := s.r.td.Observe(ms[i].To) + ok, exceed := s.td.Observe(ms[i].To) if !ok { // TODO: limit request rate. plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed)