server: Extract notifier struct

This commit is contained in:
Marek Siarkowicz 2021-08-18 17:00:12 +02:00
parent 58fb625d12
commit 9d81dde082
3 changed files with 83 additions and 50 deletions

52
pkg/notify/notify.go Normal file
View File

@ -0,0 +1,52 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notify
import (
"sync"
)
// Notifier is a thread safe struct that can be used to send notification about
// some event to multiple consumers.
type Notifier struct {
mu sync.RWMutex
channel chan struct{}
}
// NewNotifier returns new notifier
func NewNotifier() *Notifier {
return &Notifier{
channel: make(chan struct{}),
}
}
// Receive returns channel that can be used to wait for notification.
// Consumers will be informed by closing the channel.
func (n *Notifier) Receive() <-chan struct{} {
n.mu.RLock()
defer n.mu.RUnlock()
return n.channel
}
// Notify closes the channel passed to consumers and creates new channel to used
// for next notification.
func (n *Notifier) Notify() {
newChannel := make(chan struct{})
n.mu.Lock()
channelToClose := n.channel
n.channel = newChannel
n.mu.Unlock()
close(channelToClose)
}

View File

@ -32,6 +32,7 @@ import (
"github.com/coreos/go-semver/semver"
humanize "github.com/dustin/go-humanize"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/pkg/v3/notify"
"go.etcd.io/etcd/server/v3/config"
"go.uber.org/zap"
@ -233,8 +234,7 @@ type EtcdServer struct {
// done is closed when all goroutines from start() complete.
done chan struct{}
// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
leaderChanged chan struct{}
leaderChangedMu sync.RWMutex
leaderChanged *notify.Notifier
errorc chan error
id types.ID
@ -288,8 +288,7 @@ type EtcdServer struct {
leadTimeMu sync.RWMutex
leadElectedTime time.Time
firstCommitInTermMu sync.RWMutex
firstCommitInTermC chan struct{}
firstCommitInTerm *notify.Notifier
*AccessController
@ -316,28 +315,27 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
lgMu: new(sync.RWMutex),
lg: cfg.Logger,
errorc: make(chan error, 1),
v2store: b.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss),
id: b.raft.wal.id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.raft.cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.ci,
firstCommitInTermC: make(chan struct{}),
readych: make(chan struct{}),
Cfg: cfg,
lgMu: new(sync.RWMutex),
lg: cfg.Logger,
errorc: make(chan error, 1),
v2store: b.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss),
id: b.raft.wal.id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.raft.cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.ci,
firstCommitInTerm: notify.NewNotifier(),
}
serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
srv.be = b.be
@ -555,7 +553,7 @@ func (s *EtcdServer) start() {
s.ctx, s.cancel = context.WithCancel(context.Background())
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
s.leaderChanged = make(chan struct{})
s.leaderChanged = notify.NewNotifier()
if s.ClusterVersion() != nil {
lg.Info(
"starting etcd server",
@ -777,11 +775,7 @@ func (s *EtcdServer) run() {
}
}
if newLeader {
s.leaderChangedMu.Lock()
lc := s.leaderChanged
s.leaderChanged = make(chan struct{})
close(lc)
s.leaderChangedMu.Unlock()
s.leaderChanged.Notify()
}
// TODO: remove the nil checking
// current test utility does not provide the stats
@ -1567,9 +1561,7 @@ func (s *EtcdServer) getLead() uint64 {
}
func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
s.leaderChangedMu.RLock()
defer s.leaderChangedMu.RUnlock()
return s.leaderChanged
return s.leaderChanged.Receive()
}
// FirstCommitInTermNotify returns channel that will be unlocked on first
@ -1577,9 +1569,7 @@ func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
// read-only requests (leader is not able to respond any read-only requests
// as long as linearizable semantic is required)
func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
s.firstCommitInTermMu.RLock()
defer s.firstCommitInTermMu.RUnlock()
return s.firstCommitInTermC
return s.firstCommitInTerm.Receive()
}
// RaftStatusGetter represents etcd server and Raft progress.
@ -1891,7 +1881,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
// raft state machine may generate noop entry when leader confirmation.
// skip it in advance to avoid some potential bug in the future
if len(e.Data) == 0 {
s.notifyAboutFirstCommitInTerm()
s.firstCommitInTerm.Notify()
// promote lessor when the local member is leader and finished
// applying all entries from the last term.
@ -1965,15 +1955,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
})
}
func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
newNotifier := make(chan struct{})
s.firstCommitInTermMu.Lock()
notifierToClose := s.firstCommitInTermC
s.firstCommitInTermC = newNotifier
s.firstCommitInTermMu.Unlock()
close(notifierToClose)
}
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
@ -2161,7 +2142,7 @@ func (s *EtcdServer) monitorVersions() {
monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s})
for {
select {
case <-s.FirstCommitInTermNotify():
case <-s.firstCommitInTerm.Receive():
case <-time.After(monitorVersionInterval):
case <-s.stopping:
return

View File

@ -709,7 +709,7 @@ func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
func (s *EtcdServer) linearizableReadLoop() {
for {
requestId := s.reqIDGen.Next()
leaderChangedNotifier := s.LeaderChangedNotify()
leaderChangedNotifier := s.leaderChanged.Receive()
select {
case <-leaderChangedNotifier:
continue
@ -775,7 +775,7 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
retryTimer := time.NewTimer(readIndexRetryTime)
defer retryTimer.Stop()
firstCommitInTermNotifier := s.FirstCommitInTermNotify()
firstCommitInTermNotifier := s.firstCommitInTerm.Receive()
for {
select {
@ -803,7 +803,7 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
// return a retryable error.
return 0, ErrLeaderChanged
case <-firstCommitInTermNotifier:
firstCommitInTermNotifier = s.FirstCommitInTermNotify()
firstCommitInTermNotifier = s.firstCommitInTerm.Receive()
lg.Info("first commit in current term: resending ReadIndex request")
err := s.sendReadIndex(requestId)
if err != nil {