Renamed configuration parameters.
This commit is contained in:
@ -28,7 +28,7 @@ type PeerServer struct {
|
||||
joinIndex uint64
|
||||
name string
|
||||
url string
|
||||
listenHost string
|
||||
bindAddr string
|
||||
tlsConf *TLSConfig
|
||||
tlsInfo *TLSInfo
|
||||
followersStats *raftFollowersStats
|
||||
@ -53,16 +53,16 @@ type snapshotConf struct {
|
||||
writesThr uint64
|
||||
}
|
||||
|
||||
func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapCount int) *PeerServer {
|
||||
func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer {
|
||||
s := &PeerServer{
|
||||
name: name,
|
||||
url: url,
|
||||
listenHost: listenHost,
|
||||
tlsConf: tlsConf,
|
||||
tlsInfo: tlsInfo,
|
||||
registry: registry,
|
||||
store: store,
|
||||
snapConf: &snapshotConf{time.Second * 3, 0, uint64(snapCount)},
|
||||
name: name,
|
||||
url: url,
|
||||
bindAddr: bindAddr,
|
||||
tlsConf: tlsConf,
|
||||
tlsInfo: tlsInfo,
|
||||
registry: registry,
|
||||
store: store,
|
||||
snapConf: &snapshotConf{time.Second * 3, 0, uint64(snapshotCount)},
|
||||
followersStats: &raftFollowersStats{
|
||||
Leader: name,
|
||||
Followers: make(map[string]*raftFollowerStats),
|
||||
@ -130,7 +130,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
||||
}
|
||||
ok := s.joinCluster(cluster)
|
||||
if !ok {
|
||||
log.Warn("the entire cluster is down! this machine will restart the cluster.")
|
||||
log.Warn("the entire cluster is down! this peer will restart the cluster.")
|
||||
}
|
||||
|
||||
log.Debugf("%s restart as a follower", s.name)
|
||||
@ -228,23 +228,23 @@ func (s *PeerServer) startAsFollower(cluster []string) {
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
|
||||
log.Warnf("cannot join to cluster via given peers, retry in %d seconds", RetryInterval)
|
||||
time.Sleep(time.Second * RetryInterval)
|
||||
}
|
||||
|
||||
log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes)
|
||||
log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes)
|
||||
}
|
||||
|
||||
// Start to listen and response raft command
|
||||
func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error {
|
||||
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
|
||||
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.bindAddr, s.url)
|
||||
|
||||
router := mux.NewRouter()
|
||||
|
||||
s.httpServer = &http.Server{
|
||||
Handler: router,
|
||||
TLSConfig: &tlsConf,
|
||||
Addr: s.listenHost,
|
||||
Addr: s.bindAddr,
|
||||
}
|
||||
|
||||
// internal commands
|
||||
@ -312,14 +312,14 @@ func (s *PeerServer) Upgradable() error {
|
||||
}
|
||||
|
||||
func (s *PeerServer) joinCluster(cluster []string) bool {
|
||||
for _, machine := range cluster {
|
||||
if len(machine) == 0 {
|
||||
for _, peer := range cluster {
|
||||
if len(peer) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
err := s.joinByMachine(s.raftServer, machine, s.tlsConf.Scheme)
|
||||
err := s.joinByPeer(s.raftServer, peer, s.tlsConf.Scheme)
|
||||
if err == nil {
|
||||
log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
|
||||
log.Debugf("%s success join to the cluster via peer %s", s.name, peer)
|
||||
return true
|
||||
|
||||
} else {
|
||||
@ -327,21 +327,21 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
log.Debugf("cannot join to cluster via machine %s %s", machine, err)
|
||||
log.Debugf("cannot join to cluster via peer %s %s", peer, err)
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Send join requests to machine.
|
||||
func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme string) error {
|
||||
// Send join requests to peer.
|
||||
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error {
|
||||
var b bytes.Buffer
|
||||
|
||||
// t must be ok
|
||||
t, _ := server.Transporter().(*transporter)
|
||||
|
||||
// Our version must match the leaders version
|
||||
versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
|
||||
versionURL := url.URL{Host: peer, Scheme: scheme, Path: "/version"}
|
||||
version, err := getVersion(t, versionURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error during join version check: %v", err)
|
||||
@ -352,7 +352,7 @@ func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme st
|
||||
|
||||
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
|
||||
|
||||
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
|
||||
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
|
||||
|
||||
log.Debugf("Send Join Request to %s", joinURL.String())
|
||||
|
||||
@ -379,7 +379,7 @@ func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme st
|
||||
resp, req, err = t.Post(address, &b)
|
||||
|
||||
} else if resp.StatusCode == http.StatusBadRequest {
|
||||
log.Debug("Reach max number machines in the cluster")
|
||||
log.Debug("Reach max number peers in the cluster")
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
err := &etcdErr.Error{}
|
||||
decoder.Decode(err)
|
||||
|
Reference in New Issue
Block a user