|
|
|
@ -32,33 +32,74 @@ import (
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// CheckInitialHashKV compares initial hash values with its peers
|
|
|
|
|
type corruptionMonitor struct {
|
|
|
|
|
lg *zap.Logger
|
|
|
|
|
|
|
|
|
|
hasher Hasher
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Hasher interface {
|
|
|
|
|
mvcc.Hasher
|
|
|
|
|
ReqTimeout() time.Duration
|
|
|
|
|
MemberId() types.ID
|
|
|
|
|
PeerHashByRev(int64) []*peerHashKVResp
|
|
|
|
|
LinearizableReadNotify(context.Context) error
|
|
|
|
|
TriggerCorruptAlarm(uint64)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor {
|
|
|
|
|
return &corruptionMonitor{
|
|
|
|
|
lg: lg,
|
|
|
|
|
hasher: hasherAdapter{s},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type hasherAdapter struct {
|
|
|
|
|
*EtcdServer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h hasherAdapter) Hash() (hash uint32, revision int64, err error) {
|
|
|
|
|
return h.EtcdServer.KV().Hash()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h hasherAdapter) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) {
|
|
|
|
|
return h.EtcdServer.KV().HashByRev(rev)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h hasherAdapter) ReqTimeout() time.Duration {
|
|
|
|
|
return h.EtcdServer.Cfg.ReqTimeout()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h hasherAdapter) PeerHashByRev(rev int64) []*peerHashKVResp {
|
|
|
|
|
return h.EtcdServer.getPeerHashKVs(rev)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) {
|
|
|
|
|
h.EtcdServer.triggerCorruptAlarm(memberID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// InitialCheck compares initial hash values with its peers
|
|
|
|
|
// before serving any peer/client traffic. Only mismatch when hashes
|
|
|
|
|
// are different at requested revision, with same compact revision.
|
|
|
|
|
func (s *EtcdServer) CheckInitialHashKV() error {
|
|
|
|
|
if !s.Cfg.InitialCorruptCheck {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
func (cm *corruptionMonitor) InitialCheck() error {
|
|
|
|
|
|
|
|
|
|
lg := s.Logger()
|
|
|
|
|
|
|
|
|
|
lg.Info(
|
|
|
|
|
cm.lg.Info(
|
|
|
|
|
"starting initial corruption check",
|
|
|
|
|
zap.String("local-member-id", s.MemberId().String()),
|
|
|
|
|
zap.Duration("timeout", s.Cfg.ReqTimeout()),
|
|
|
|
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
|
|
|
|
zap.Duration("timeout", cm.hasher.ReqTimeout()),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
h, rev, crev, err := s.kv.HashByRev(0)
|
|
|
|
|
h, rev, crev, err := cm.hasher.HashByRev(0)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("%s failed to fetch hash (%v)", s.MemberId(), err)
|
|
|
|
|
return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err)
|
|
|
|
|
}
|
|
|
|
|
peers := s.getPeerHashKVs(rev)
|
|
|
|
|
peers := cm.hasher.PeerHashByRev(rev)
|
|
|
|
|
mismatch := 0
|
|
|
|
|
for _, p := range peers {
|
|
|
|
|
if p.resp != nil {
|
|
|
|
|
peerID := types.ID(p.resp.Header.MemberId)
|
|
|
|
|
fields := []zap.Field{
|
|
|
|
|
zap.String("local-member-id", s.MemberId().String()),
|
|
|
|
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
|
|
|
|
zap.Int64("local-member-revision", rev),
|
|
|
|
|
zap.Int64("local-member-compact-revision", crev),
|
|
|
|
|
zap.Uint32("local-member-hash", h),
|
|
|
|
@ -71,10 +112,10 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
|
|
|
|
|
|
|
|
|
if h != p.resp.Hash {
|
|
|
|
|
if crev == p.resp.CompactRevision {
|
|
|
|
|
lg.Warn("found different hash values from remote peer", fields...)
|
|
|
|
|
cm.lg.Warn("found different hash values from remote peer", fields...)
|
|
|
|
|
mismatch++
|
|
|
|
|
} else {
|
|
|
|
|
lg.Warn("found different compact revision values from remote peer", fields...)
|
|
|
|
|
cm.lg.Warn("found different compact revision values from remote peer", fields...)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -84,9 +125,9 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
|
|
|
|
if p.err != nil {
|
|
|
|
|
switch p.err {
|
|
|
|
|
case rpctypes.ErrFutureRev:
|
|
|
|
|
lg.Warn(
|
|
|
|
|
cm.lg.Warn(
|
|
|
|
|
"cannot fetch hash from slow remote peer",
|
|
|
|
|
zap.String("local-member-id", s.MemberId().String()),
|
|
|
|
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
|
|
|
|
zap.Int64("local-member-revision", rev),
|
|
|
|
|
zap.Int64("local-member-compact-revision", crev),
|
|
|
|
|
zap.Uint32("local-member-hash", h),
|
|
|
|
@ -95,9 +136,9 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
|
|
|
|
zap.Error(err),
|
|
|
|
|
)
|
|
|
|
|
case rpctypes.ErrCompacted:
|
|
|
|
|
lg.Warn(
|
|
|
|
|
cm.lg.Warn(
|
|
|
|
|
"cannot fetch hash from remote peer; local member is behind",
|
|
|
|
|
zap.String("local-member-id", s.MemberId().String()),
|
|
|
|
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
|
|
|
|
zap.Int64("local-member-revision", rev),
|
|
|
|
|
zap.Int64("local-member-compact-revision", crev),
|
|
|
|
|
zap.Uint32("local-member-hash", h),
|
|
|
|
@ -109,61 +150,31 @@ func (s *EtcdServer) CheckInitialHashKV() error {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if mismatch > 0 {
|
|
|
|
|
return fmt.Errorf("%s found data inconsistency with peers", s.MemberId())
|
|
|
|
|
return fmt.Errorf("%s found data inconsistency with peers", cm.hasher.MemberId())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lg.Info(
|
|
|
|
|
cm.lg.Info(
|
|
|
|
|
"initial corruption checking passed; no corruption",
|
|
|
|
|
zap.String("local-member-id", s.MemberId().String()),
|
|
|
|
|
zap.String("local-member-id", cm.hasher.MemberId().String()),
|
|
|
|
|
)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *EtcdServer) monitorKVHash() {
|
|
|
|
|
t := s.Cfg.CorruptCheckTime
|
|
|
|
|
if t == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lg := s.Logger()
|
|
|
|
|
lg.Info(
|
|
|
|
|
"enabled corruption checking",
|
|
|
|
|
zap.String("local-member-id", s.MemberId().String()),
|
|
|
|
|
zap.Duration("interval", t),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.stopping:
|
|
|
|
|
return
|
|
|
|
|
case <-time.After(t):
|
|
|
|
|
}
|
|
|
|
|
if !s.isLeader() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if err := s.checkHashKV(); err != nil {
|
|
|
|
|
lg.Warn("failed to check hash KV", zap.Error(err))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *EtcdServer) checkHashKV() error {
|
|
|
|
|
lg := s.Logger()
|
|
|
|
|
|
|
|
|
|
h, rev, crev, err := s.kv.HashByRev(0)
|
|
|
|
|
func (cm *corruptionMonitor) periodicCheck() error {
|
|
|
|
|
h, rev, crev, err := cm.hasher.HashByRev(0)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
peers := s.getPeerHashKVs(rev)
|
|
|
|
|
peers := cm.hasher.PeerHashByRev(rev)
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
|
|
|
|
err = s.linearizableReadNotify(ctx)
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), cm.hasher.ReqTimeout())
|
|
|
|
|
err = cm.hasher.LinearizableReadNotify(ctx)
|
|
|
|
|
cancel()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
h2, rev2, crev2, err := s.kv.HashByRev(0)
|
|
|
|
|
h2, rev2, crev2, err := cm.hasher.HashByRev(0)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -174,11 +185,11 @@ func (s *EtcdServer) checkHashKV() error {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
alarmed = true
|
|
|
|
|
s.triggerCorruptAlarm(id)
|
|
|
|
|
cm.hasher.TriggerCorruptAlarm(id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if h2 != h && rev2 == rev && crev == crev2 {
|
|
|
|
|
lg.Warn(
|
|
|
|
|
cm.lg.Warn(
|
|
|
|
|
"found hash mismatch",
|
|
|
|
|
zap.Int64("revision-1", rev),
|
|
|
|
|
zap.Int64("compact-revision-1", crev),
|
|
|
|
@ -187,7 +198,7 @@ func (s *EtcdServer) checkHashKV() error {
|
|
|
|
|
zap.Int64("compact-revision-2", crev2),
|
|
|
|
|
zap.Uint32("hash-2", h2),
|
|
|
|
|
)
|
|
|
|
|
mismatch(uint64(s.MemberId()))
|
|
|
|
|
mismatch(uint64(cm.hasher.MemberId()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
checkedCount := 0
|
|
|
|
@ -200,7 +211,7 @@ func (s *EtcdServer) checkHashKV() error {
|
|
|
|
|
|
|
|
|
|
// leader expects follower's latest revision less than or equal to leader's
|
|
|
|
|
if p.resp.Header.Revision > rev2 {
|
|
|
|
|
lg.Warn(
|
|
|
|
|
cm.lg.Warn(
|
|
|
|
|
"revision from follower must be less than or equal to leader's",
|
|
|
|
|
zap.Int64("leader-revision", rev2),
|
|
|
|
|
zap.Int64("follower-revision", p.resp.Header.Revision),
|
|
|
|
@ -211,7 +222,7 @@ func (s *EtcdServer) checkHashKV() error {
|
|
|
|
|
|
|
|
|
|
// leader expects follower's latest compact revision less than or equal to leader's
|
|
|
|
|
if p.resp.CompactRevision > crev2 {
|
|
|
|
|
lg.Warn(
|
|
|
|
|
cm.lg.Warn(
|
|
|
|
|
"compact revision from follower must be less than or equal to leader's",
|
|
|
|
|
zap.Int64("leader-compact-revision", crev2),
|
|
|
|
|
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
|
|
|
|
@ -222,7 +233,7 @@ func (s *EtcdServer) checkHashKV() error {
|
|
|
|
|
|
|
|
|
|
// follower's compact revision is leader's old one, then hashes must match
|
|
|
|
|
if p.resp.CompactRevision == crev && p.resp.Hash != h {
|
|
|
|
|
lg.Warn(
|
|
|
|
|
cm.lg.Warn(
|
|
|
|
|
"same compact revision then hashes must match",
|
|
|
|
|
zap.Int64("leader-compact-revision", crev2),
|
|
|
|
|
zap.Uint32("leader-hash", h),
|
|
|
|
@ -233,7 +244,7 @@ func (s *EtcdServer) checkHashKV() error {
|
|
|
|
|
mismatch(id)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount))
|
|
|
|
|
cm.lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|