etcdserver: separate EtcdServer from raftNode
@@ -24,7 +24,6 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/membership"
-	"github.com/coreos/etcd/pkg/contention"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
@@ -106,11 +105,6 @@ type raftNode struct {
 	// a chan to send out readState
 	readStateC chan raft.ReadState
 
-	// TODO: remove the etcdserver related logic from raftNode
-	// TODO: add a state machine interface to apply the commit entries
-	// and do snapshot/recover
-	s *EtcdServer
-
 	// utility
 	ticker      <-chan time.Time
 	raftStorage *raft.MemoryStorage
@@ -121,32 +115,18 @@ type raftNode struct {
 	// If transport is nil, server will panic.
 	transport rafthttp.Transporter
 
-	td *contention.TimeoutDetector
-
 	stopped chan struct{}
 	done    chan struct{}
 }
 
 // start prepares and starts raftNode in a new goroutine. It is no longer safe
 // to modify the fields after it has been started.
-// TODO: Ideally raftNode should get rid of the passed in server structure.
-func (r *raftNode) start(s *EtcdServer) {
-	r.s = s
+func (r *raftNode) start(rh *raftReadyHandler) {
 	r.applyc = make(chan apply)
 	r.stopped = make(chan struct{})
 	r.done = make(chan struct{})
 
-	heartbeat := 200 * time.Millisecond
-	if s.Cfg != nil {
-		heartbeat = time.Duration(s.Cfg.TickMs) * time.Millisecond
-	}
-	// set up contention detectors for raft heartbeat message.
-	// expect to send a heartbeat within 2 heartbeat intervals.
-	r.td = contention.NewTimeoutDetector(2 * heartbeat)
-
 	go func() {
-		var syncC <-chan time.Time
-
 		defer r.onStop()
 		islead := false
 
@@ -170,29 +150,8 @@ func (r *raftNode) start(s *EtcdServer) {
 					}
 
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
-					if rd.RaftState == raft.StateLeader {
-						islead = true
-						syncC = r.s.SyncTicker
-
-						// TODO: remove the nil checking
-						// current test utility does not provide the stats
-						if r.s.stats != nil {
-							r.s.stats.BecomeLeader()
-						}
-						if r.s.compactor != nil {
-							r.s.compactor.Resume()
-						}
-						r.td.Reset()
-					} else {
-						islead = false
-						if r.s.lessor != nil {
-							r.s.lessor.Demote()
-						}
-						if r.s.compactor != nil {
-							r.s.compactor.Pause()
-						}
-						syncC = nil
-					}
+					islead = rd.RaftState == raft.StateLeader
+					rh.leadershipUpdate()
 				}
 
 				if len(rd.ReadStates) != 0 {
@@ -221,7 +180,7 @@ func (r *raftNode) start(s *EtcdServer) {
 				// For more details, check raft thesis 10.2.1
 				if islead {
 					// gofail: var raftBeforeLeaderSend struct{}
-					r.s.send(rd.Messages)
+					rh.sendMessage(rd.Messages)
 				}
 
 				// gofail: var raftBeforeSave struct{}
@@ -248,12 +207,10 @@ func (r *raftNode) start(s *EtcdServer) {
 
 				if !islead {
 					// gofail: var raftBeforeFollowerSend struct{}
-					r.s.send(rd.Messages)
+					rh.sendMessage(rd.Messages)
 				}
 				raftDone <- struct{}{}
 				r.Advance()
-			case <-syncC:
-				r.s.sync(r.s.Cfg.ReqTimeout())
 			case <-r.stopped:
 				return
 			}
@@ -159,7 +159,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 		raftStorage: raft.NewMemoryStorage(),
 		transport:   rafthttp.NewNopTransporter(),
 	}}
-	srv.r.start(srv)
+	srv.r.start(&raftReadyHandler{sendMessage: func(msgs []raftpb.Message) { srv.send(msgs) }})
 	n.readyc <- raft.Ready{}
 	select {
 	case <-srv.r.applyc:
@@ -40,6 +40,7 @@ import (
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/pkg/contention"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/idutil"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -176,7 +177,8 @@ type EtcdServer struct {
 
 	snapCount uint64
 
 	w wait.Wait
+	td *contention.TimeoutDetector
 
 	readMu sync.RWMutex
 	// read routine notifies etcd server that it waits for reading by sending an empty struct to
@@ -390,15 +392,19 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	sstats.Initialize()
 	lstats := stats.NewLeaderStats(id.String())
 
+	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
 	srv = &EtcdServer{
 		readych:   make(chan struct{}),
 		Cfg:       cfg,
 		snapCount: cfg.SnapCount,
-		errorc:    make(chan error, 1),
-		store:     st,
+		// set up contention detectors for raft heartbeat message.
+		// expect to send a heartbeat within 2 heartbeat intervals.
+		td:        contention.NewTimeoutDetector(2 * heartbeat),
+		errorc:    make(chan error, 1),
+		store:     st,
 		r: raftNode{
 			Node:        n,
-			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
+			ticker:      time.Tick(heartbeat),
 			raftStorage: s,
 			storage:     NewStorage(w, ss),
 			readStateC:  make(chan raft.ReadState, 1),
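
The hunk above builds the contention detector on EtcdServer itself (previously raftNode created its own in start). As a rough, minimal sketch of how the pkg/contention calls that appear in this diff fit together — assuming the etcd tree is importable; the 100ms heartbeat and peer ID 2 are invented for illustration — Observe records a heartbeat send to a peer and reports how far the gap since the previous send exceeded the allowed window, and Reset drops the recorded history:

    package main

    import (
        "fmt"
        "time"

        "github.com/coreos/etcd/pkg/contention"
    )

    func main() {
        heartbeat := 100 * time.Millisecond
        // Same construction as in NewServer above: allow up to two heartbeat
        // intervals between consecutive heartbeats to the same peer.
        td := contention.NewTimeoutDetector(2 * heartbeat)

        const peer = uint64(2) // hypothetical member ID
        td.Observe(peer)       // record the first heartbeat send to this peer
        time.Sleep(3 * heartbeat)
        if ok, exceed := td.Observe(peer); !ok {
            // Mirrors the warning path in EtcdServer.send further down.
            fmt.Printf("heartbeat to member %d was late by %v\n", peer, exceed)
        }
        td.Reset() // drop the history, e.g. after a leadership change
    }
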
@@ -418,7 +424,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	srv.be = be
-	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond
+	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
@@ -570,12 +576,64 @@ type etcdProgress struct {
 	appliedi  uint64
 }
 
+// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
+// and helps decouple state machine logic from Raft algorithms.
+// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
+type raftReadyHandler struct {
+	leadershipUpdate func()
+	sendMessage      func(msgs []raftpb.Message)
+}
+
 func (s *EtcdServer) run() {
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		plog.Panicf("get snapshot from raft storage error: %v", err)
 	}
-	s.r.start(s)
+
+	var (
+		smu   sync.RWMutex
+		syncC <-chan time.Time
+	)
+	setSyncC := func(ch <-chan time.Time) {
+		smu.Lock()
+		syncC = ch
+		smu.Unlock()
+	}
+	getSyncC := func() (ch <-chan time.Time) {
+		smu.RLock()
+		ch = syncC
+		smu.RUnlock()
+		return
+	}
+	rh := &raftReadyHandler{
+		leadershipUpdate: func() {
+			if !s.isLeader() {
+				if s.lessor != nil {
+					s.lessor.Demote()
+				}
+				if s.compactor != nil {
+					s.compactor.Pause()
+				}
+				setSyncC(nil)
+			} else {
+				setSyncC(s.SyncTicker)
+			}
+
+			// TODO: remove the nil checking
+			// current test utility does not provide the stats
+			if s.stats != nil {
+				s.stats.BecomeLeader()
+			}
+			if s.compactor != nil {
+				s.compactor.Resume()
+			}
+			if s.td != nil {
+				s.td.Reset()
+			}
+		},
+		sendMessage: func(msgs []raftpb.Message) { s.send(msgs) },
+	}
+	s.r.start(rh)
 
 	// asynchronously accept apply packets, dispatch progress in-order
 	sched := schedule.NewFIFOScheduler()
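
The raftReadyHandler added above is the core of the separation: raftNode no longer stores an *EtcdServer, it only calls the two functions that the server wires up in run(). Below is a self-contained sketch of the same callback-struct pattern using simplified stand-in types (message, readyHandler, server, raftLoop are illustrative names, not etcd's): the raft loop sees only the handler, while the server owns the sync channel behind a mutex the way the setSyncC/getSyncC closures do above.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // message stands in for raftpb.Message in this sketch.
    type message struct{ to uint64 }

    // readyHandler mirrors the shape of raftReadyHandler: the raft loop calls
    // back through plain functions instead of holding a *server pointer.
    type readyHandler struct {
        leadershipUpdate func()
        sendMessage      func(msgs []message)
    }

    // raftLoop is a stand-in for raftNode.start: it sees only the handler.
    func raftLoop(rh *readyHandler, stop <-chan struct{}) {
        tick := time.NewTicker(10 * time.Millisecond)
        defer tick.Stop()
        for {
            select {
            case <-tick.C:
                rh.leadershipUpdate()
                rh.sendMessage([]message{{to: 2}})
            case <-stop:
                return
            }
        }
    }

    // server is a stand-in for EtcdServer: it owns the sync channel and guards
    // it with a mutex, like the setSyncC/getSyncC closures in run() above.
    type server struct {
        mu    sync.RWMutex
        syncC <-chan time.Time
    }

    func (s *server) setSyncC(ch <-chan time.Time) { s.mu.Lock(); s.syncC = ch; s.mu.Unlock() }

    func main() {
        s := &server{}
        rh := &readyHandler{
            leadershipUpdate: func() { s.setSyncC(time.Tick(time.Second)) },
            sendMessage:      func(msgs []message) { fmt.Printf("sending %d message(s)\n", len(msgs)) },
        }
        stop := make(chan struct{})
        go raftLoop(rh, stop)
        time.Sleep(35 * time.Millisecond)
        close(stop)
    }

Keeping the handler down to two small function fields is also what lets the TestStopRaftWhenWaitingForApplyDone hunk above pass a stub (&raftReadyHandler{sendMessage: ...}) instead of a whole server.
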
@@ -655,6 +713,8 @@ func (s *EtcdServer) run() {
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
 			return
+		case <-getSyncC():
+			s.sync(s.Cfg.ReqTimeout())
 		case <-s.stop:
 			return
 		}
@@ -1130,7 +1190,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 			ms[i].To = 0
 		}
 		if ms[i].Type == raftpb.MsgHeartbeat {
-			ok, exceed := s.r.td.Observe(ms[i].To)
+			ok, exceed := s.td.Observe(ms[i].To)
 			if !ok {
 				// TODO: limit request rate.
 				plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed)