etcdserver: replace forceVersionC with FirstCommitInTermNotify

This commit is contained in:
wpedrak
2021-03-29 12:31:44 +02:00
parent 3d485faac5
commit 3991a8c9fa

View File

@ -273,10 +273,6 @@ type EtcdServer struct {
peerRt http.RoundTripper peerRt http.RoundTripper
reqIDGen *idutil.Generator reqIDGen *idutil.Generator
// forceVersionC is used to force the version monitor loop
// to detect the cluster version immediately.
forceVersionC chan struct{}
// wgMu blocks concurrent waitgroup mutation while server stopping // wgMu blocks concurrent waitgroup mutation while server stopping
wgMu sync.RWMutex wgMu sync.RWMutex
// wg is used to wait for the goroutines that depends on the server state // wg is used to wait for the goroutines that depends on the server state
@ -528,7 +524,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
SyncTicker: time.NewTicker(500 * time.Millisecond), SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: prt, peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()), consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
firstCommitInTermC: make(chan struct{}), firstCommitInTermC: make(chan struct{}),
@ -2084,10 +2079,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
if len(e.Data) == 0 { if len(e.Data) == 0 {
s.notifyAboutFirstCommitInTerm() s.notifyAboutFirstCommitInTerm()
select {
case s.forceVersionC <- struct{}{}:
default:
}
// promote lessor when the local member is leader and finished // promote lessor when the local member is leader and finished
// applying all entries from the last term. // applying all entries from the last term.
if s.isLeader() { if s.isLeader() {
@ -2344,7 +2335,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
func (s *EtcdServer) monitorVersions() { func (s *EtcdServer) monitorVersions() {
for { for {
select { select {
case <-s.forceVersionC: case <-s.FirstCommitInTermNotify():
case <-time.After(monitorVersionInterval): case <-time.After(monitorVersionInterval):
case <-s.stopping: case <-s.stopping:
return return