Add auto-demotion after peer inactivity.

This commit is contained in:
Ben Johnson
2014-02-28 09:17:02 -07:00
parent fddbf35df2
commit c8d6b26dfd
12 changed files with 245 additions and 136 deletions

View File

@ -46,4 +46,3 @@ func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
func (c *DemoteCommand) NodeName() string { func (c *DemoteCommand) NodeName() string {
return c.Name return c.Name
} }

View File

@ -13,6 +13,15 @@ func init() {
} }
// The JoinCommand adds a node to the cluster. // The JoinCommand adds a node to the cluster.
//
// The command returns two values encoded in binary format.
// The first value is a Uvarint representing the join_index.
// The second value is a single byte flag representing whether the joining
// node is a peer (0) or a proxy (1).
//
// 8 bytes | 1 byte
// join_index | join_mode
//
type JoinCommand struct { type JoinCommand struct {
MinVersion int `json:"minVersion"` MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"` MaxVersion int `json:"maxVersion"`

View File

@ -1,8 +1,8 @@
package server package server
import ( import (
"bytes"
"bufio" "bufio"
"bytes"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -27,6 +27,7 @@ import (
const ThresholdMonitorTimeout = 5 * time.Second const ThresholdMonitorTimeout = 5 * time.Second
const ActiveMonitorTimeout = 1 * time.Second const ActiveMonitorTimeout = 1 * time.Second
const PeerActivityMonitorTimeout = 1 * time.Second
type PeerServerConfig struct { type PeerServerConfig struct {
Name string Name string
@ -267,6 +268,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
go s.monitorSync() go s.monitorSync()
go s.monitorTimeoutThreshold(s.closeChan) go s.monitorTimeoutThreshold(s.closeChan)
go s.monitorActive(s.closeChan) go s.monitorActive(s.closeChan)
go s.monitorPeerActivity(s.closeChan)
// open the snapshot // open the snapshot
if snapshot { if snapshot {
@ -617,7 +619,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
func (s *PeerServer) monitorActive(closeChan chan bool) { func (s *PeerServer) monitorActive(closeChan chan bool) {
for { for {
select { select {
case <- time.After(ActiveMonitorTimeout): case <-time.After(ActiveMonitorTimeout):
case <-closeChan: case <-closeChan:
return return
} }
@ -632,18 +634,13 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
peerCount := s.registry.PeerCount() peerCount := s.registry.PeerCount()
proxies := s.registry.Proxies() proxies := s.registry.Proxies()
peers := s.registry.Peers() peers := s.registry.Peers()
fmt.Println("active.3»", peers)
if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
peers = append(peers[:index], peers[index+1:]...) peers = append(peers[:index], peers[index+1:]...)
} }
fmt.Println("active.1»", activeSize, peerCount)
fmt.Println("active.2»", proxies)
// If we have more active nodes than we should then demote. // If we have more active nodes than we should then demote.
if peerCount > activeSize { if peerCount > activeSize {
peer := peers[rand.Intn(len(peers))] peer := peers[rand.Intn(len(peers))]
fmt.Println("active.demote»", peer)
if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil { if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil {
log.Infof("%s: warning: demotion error: %v", s.Config.Name, err) log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
} }
@ -652,9 +649,11 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
// If we don't have enough active nodes then try to promote a proxy. // If we don't have enough active nodes then try to promote a proxy.
if peerCount < activeSize && len(proxies) > 0 { if peerCount < activeSize && len(proxies) > 0 {
proxy := proxies[rand.Intn(len(proxies))] loop:
for _, i := range rand.Perm(len(proxies)) {
proxy := proxies[i]
proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy) proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy)
log.Infof("%s: promoting: %v (%s)", s.Config.Name, proxy, proxyPeerURL) log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, proxy, proxyPeerURL)
// Notify proxy to promote itself. // Notify proxy to promote itself.
client := &http.Client{ client := &http.Client{
@ -666,14 +665,48 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil) resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil)
if err != nil { if err != nil {
log.Infof("%s: warning: promotion error: %v", s.Config.Name, err) log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
continue
} else if resp.StatusCode != http.StatusOK { } else if resp.StatusCode != http.StatusOK {
log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode) log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode)
}
continue continue
} }
break loop
}
}
} }
} }
// monitorPeerActivity periodically checks for dead nodes and demotes them.
//
// The loop wakes up every PeerActivityMonitorTimeout. On each wake-up, if this
// server is currently the raft leader, it issues a DemoteCommand for every
// peer whose last recorded activity is older than the cluster's promote delay.
// Peers that have never recorded any activity (zero timestamp) are skipped.
// The loop returns as soon as a receive on closeChan succeeds.
func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
	for {
		select {
		case <-time.After(PeerActivityMonitorTimeout):
		case <-closeChan:
			return
		}

		// Ignore while this peer is not a leader.
		if s.raftServer.State() != raft.Leader {
			continue
		}

		// Check last activity for all peers.
		now := time.Now()
		promoteDelay := time.Duration(s.ClusterConfig().PromoteDelay) * time.Second
		for _, peer := range s.raftServer.Peers() {
			// Read the timestamp exactly once so that the value used for the
			// decision is the same value that is reported in the log line.
			lastActivity := peer.LastActivity()
			if lastActivity.IsZero() {
				continue
			}

			// If the last response from the peer is longer than the promote
			// delay then automatically demote the peer.
			idle := now.Sub(lastActivity)
			if idle <= promoteDelay {
				continue
			}

			log.Infof("%s: demoting node: %v; last activity %v ago", s.Config.Name, peer.Name, idle)
			if _, err := s.raftServer.Do(&DemoteCommand{Name: peer.Name}); err != nil {
				log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
			}
		}
	}
}
// Mode represents whether the server is an active peer or if the server is // Mode represents whether the server is an active peer or if the server is
// simply acting as a proxy. // simply acting as a proxy.
@ -686,4 +719,3 @@ const (
// ProxyMode is when the server is an inactive, request-forwarding node. // ProxyMode is when the server is an inactive, request-forwarding node.
ProxyMode = Mode("proxy") ProxyMode = Mode("proxy")
) )

View File

@ -11,8 +11,8 @@ import (
"github.com/coreos/etcd/third_party/github.com/gorilla/mux" "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
) )
@ -215,7 +215,7 @@ func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *ht
// Updates the cluster configuration. // Updates the cluster configuration.
func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) { func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
c := &SetClusterConfigCommand{Config:&ClusterConfig{}} c := &SetClusterConfigCommand{Config: &ClusterConfig{}}
if err := json.NewDecoder(req.Body).Decode(&c.Config); err != nil { if err := json.NewDecoder(req.Body).Decode(&c.Config); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

View File

@ -43,7 +43,7 @@ func NewRegistry(s store.Store) *Registry {
} }
} }
// Peers returns a list of peer names. // Peers returns a list of cached peer names.
func (r *Registry) Peers() []string { func (r *Registry) Peers() []string {
names := make([]string, 0, len(r.peers)) names := make([]string, 0, len(r.peers))
for name, _ := range r.peers { for name, _ := range r.peers {
@ -53,7 +53,7 @@ func (r *Registry) Peers() []string {
return names return names
} }
// Proxies returns a list of proxy names. // Proxies returns a list of cached proxy names.
func (r *Registry) Proxies() []string { func (r *Registry) Proxies() []string {
names := make([]string, 0, len(r.proxies)) names := make([]string, 0, len(r.proxies))
for name, _ := range r.proxies { for name, _ := range r.proxies {
@ -63,7 +63,6 @@ func (r *Registry) Proxies() []string {
return names return names
} }
// RegisterPeer adds a peer to the registry. // RegisterPeer adds a peer to the registry.
func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error { func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error {
// TODO(benbjohnson): Disallow peers that are already proxies. // TODO(benbjohnson): Disallow peers that are already proxies.
@ -150,7 +149,6 @@ func (r *Registry) exists(key, name string) bool {
return (e.Node != nil) return (e.Node != nil)
} }
// Retrieves the client URL for a given node by name. // Retrieves the client URL for a given node by name.
func (r *Registry) ClientURL(name string) (string, bool) { func (r *Registry) ClientURL(name string) (string, bool) {
r.Lock() r.Lock()
@ -188,7 +186,7 @@ func (r *Registry) PeerHost(name string) (string, bool) {
func (r *Registry) PeerURL(name string) (string, bool) { func (r *Registry) PeerURL(name string) (string, bool) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
return r.peerURL(RegistryPeerKey,name) return r.peerURL(RegistryPeerKey, name)
} }
func (r *Registry) peerURL(key, name string) (string, bool) { func (r *Registry) peerURL(key, name string) (string, bool) {

View File

@ -12,10 +12,10 @@ import (
"github.com/coreos/etcd/third_party/github.com/gorilla/mux" "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
ehttp "github.com/coreos/etcd/http"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics" "github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/mod" "github.com/coreos/etcd/mod"
ehttp "github.com/coreos/etcd/http"
uhttp "github.com/coreos/etcd/pkg/http" uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/server/v1" "github.com/coreos/etcd/server/v1"
"github.com/coreos/etcd/server/v2" "github.com/coreos/etcd/server/v2"

View File

@ -82,3 +82,63 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 2) assert.Equal(t, len(result.Node.Nodes), 2)
} }
// TestProxyAutoPromote creates a cluster that is one node larger than the
// active size, kills a peer, and then checks that the dead peer is
// automatically demoted and that the spare proxy is promoted in its place.
func TestProxyAutoPromote(t *testing.T) {
	clusterSize := 10 // DefaultActiveSize + 1
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	if err != nil {
		t.Fatalf("cannot create cluster: %v", err)
	}
	defer func() {
		// Wrap this in a closure so that it picks up the updated version of
		// the "etcds" variable.
		DestroyCluster(etcds)
	}()

	c := etcd.NewClient(nil)
	c.SyncCluster()

	time.Sleep(1 * time.Second)

	// Verify that we have one proxy.
	result, err := c.Get("_etcd/proxies", false, true)
	if err != nil {
		// Stop here: result cannot be inspected safely after a failed Get.
		t.Fatalf("cannot list proxies: %v", err)
	}
	assert.Equal(t, len(result.Node.Nodes), 1)

	// Reconfigure with a short promote delay (1 second).
	resp, err := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":1}`))
	if err != nil {
		t.Fatalf("cannot update cluster config: %v", err)
	}
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// Remove peer. The local is named "killed" so that it does not shadow the
	// imported etcd client package used above.
	killed := etcds[1]
	etcds = append(etcds[:1], etcds[2:]...)
	if err := killed.Kill(); err != nil {
		t.Fatalf("cannot kill peer: %v", err)
	}
	killed.Release()

	// Wait for it to get dropped.
	time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second))

	// Wait for the proxy to be promoted.
	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))

	// Verify that we have 9 peers.
	result, err = c.Get("_etcd/machines", true, true)
	if err != nil {
		t.Fatalf("cannot list machines: %v", err)
	}
	assert.Equal(t, len(result.Node.Nodes), 9)

	// Verify that node10 is one of those peers.
	_, err = c.Get("_etcd/machines/node10", false, false)
	assert.NoError(t, err)

	// Verify that exactly one proxy remains and that it is the node that was
	// killed above (the test expects it to be registered as node2).
	result, err = c.Get("_etcd/proxies", false, true)
	if err != nil {
		t.Fatalf("cannot list proxies: %v", err)
	}
	if assert.Equal(t, len(result.Node.Nodes), 1) {
		assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/proxies/node2")
	}
}

View File

@ -20,6 +20,7 @@ type Peer struct {
mutex sync.RWMutex mutex sync.RWMutex
stopChan chan bool stopChan chan bool
heartbeatInterval time.Duration heartbeatInterval time.Duration
lastActivity time.Time
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -67,6 +68,11 @@ func (p *Peer) setPrevLogIndex(value uint64) {
p.prevLogIndex = value p.prevLogIndex = value
} }
// LastActivity returns the last time any response was received from the peer.
//
// The zero time.Time is returned when no response has been recorded yet. The
// field is updated under p.mutex when an append-entries response arrives, so
// the read lock is taken here to avoid racing with that writer.
func (p *Peer) LastActivity() time.Time {
	p.mutex.RLock()
	defer p.mutex.RUnlock()
	return p.lastActivity
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// //
// Methods // Methods
@ -103,6 +109,7 @@ func (p *Peer) clone() *Peer {
Name: p.Name, Name: p.Name,
ConnectionString: p.ConnectionString, ConnectionString: p.ConnectionString,
prevLogIndex: p.prevLogIndex, prevLogIndex: p.prevLogIndex,
lastActivity: p.lastActivity,
} }
} }
@ -176,6 +183,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
// If successful then update the previous log index. // If successful then update the previous log index.
p.mutex.Lock() p.mutex.Lock()
p.lastActivity = time.Now()
if resp.Success() { if resp.Success() {
if len(req.Entries) > 0 { if len(req.Entries) > 0 {
p.prevLogIndex = req.Entries[len(req.Entries)-1].GetIndex() p.prevLogIndex = req.Entries[len(req.Entries)-1].GetIndex()
@ -243,6 +251,7 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// If successful, the peer should have been to snapshot state // If successful, the peer should have been to snapshot state
// Send it the snapshot! // Send it the snapshot!
p.lastActivity = time.Now()
if resp.Success { if resp.Success {
p.sendSnapshotRecoveryRequest() p.sendSnapshotRecoveryRequest()
} else { } else {
@ -263,6 +272,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
return return
} }
p.lastActivity = time.Now()
if resp.Success { if resp.Success {
p.prevLogIndex = req.LastIndex p.prevLogIndex = req.LastIndex
} else { } else {
@ -283,6 +293,7 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo
req.peer = p req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil { if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name) debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name)
p.lastActivity = time.Now()
resp.peer = p resp.peer = p
c <- resp c <- resp
} else { } else {