server: Refactor corruption checker

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz
2022-07-25 13:49:45 +02:00
parent 264498258b
commit c58ec9fe13
4 changed files with 22 additions and 12 deletions

View File

@ -252,7 +252,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// newly started member ("memberInitialized==false") // newly started member ("memberInitialized==false")
// does not need corruption check // does not need corruption check
if memberInitialized && srvcfg.InitialCorruptCheck { if memberInitialized && srvcfg.InitialCorruptCheck {
if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil { if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started) // (nothing to close since rafthttp transports have not been started)

View File

@ -32,7 +32,12 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type corruptionMonitor struct { type CorruptionChecker interface {
InitialCheck() error
PeriodicCheck() error
}
type corruptionChecker struct {
lg *zap.Logger lg *zap.Logger
hasher Hasher hasher Hasher
@ -47,8 +52,8 @@ type Hasher interface {
TriggerCorruptAlarm(uint64) TriggerCorruptAlarm(uint64)
} }
func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { func NewCorruptionChecker(lg *zap.Logger, s *EtcdServer) *corruptionChecker {
return &corruptionMonitor{ return &corruptionChecker{
lg: lg, lg: lg,
hasher: hasherAdapter{s, s.KV().HashStorage()}, hasher: hasherAdapter{s, s.KV().HashStorage()},
} }
@ -74,7 +79,7 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) {
// InitialCheck compares initial hash values with its peers // InitialCheck compares initial hash values with its peers
// before serving any peer/client traffic. Only mismatch when hashes // before serving any peer/client traffic. Only mismatch when hashes
// are different at requested revision, with same compact revision. // are different at requested revision, with same compact revision.
func (cm *corruptionMonitor) InitialCheck() error { func (cm *corruptionChecker) InitialCheck() error {
cm.lg.Info( cm.lg.Info(
"starting initial corruption check", "starting initial corruption check",
@ -153,7 +158,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
return nil return nil
} }
func (cm *corruptionMonitor) periodicCheck() error { func (cm *corruptionChecker) PeriodicCheck() error {
h, rev, err := cm.hasher.HashByRev(0) h, rev, err := cm.hasher.HashByRev(0)
if err != nil { if err != nil {
return err return err

View File

@ -88,7 +88,7 @@ func TestInitialCheck(t *testing.T) {
} }
for _, tc := range tcs { for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{ monitor := corruptionChecker{
lg: zaptest.NewLogger(t), lg: zaptest.NewLogger(t),
hasher: &tc.hasher, hasher: &tc.hasher,
} }
@ -205,11 +205,11 @@ func TestPeriodicCheck(t *testing.T) {
} }
for _, tc := range tcs { for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{ monitor := corruptionChecker{
lg: zaptest.NewLogger(t), lg: zaptest.NewLogger(t),
hasher: &tc.hasher, hasher: &tc.hasher,
} }
err := monitor.periodicCheck() err := monitor.PeriodicCheck()
if gotError := err != nil; gotError != tc.expectError { if gotError := err != nil; gotError != tc.expectError {
t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError) t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
} }

View File

@ -296,6 +296,7 @@ type EtcdServer struct {
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount. // forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade. // Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool forceSnapshot bool
corruptionChecker CorruptionChecker
} }
// NewServer creates a new EtcdServer from the supplied configuration. The // NewServer creates a new EtcdServer from the supplied configuration. The
@ -371,6 +372,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
CompactionSleepInterval: cfg.CompactionSleepInterval, CompactionSleepInterval: cfg.CompactionSleepInterval,
} }
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = NewCorruptionChecker(cfg.Logger, srv)
srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost)) srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
@ -2199,7 +2201,6 @@ func (s *EtcdServer) monitorKVHash() {
zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-id", s.MemberId().String()),
zap.Duration("interval", t), zap.Duration("interval", t),
) )
monitor := NewCorruptionMonitor(lg, s)
for { for {
select { select {
case <-s.stopping: case <-s.stopping:
@ -2209,7 +2210,7 @@ func (s *EtcdServer) monitorKVHash() {
if !s.isLeader() { if !s.isLeader() {
continue continue
} }
if err := monitor.periodicCheck(); err != nil { if err := s.corruptionChecker.PeriodicCheck(); err != nil {
lg.Warn("failed to check hash KV", zap.Error(err)) lg.Warn("failed to check hash KV", zap.Error(err))
} }
} }
@ -2416,3 +2417,7 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
} }
} }
} }
func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
return s.corruptionChecker
}