etcdserver: move 'EtcdServer.send' to raft.go
Clear 'TODO'
This commit is contained in:
@ -24,6 +24,7 @@ import (
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/pkg/contention"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
@ -97,8 +98,14 @@ type raftNode struct {
|
||||
// last lead elected time
|
||||
lt time.Time
|
||||
|
||||
// to check if msg receiver is removed from cluster
|
||||
isIDRemoved func(id uint64) bool
|
||||
|
||||
raft.Node
|
||||
|
||||
// a chan to send/receive snapshot
|
||||
msgSnapC chan raftpb.Message
|
||||
|
||||
// a chan to send out apply
|
||||
applyc chan apply
|
||||
|
||||
@ -106,7 +113,10 @@ type raftNode struct {
|
||||
readStateC chan raft.ReadState
|
||||
|
||||
// utility
|
||||
ticker <-chan time.Time
|
||||
ticker <-chan time.Time
|
||||
// contention detectors for raft heartbeat message
|
||||
td *contention.TimeoutDetector
|
||||
heartbeat time.Duration // for logging
|
||||
raftStorage *raft.MemoryStorage
|
||||
storage Storage
|
||||
// transport specifies the transport to send and receive msgs to members.
|
||||
@ -180,7 +190,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
// For more details, check raft thesis 10.2.1
|
||||
if islead {
|
||||
// gofail: var raftBeforeLeaderSend struct{}
|
||||
rh.sendMessage(rd.Messages)
|
||||
r.sendMessages(rd.Messages)
|
||||
}
|
||||
|
||||
// gofail: var raftBeforeSave struct{}
|
||||
@ -207,7 +217,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
|
||||
if !islead {
|
||||
// gofail: var raftBeforeFollowerSend struct{}
|
||||
rh.sendMessage(rd.Messages)
|
||||
r.sendMessages(rd.Messages)
|
||||
}
|
||||
raftDone <- struct{}{}
|
||||
r.Advance()
|
||||
@ -218,6 +228,46 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *raftNode) sendMessages(ms []raftpb.Message) {
|
||||
sentAppResp := false
|
||||
for i := len(ms) - 1; i >= 0; i-- {
|
||||
if r.isIDRemoved(ms[i].To) {
|
||||
ms[i].To = 0
|
||||
}
|
||||
|
||||
if ms[i].Type == raftpb.MsgAppResp {
|
||||
if sentAppResp {
|
||||
ms[i].To = 0
|
||||
} else {
|
||||
sentAppResp = true
|
||||
}
|
||||
}
|
||||
|
||||
if ms[i].Type == raftpb.MsgSnap {
|
||||
// There are two separate data store: the store for v2, and the KV for v3.
|
||||
// The msgSnap only contains the most recent snapshot of store without KV.
|
||||
// So we need to redirect the msgSnap to etcd server main loop for merging in the
|
||||
// current store snapshot and KV snapshot.
|
||||
select {
|
||||
case r.msgSnapC <- ms[i]:
|
||||
default:
|
||||
// drop msgSnap if the inflight chan if full.
|
||||
}
|
||||
ms[i].To = 0
|
||||
}
|
||||
if ms[i].Type == raftpb.MsgHeartbeat {
|
||||
ok, exceed := r.td.Observe(ms[i].To)
|
||||
if !ok {
|
||||
// TODO: limit request rate.
|
||||
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
|
||||
plog.Warningf("server is likely overloaded")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r.transport.Send(ms)
|
||||
}
|
||||
|
||||
func (r *raftNode) apply() chan apply {
|
||||
return r.applyc
|
||||
}
|
||||
|
Reference in New Issue
Block a user