refactor(PeerServer): Use a config struct in PeerServer
This commit is contained in:
17
etcd.go
17
etcd.go
@ -103,11 +103,18 @@ func main() {
|
||||
registry := server.NewRegistry(store)
|
||||
|
||||
// Create peer server.
|
||||
heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
|
||||
electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
|
||||
ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, heartbeatTimeout, electionTimeout, &mb)
|
||||
ps.MaxClusterSize = config.MaxClusterSize
|
||||
ps.RetryTimes = config.MaxRetryAttempts
|
||||
psConfig := server.PeerServerConfig{
|
||||
Name: info.Name,
|
||||
Path: config.DataDir,
|
||||
URL: info.RaftURL,
|
||||
BindAddr: info.RaftListenHost,
|
||||
SnapshotCount: config.SnapshotCount,
|
||||
HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond,
|
||||
ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond,
|
||||
MaxClusterSize: config.MaxClusterSize,
|
||||
RetryTimes: config.MaxRetryAttempts,
|
||||
}
|
||||
ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb)
|
||||
|
||||
// Create client server.
|
||||
s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb)
|
||||
|
@ -52,7 +52,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
|
||||
}
|
||||
|
||||
// Check peer number in the cluster
|
||||
if ps.registry.Count() == ps.MaxClusterSize {
|
||||
if ps.registry.Count() == ps.Config.MaxClusterSize {
|
||||
log.Debug("Reject join request from ", c.Name)
|
||||
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
|
||||
}
|
||||
|
@ -25,15 +25,25 @@ const retryInterval = 10
|
||||
|
||||
const ThresholdMonitorTimeout = 5 * time.Second
|
||||
|
||||
type PeerServerConfig struct {
|
||||
Name string
|
||||
Path string
|
||||
URL string
|
||||
BindAddr string
|
||||
SnapshotCount int
|
||||
HeartbeatTimeout time.Duration
|
||||
ElectionTimeout time.Duration
|
||||
MaxClusterSize int
|
||||
RetryTimes int
|
||||
}
|
||||
|
||||
type PeerServer struct {
|
||||
Config PeerServerConfig
|
||||
raftServer raft.Server
|
||||
server *Server
|
||||
httpServer *http.Server
|
||||
listener net.Listener
|
||||
joinIndex uint64
|
||||
name string
|
||||
url string
|
||||
bindAddr string
|
||||
tlsConf *TLSConfig
|
||||
tlsInfo *TLSInfo
|
||||
followersStats *raftFollowersStats
|
||||
@ -41,10 +51,6 @@ type PeerServer struct {
|
||||
registry *Registry
|
||||
store store.Store
|
||||
snapConf *snapshotConf
|
||||
MaxClusterSize int
|
||||
RetryTimes int
|
||||
HeartbeatTimeout time.Duration
|
||||
ElectionTimeout time.Duration
|
||||
|
||||
closeChan chan bool
|
||||
timeoutThresholdChan chan interface{}
|
||||
@ -65,22 +71,20 @@ type snapshotConf struct {
|
||||
snapshotThr uint64
|
||||
}
|
||||
|
||||
func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout, electionTimeout time.Duration, mb *metrics.Bucket) *PeerServer {
|
||||
|
||||
func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer {
|
||||
s := &PeerServer{
|
||||
name: name,
|
||||
url: url,
|
||||
bindAddr: bindAddr,
|
||||
Config: psConfig,
|
||||
|
||||
tlsConf: tlsConf,
|
||||
tlsInfo: tlsInfo,
|
||||
registry: registry,
|
||||
store: store,
|
||||
followersStats: &raftFollowersStats{
|
||||
Leader: name,
|
||||
Leader: psConfig.Name,
|
||||
Followers: make(map[string]*raftFollowerStats),
|
||||
},
|
||||
serverStats: &raftServerStats{
|
||||
Name: name,
|
||||
Name: psConfig.Name,
|
||||
StartTime: time.Now(),
|
||||
sendRateQueue: &statsQueue{
|
||||
back: -1,
|
||||
@ -89,8 +93,6 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
|
||||
back: -1,
|
||||
},
|
||||
},
|
||||
HeartbeatTimeout: heartbeatTimeout,
|
||||
ElectionTimeout: electionTimeout,
|
||||
|
||||
timeoutThresholdChan: make(chan interface{}, 1),
|
||||
|
||||
@ -101,7 +103,7 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
|
||||
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
|
||||
|
||||
// Create raft server
|
||||
raftServer, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
|
||||
raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@ -110,7 +112,7 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
|
||||
checkingInterval: time.Second * 3,
|
||||
// this is not accurate, we will update raft to provide an api
|
||||
lastIndex: raftServer.CommitIndex(),
|
||||
snapshotThr: uint64(snapshotCount),
|
||||
snapshotThr: uint64(psConfig.SnapshotCount),
|
||||
}
|
||||
|
||||
s.raftServer = raftServer
|
||||
@ -134,14 +136,14 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
||||
err := s.raftServer.LoadSnapshot()
|
||||
|
||||
if err == nil {
|
||||
log.Debugf("%s finished load snapshot", s.name)
|
||||
log.Debugf("%s finished load snapshot", s.Config.Name)
|
||||
} else {
|
||||
log.Debug(err)
|
||||
}
|
||||
}
|
||||
|
||||
s.raftServer.SetElectionTimeout(s.ElectionTimeout)
|
||||
s.raftServer.SetHeartbeatTimeout(s.HeartbeatTimeout)
|
||||
s.raftServer.SetElectionTimeout(s.Config.ElectionTimeout)
|
||||
s.raftServer.SetHeartbeatTimeout(s.Config.HeartbeatTimeout)
|
||||
|
||||
s.raftServer.Start()
|
||||
|
||||
@ -155,7 +157,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
||||
|
||||
} else {
|
||||
// Rejoin the previous cluster
|
||||
cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.name)
|
||||
cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
|
||||
for i := 0; i < len(cluster); i++ {
|
||||
u, err := url.Parse(cluster[i])
|
||||
if err != nil {
|
||||
@ -168,7 +170,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
||||
log.Warn("the entire cluster is down! this peer will restart the cluster.")
|
||||
}
|
||||
|
||||
log.Debugf("%s restart as a follower", s.name)
|
||||
log.Debugf("%s restart as a follower", s.Config.Name)
|
||||
}
|
||||
|
||||
s.closeChan = make(chan bool)
|
||||
@ -255,17 +257,17 @@ func (s *PeerServer) SetServer(server *Server) {
|
||||
func (s *PeerServer) startAsLeader() {
|
||||
// leader need to join self as a peer
|
||||
for {
|
||||
_, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.url, s.server.URL()))
|
||||
_, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.Config.URL, s.server.URL()))
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
log.Debugf("%s start as a leader", s.name)
|
||||
log.Debugf("%s start as a leader", s.Config.Name)
|
||||
}
|
||||
|
||||
func (s *PeerServer) startAsFollower(cluster []string) {
|
||||
// start as a follower in a existing cluster
|
||||
for i := 0; i < s.RetryTimes; i++ {
|
||||
for i := 0; i < s.Config.RetryTimes; i++ {
|
||||
ok := s.joinCluster(cluster)
|
||||
if ok {
|
||||
return
|
||||
@ -274,19 +276,19 @@ func (s *PeerServer) startAsFollower(cluster []string) {
|
||||
time.Sleep(time.Second * retryInterval)
|
||||
}
|
||||
|
||||
log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes)
|
||||
log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
|
||||
}
|
||||
|
||||
// Start to listen and response raft command
|
||||
func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error {
|
||||
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.bindAddr, s.url)
|
||||
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.Config.Name, s.Config.BindAddr, s.Config.URL)
|
||||
|
||||
router := mux.NewRouter()
|
||||
|
||||
s.httpServer = &http.Server{
|
||||
Handler: router,
|
||||
TLSConfig: &tlsConf,
|
||||
Addr: s.bindAddr,
|
||||
Addr: s.Config.BindAddr,
|
||||
}
|
||||
|
||||
// internal commands
|
||||
@ -333,7 +335,7 @@ func getVersion(t *transporter, versionURL url.URL) (int, error) {
|
||||
// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
|
||||
func (s *PeerServer) Upgradable() error {
|
||||
nextVersion := s.store.Version() + 1
|
||||
for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.name) {
|
||||
for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) {
|
||||
u, err := url.Parse(peerURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
|
||||
@ -361,7 +363,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
|
||||
|
||||
err := s.joinByPeer(s.raftServer, peer, s.tlsConf.Scheme)
|
||||
if err == nil {
|
||||
log.Debugf("%s success join to the cluster via peer %s", s.name, peer)
|
||||
log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer)
|
||||
return true
|
||||
|
||||
} else {
|
||||
@ -392,7 +394,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
|
||||
return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
|
||||
}
|
||||
|
||||
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
|
||||
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
|
||||
|
||||
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
|
||||
|
||||
@ -417,7 +419,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
|
||||
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||
address := resp.Header.Get("Location")
|
||||
log.Debugf("Send Join Request to %s", address)
|
||||
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
|
||||
json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
|
||||
resp, req, err = t.Post(address, &b)
|
||||
|
||||
} else if resp.StatusCode == http.StatusBadRequest {
|
||||
@ -477,21 +479,21 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
|
||||
|
||||
switch event.Type() {
|
||||
case raft.StateChangeEventType:
|
||||
log.Infof("%s: state changed from '%v' to '%v'.", s.name, prevValue, value)
|
||||
log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
|
||||
case raft.TermChangeEventType:
|
||||
log.Infof("%s: term #%v started.", s.name, value)
|
||||
log.Infof("%s: term #%v started.", s.Config.Name, value)
|
||||
case raft.LeaderChangeEventType:
|
||||
log.Infof("%s: leader changed from '%v' to '%v'.", s.name, prevValue, value)
|
||||
log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
|
||||
case raft.AddPeerEventType:
|
||||
log.Infof("%s: peer added: '%v'", s.name, value)
|
||||
log.Infof("%s: peer added: '%v'", s.Config.Name, value)
|
||||
case raft.RemovePeerEventType:
|
||||
log.Infof("%s: peer removed: '%v'", s.name, value)
|
||||
log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
|
||||
case raft.HeartbeatTimeoutEventType:
|
||||
var name = "<unknown>"
|
||||
if peer, ok := value.(*raft.Peer); ok {
|
||||
name = peer.Name
|
||||
}
|
||||
log.Infof("%s: warning: heartbeat timed out: '%v'", s.name, name)
|
||||
log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name)
|
||||
case raft.ElectionTimeoutThresholdEventType:
|
||||
select {
|
||||
case s.timeoutThresholdChan <- value:
|
||||
@ -538,7 +540,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
|
||||
for {
|
||||
select {
|
||||
case value := <-s.timeoutThresholdChan:
|
||||
log.Infof("%s: warning: heartbeat near election timeout: %v", s.name, value)
|
||||
log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
|
||||
case <-closeChan:
|
||||
return
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
|
||||
// Get all the current logs
|
||||
func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
log.Debugf("[recv] GET %s/log", ps.url)
|
||||
log.Debugf("[recv] GET %s/log", ps.Config.URL)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(ps.raftServer.LogEntries())
|
||||
@ -27,11 +27,11 @@ func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request)
|
||||
|
||||
if _, err := rvreq.Decode(req.Body); err != nil {
|
||||
http.Error(w, "", http.StatusBadRequest)
|
||||
log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.url, err)
|
||||
log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName)
|
||||
log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName)
|
||||
|
||||
resp := ps.raftServer.RequestVote(rvreq)
|
||||
|
||||
@ -55,11 +55,11 @@ func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.
|
||||
|
||||
if _, err := aereq.Decode(req.Body); err != nil {
|
||||
http.Error(w, "", http.StatusBadRequest)
|
||||
log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.url, err)
|
||||
log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries))
|
||||
log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries))
|
||||
|
||||
ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
|
||||
|
||||
@ -90,11 +90,11 @@ func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Reque
|
||||
|
||||
if _, err := ssreq.Decode(req.Body); err != nil {
|
||||
http.Error(w, "", http.StatusBadRequest)
|
||||
log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.url, err)
|
||||
log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("[recv] POST %s/snapshot", ps.url)
|
||||
log.Debugf("[recv] POST %s/snapshot", ps.Config.URL)
|
||||
|
||||
resp := ps.raftServer.RequestSnapshot(ssreq)
|
||||
|
||||
@ -117,11 +117,11 @@ func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *ht
|
||||
|
||||
if _, err := ssrreq.Decode(req.Body); err != nil {
|
||||
http.Error(w, "", http.StatusBadRequest)
|
||||
log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.url, err)
|
||||
log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("[recv] POST %s/snapshotRecovery", ps.url)
|
||||
log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL)
|
||||
|
||||
resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
|
||||
|
||||
@ -140,7 +140,7 @@ func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *ht
|
||||
|
||||
// Get the port that listening for etcd connecting of the server
|
||||
func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
log.Debugf("[recv] Get %s/etcdURL/ ", ps.url)
|
||||
log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(ps.server.URL()))
|
||||
}
|
||||
@ -195,21 +195,21 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
|
||||
|
||||
// Response to the name request
|
||||
func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
log.Debugf("[recv] Get %s/name/ ", ps.url)
|
||||
log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(ps.name))
|
||||
w.Write([]byte(ps.Config.Name))
|
||||
}
|
||||
|
||||
// Response to the name request
|
||||
func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
log.Debugf("[recv] Get %s/version/ ", ps.url)
|
||||
log.Debugf("[recv] Get %s/version/ ", ps.Config.URL)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(strconv.Itoa(ps.store.Version())))
|
||||
}
|
||||
|
||||
// Checks whether a given version is supported.
|
||||
func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
log.Debugf("[recv] Get %s%s ", ps.url, req.URL.Path)
|
||||
log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path)
|
||||
vars := mux.Vars(req)
|
||||
version, _ := strconv.Atoi(vars["version"])
|
||||
if version >= store.MinVersion() && version <= store.MaxVersion() {
|
||||
@ -221,7 +221,7 @@ func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.R
|
||||
|
||||
// Upgrades the current store version to the next version.
|
||||
func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
log.Debugf("[recv] Get %s/version", ps.url)
|
||||
log.Debugf("[recv] Get %s/version", ps.Config.URL)
|
||||
|
||||
// Check if upgrade is possible for all nodes.
|
||||
if err := ps.Upgradable(); err != nil {
|
||||
|
@ -27,8 +27,8 @@ type dialer func(network, addr string) (net.Conn, error)
|
||||
// whether the user give the server cert and key
|
||||
func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
|
||||
// names for each type of timeout, for the sake of clarity
|
||||
dialTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
|
||||
responseHeaderTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
|
||||
dialTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout
|
||||
responseHeaderTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout
|
||||
|
||||
t := transporter{}
|
||||
|
||||
@ -227,7 +227,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
|
||||
// Cancel the on fly HTTP transaction when timeout happens.
|
||||
func (t *transporter) CancelWhenTimeout(req *http.Request) {
|
||||
go func() {
|
||||
time.Sleep(t.peerServer.HeartbeatTimeout)
|
||||
time.Sleep(t.peerServer.Config.HeartbeatTimeout)
|
||||
t.transport.CancelRequest(req)
|
||||
}()
|
||||
}
|
||||
|
@ -26,8 +26,17 @@ func RunServer(f func(*server.Server)) {
|
||||
store := store.New()
|
||||
registry := server.NewRegistry(store)
|
||||
|
||||
ps := server.NewPeerServer(testName, path, "http://"+testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout, nil)
|
||||
ps.MaxClusterSize = 9
|
||||
psConfig := server.PeerServerConfig{
|
||||
Name: testName,
|
||||
Path: path,
|
||||
URL: "http://"+testRaftURL,
|
||||
BindAddr: testRaftURL,
|
||||
SnapshotCount: testSnapshotCount,
|
||||
HeartbeatTimeout: testHeartbeatTimeout,
|
||||
ElectionTimeout: testElectionTimeout,
|
||||
MaxClusterSize: 9,
|
||||
}
|
||||
ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil)
|
||||
s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil)
|
||||
ps.SetServer(s)
|
||||
|
||||
|
Reference in New Issue
Block a user