fix(peer_server): recover from outage with discovery
This patch also contains the refactor of find cluster process. It is changed based on @xiangli-cmu 's commits in 627 issue.
This commit is contained in:
@ -167,113 +167,91 @@ func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
|
|||||||
s.clusterConfig = c
|
s.clusterConfig = c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper function to do discovery and return results in expected format
|
// Try all possible ways to find clusters to join
|
||||||
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
|
// Include log data in -data-dir, -discovery and -peers
|
||||||
peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
|
//
|
||||||
|
// Peer discovery follows this order:
|
||||||
|
// 1. previous peers in -data-dir
|
||||||
|
// 2. -discovery
|
||||||
|
// 3. -peers
|
||||||
|
//
|
||||||
|
// TODO(yichengq): RaftServer should be started as late as possible.
|
||||||
|
// Current implementation to start it is not that good,
|
||||||
|
// and should be refactored later.
|
||||||
|
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
|
||||||
|
name := s.Config.Name
|
||||||
|
isNewNode := s.raftServer.IsLogEmpty()
|
||||||
|
|
||||||
// Warn about errors coming from discovery, this isn't fatal
|
// Try its best to find possible peers, and connect with them.
|
||||||
// since the user might have provided a peer list elsewhere,
|
if !isNewNode {
|
||||||
// or there is some log in data dir.
|
// Take old nodes into account.
|
||||||
if err != nil {
|
allPeers := s.getKnownPeers()
|
||||||
log.Warnf("Discovery encountered an error: %v", err)
|
// Discover registered peers.
|
||||||
|
// TODO(yichengq): It may mess up discoverURL if this is
|
||||||
|
// set wrong by mistake. This may need to refactor discovery
|
||||||
|
// module. Fix it later.
|
||||||
|
if discoverURL != "" {
|
||||||
|
discoverPeers, _ := s.handleDiscovery(discoverURL)
|
||||||
|
allPeers = append(allPeers, discoverPeers...)
|
||||||
|
}
|
||||||
|
allPeers = append(allPeers, peers...)
|
||||||
|
allPeers = s.removeSelfFromList(allPeers)
|
||||||
|
|
||||||
|
// If there is possible peer list, use it to find cluster.
|
||||||
|
if len(allPeers) > 0 {
|
||||||
|
// TODO(yichengq): joinCluster may fail if there's no leader for
|
||||||
|
// current cluster. It should wait if the cluster is under
|
||||||
|
// leader election, or the node with changed IP cannot join
|
||||||
|
// the cluster then.
|
||||||
|
if err := s.startAsFollower(allPeers, 1); err == nil {
|
||||||
|
log.Debugf("%s joins to the previous cluster %v", name, allPeers)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warnf("%s cannot connect to previous cluster %v", name, allPeers)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(yichengq): Think about the action that should be done
|
||||||
|
// if it cannot connect any of the previous known node.
|
||||||
|
s.raftServer.Start()
|
||||||
|
log.Debugf("%s is restarting the cluster %v", name, allPeers)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range peers {
|
|
||||||
// Strip the scheme off of the peer if it has one
|
|
||||||
// TODO(bp): clean this up!
|
|
||||||
purl, err := url.Parse(peers[i])
|
|
||||||
if err == nil {
|
|
||||||
peers[i] = purl.Host
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Discovery fetched back peer list: %v", peers)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try all possible ways to find clusters to join
|
|
||||||
// Include -discovery, -peers and log data in -data-dir
|
|
||||||
//
|
|
||||||
// Peer discovery follows this order:
|
|
||||||
// 1. -discovery
|
|
||||||
// 2. -peers
|
|
||||||
// 3. previous peers in -data-dir
|
|
||||||
// RaftServer should be started as late as possible. Current implementation
|
|
||||||
// to start it is not that good, and will be refactored in #627.
|
|
||||||
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
|
|
||||||
// Attempt cluster discovery
|
// Attempt cluster discovery
|
||||||
toDiscover := discoverURL != ""
|
if discoverURL != "" {
|
||||||
if toDiscover {
|
|
||||||
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
|
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
|
||||||
// It is registered in discover url
|
// It is registered in discover url
|
||||||
if discoverErr == nil {
|
if discoverErr == nil {
|
||||||
// start as a leader in a new cluster
|
// start as a leader in a new cluster
|
||||||
if len(discoverPeers) == 0 {
|
if len(discoverPeers) == 0 {
|
||||||
log.Debug("This peer is starting a brand new cluster based on discover URL.")
|
log.Debugf("%s is starting a new cluster via discover service", name)
|
||||||
s.startAsLeader()
|
s.startAsLeader()
|
||||||
} else {
|
} else {
|
||||||
s.startAsFollower(discoverPeers)
|
log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
|
||||||
|
if err := s.startAsFollower(discoverPeers, s.Config.RetryTimes); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)
|
||||||
|
|
||||||
hasPeerList := len(peers) > 0
|
if len(peers) == 0 {
|
||||||
// if there is log in data dir, append previous peers to peers in config
|
log.Fatalf("%s, the new leader, must register itself to discovery service as required", name)
|
||||||
// to find cluster
|
|
||||||
prevPeers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
|
|
||||||
for i := 0; i < len(prevPeers); i++ {
|
|
||||||
u, err := url.Parse(prevPeers[i])
|
|
||||||
if err != nil {
|
|
||||||
log.Debug("rejoin cannot parse url: ", err)
|
|
||||||
}
|
|
||||||
prevPeers[i] = u.Host
|
|
||||||
}
|
|
||||||
peers = append(peers, prevPeers...)
|
|
||||||
|
|
||||||
// Remove its own peer address from the peer list to join
|
|
||||||
u, err := url.Parse(s.Config.URL)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("cannot parse peer address %v: %v", s.Config.URL, err)
|
|
||||||
}
|
|
||||||
filteredPeers := make([]string, 0)
|
|
||||||
for _, v := range peers {
|
|
||||||
if v != u.Host {
|
|
||||||
filteredPeers = append(filteredPeers, v)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
peers = filteredPeers
|
|
||||||
|
|
||||||
// if there is backup peer lists, use it to find cluster
|
|
||||||
if len(peers) > 0 {
|
if len(peers) > 0 {
|
||||||
ok := s.joinCluster(peers)
|
if err := s.startAsFollower(peers, s.Config.RetryTimes); err != nil {
|
||||||
if !ok {
|
log.Fatalf("%s cannot connect to existing cluster %v", name, peers)
|
||||||
log.Warn("No living peers are found!")
|
|
||||||
} else {
|
|
||||||
s.raftServer.Start()
|
|
||||||
log.Debugf("%s restart as a follower based on peers[%v]", s.Config.Name)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if !s.raftServer.IsLogEmpty() {
|
|
||||||
log.Debug("Entire cluster is down! %v will restart the cluster.", s.Config.Name)
|
|
||||||
s.raftServer.Start()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if toDiscover {
|
log.Infof("%s is starting a new cluster.", s.Config.Name)
|
||||||
log.Fatalf("Discovery failed, no available peers in backup list, and no log data")
|
|
||||||
}
|
|
||||||
|
|
||||||
if hasPeerList {
|
|
||||||
log.Fatalf("No available peers in backup list, and no log data")
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("This peer is starting a brand new cluster now.")
|
|
||||||
s.startAsLeader()
|
s.startAsLeader()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the raft server
|
// Start the raft server
|
||||||
@ -373,19 +351,22 @@ func (s *PeerServer) startAsLeader() {
|
|||||||
log.Debugf("%s start as a leader", s.Config.Name)
|
log.Debugf("%s start as a leader", s.Config.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PeerServer) startAsFollower(cluster []string) {
|
func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error {
|
||||||
// start as a follower in a existing cluster
|
// start as a follower in a existing cluster
|
||||||
for i := 0; i < s.Config.RetryTimes; i++ {
|
for i := 0; ; i++ {
|
||||||
ok := s.joinCluster(cluster)
|
ok := s.joinCluster(cluster)
|
||||||
if ok {
|
if ok {
|
||||||
s.raftServer.Start()
|
break
|
||||||
return
|
}
|
||||||
|
if i == retryTimes - 1 {
|
||||||
|
return fmt.Errorf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
|
||||||
}
|
}
|
||||||
log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
|
log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
|
||||||
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
|
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
|
s.raftServer.Start()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getVersion fetches the peer version of a cluster.
|
// getVersion fetches the peer version of a cluster.
|
||||||
@ -429,6 +410,61 @@ func (s *PeerServer) Upgradable() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper function to do discovery and return results in expected format
|
||||||
|
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
|
||||||
|
peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
|
||||||
|
|
||||||
|
// Warn about errors coming from discovery, this isn't fatal
|
||||||
|
// since the user might have provided a peer list elsewhere,
|
||||||
|
// or there is some log in data dir.
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Discovery encountered an error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range peers {
|
||||||
|
// Strip the scheme off of the peer if it has one
|
||||||
|
// TODO(bp): clean this up!
|
||||||
|
purl, err := url.Parse(peers[i])
|
||||||
|
if err == nil {
|
||||||
|
peers[i] = purl.Host
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Discovery fetched back peer list: %v", peers)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// getKnownPeers gets the previous peers from log
|
||||||
|
func (s *PeerServer) getKnownPeers() []string {
|
||||||
|
peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
|
||||||
|
for i := range peers {
|
||||||
|
u, err := url.Parse(peers[i])
|
||||||
|
if err != nil {
|
||||||
|
log.Debug("getPrevPeers cannot parse url %v", peers[i])
|
||||||
|
}
|
||||||
|
peers[i] = u.Host
|
||||||
|
}
|
||||||
|
return peers
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeSelfFromList removes url of the peerServer from the peer list
|
||||||
|
func (s *PeerServer) removeSelfFromList(peers []string) []string {
|
||||||
|
// Remove its own peer address from the peer list to join
|
||||||
|
u, err := url.Parse(s.Config.URL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL)
|
||||||
|
}
|
||||||
|
newPeers := make([]string, 0)
|
||||||
|
for _, v := range peers {
|
||||||
|
if v != u.Host {
|
||||||
|
newPeers = append(newPeers, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newPeers
|
||||||
|
}
|
||||||
|
|
||||||
func (s *PeerServer) joinCluster(cluster []string) bool {
|
func (s *PeerServer) joinCluster(cluster []string) bool {
|
||||||
for _, peer := range cluster {
|
for _, peer := range cluster {
|
||||||
if len(peer) == 0 {
|
if len(peer) == 0 {
|
||||||
|
@ -292,6 +292,53 @@ func TestDiscoverySecondPeerUp(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestDiscoveryRestart ensures that a discovery cluster could be restarted.
|
||||||
|
func TestDiscoveryRestart(t *testing.T) {
|
||||||
|
etcdtest.RunServer(func(s *server.Server) {
|
||||||
|
proc, err := startServer([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
client := http.Client{}
|
||||||
|
err = assertServerFunctional(client, "http")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
proc2, err := startServer2([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4", "-addr", "127.0.0.1:4002", "-peer-addr", "127.0.0.1:7002"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = assertServerFunctional(client, "http")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
stopServer(proc)
|
||||||
|
stopServer(proc2)
|
||||||
|
|
||||||
|
proc, err = startServerWithDataDir([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
proc2, err = startServer2WithDataDir([]string{"-discovery", s.URL() + "/v2/keys/_etcd/registry/4", "-addr", "127.0.0.1:4002", "-peer-addr", "127.0.0.1:7002"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = assertServerFunctional(client, "http")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
stopServer(proc)
|
||||||
|
stopServer(proc2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
func assertServerNotUp(client http.Client, scheme string) error {
|
func assertServerNotUp(client http.Client, scheme string) error {
|
||||||
path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme)
|
path := fmt.Sprintf("%s://127.0.0.1:4001/v2/keys/foo", scheme)
|
||||||
fields := url.Values(map[string][]string{"value": {"bar"}})
|
fields := url.Values(map[string][]string{"value": {"bar"}})
|
||||||
|
@ -166,6 +166,19 @@ func startServer(extra []string) (*os.Process, error) {
|
|||||||
return os.StartProcess(EtcdBinPath, cmd, procAttr)
|
return os.StartProcess(EtcdBinPath, cmd, procAttr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(yichengq): refactor these helper functions in #645
|
||||||
|
func startServer2(extra []string) (*os.Process, error) {
|
||||||
|
procAttr := new(os.ProcAttr)
|
||||||
|
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
||||||
|
|
||||||
|
cmd := []string{"etcd", "-f", "-data-dir=/tmp/node2", "-name=node2"}
|
||||||
|
cmd = append(cmd, extra...)
|
||||||
|
|
||||||
|
fmt.Println(strings.Join(cmd, " "))
|
||||||
|
|
||||||
|
return os.StartProcess(EtcdBinPath, cmd, procAttr)
|
||||||
|
}
|
||||||
|
|
||||||
func startServerWithDataDir(extra []string) (*os.Process, error) {
|
func startServerWithDataDir(extra []string) (*os.Process, error) {
|
||||||
procAttr := new(os.ProcAttr)
|
procAttr := new(os.ProcAttr)
|
||||||
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
||||||
@ -173,6 +186,18 @@ func startServerWithDataDir(extra []string) (*os.Process, error) {
|
|||||||
cmd := []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"}
|
cmd := []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"}
|
||||||
cmd = append(cmd, extra...)
|
cmd = append(cmd, extra...)
|
||||||
|
|
||||||
|
fmt.Println(strings.Join(cmd, " "))
|
||||||
|
|
||||||
|
return os.StartProcess(EtcdBinPath, cmd, procAttr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func startServer2WithDataDir(extra []string) (*os.Process, error) {
|
||||||
|
procAttr := new(os.ProcAttr)
|
||||||
|
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
||||||
|
|
||||||
|
cmd := []string{"etcd", "-data-dir=/tmp/node2", "-name=node2"}
|
||||||
|
cmd = append(cmd, extra...)
|
||||||
|
|
||||||
println(strings.Join(cmd, " "))
|
println(strings.Join(cmd, " "))
|
||||||
|
|
||||||
return os.StartProcess(EtcdBinPath, cmd, procAttr)
|
return os.StartProcess(EtcdBinPath, cmd, procAttr)
|
||||||
|
@ -95,7 +95,7 @@ func TestV1ClusterMigration(t *testing.T) {
|
|||||||
body := tests.ReadBody(resp)
|
body := tests.ReadBody(resp)
|
||||||
assert.Nil(t, err, "")
|
assert.Nil(t, err, "")
|
||||||
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
|
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
|
||||||
assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n")
|
assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":10}`+"\n")
|
||||||
|
|
||||||
// Ensure TTL'd message is removed.
|
// Ensure TTL'd message is removed.
|
||||||
resp, err = tests.Get("http://localhost:4001/v2/keys/foo")
|
resp, err = tests.Get("http://localhost:4001/v2/keys/foo")
|
||||||
|
Reference in New Issue
Block a user