server: Return revision range that hash was calcualted for

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz
2022-05-19 16:07:35 +02:00
parent 1ff59923d6
commit a3f609d742
9 changed files with 62 additions and 67 deletions

View File

@ -195,7 +195,7 @@ func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*
return nil, togRPCError(err) return nil, togRPCError(err)
} }
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev} resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: compactRev}
ms.hdr.fill(resp.Header) ms.hdr.fill(resp.Header)
return resp, nil return resp, nil
} }

View File

@ -51,26 +51,19 @@ type Hasher interface {
func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor {
return &corruptionMonitor{ return &corruptionMonitor{
lg: lg, lg: lg,
hasher: hasherAdapter{s}, hasher: hasherAdapter{s, s.KV()},
} }
} }
type hasherAdapter struct { type hasherAdapter struct {
*EtcdServer *EtcdServer
mvcc.KV
} }
func (h hasherAdapter) MemberId() types.ID { func (h hasherAdapter) MemberId() types.ID {
return h.EtcdServer.ID() return h.EtcdServer.ID()
} }
func (h hasherAdapter) Hash() (hash uint32, revision int64, err error) {
return h.EtcdServer.KV().Hash()
}
func (h hasherAdapter) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) {
return h.EtcdServer.KV().HashByRev(rev)
}
func (h hasherAdapter) ReqTimeout() time.Duration { func (h hasherAdapter) ReqTimeout() time.Duration {
return h.EtcdServer.Cfg.ReqTimeout() return h.EtcdServer.Cfg.ReqTimeout()
} }
@ -107,7 +100,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
zap.String("local-member-id", cm.hasher.MemberId().String()), zap.String("local-member-id", cm.hasher.MemberId().String()),
zap.Int64("local-member-revision", rev), zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev), zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h), zap.Uint32("local-member-hash", h.Hash),
zap.String("remote-peer-id", peerID.String()), zap.String("remote-peer-id", peerID.String()),
zap.Strings("remote-peer-endpoints", p.eps), zap.Strings("remote-peer-endpoints", p.eps),
zap.Int64("remote-peer-revision", p.resp.Header.Revision), zap.Int64("remote-peer-revision", p.resp.Header.Revision),
@ -115,7 +108,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
zap.Uint32("remote-peer-hash", p.resp.Hash), zap.Uint32("remote-peer-hash", p.resp.Hash),
} }
if h != p.resp.Hash { if h.Hash != p.resp.Hash {
if crev == p.resp.CompactRevision { if crev == p.resp.CompactRevision {
cm.lg.Warn("found different hash values from remote peer", fields...) cm.lg.Warn("found different hash values from remote peer", fields...)
mismatch++ mismatch++
@ -135,7 +128,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
zap.String("local-member-id", cm.hasher.MemberId().String()), zap.String("local-member-id", cm.hasher.MemberId().String()),
zap.Int64("local-member-revision", rev), zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev), zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h), zap.Uint32("local-member-hash", h.Hash),
zap.String("remote-peer-id", p.id.String()), zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps), zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(err), zap.Error(err),
@ -146,7 +139,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
zap.String("local-member-id", cm.hasher.MemberId().String()), zap.String("local-member-id", cm.hasher.MemberId().String()),
zap.Int64("local-member-revision", rev), zap.Int64("local-member-revision", rev),
zap.Int64("local-member-compact-revision", crev), zap.Int64("local-member-compact-revision", crev),
zap.Uint32("local-member-hash", h), zap.Uint32("local-member-hash", h.Hash),
zap.String("remote-peer-id", p.id.String()), zap.String("remote-peer-id", p.id.String()),
zap.Strings("remote-peer-endpoints", p.eps), zap.Strings("remote-peer-endpoints", p.eps),
zap.Error(err), zap.Error(err),
@ -193,15 +186,15 @@ func (cm *corruptionMonitor) periodicCheck() error {
cm.hasher.TriggerCorruptAlarm(id) cm.hasher.TriggerCorruptAlarm(id)
} }
if h2 != h && rev2 == rev && crev == crev2 { if h2.Hash != h.Hash && rev2 == rev && crev == crev2 {
cm.lg.Warn( cm.lg.Warn(
"found hash mismatch", "found hash mismatch",
zap.Int64("revision-1", rev), zap.Int64("revision-1", rev),
zap.Int64("compact-revision-1", crev), zap.Int64("compact-revision-1", crev),
zap.Uint32("hash-1", h), zap.Uint32("hash-1", h.Hash),
zap.Int64("revision-2", rev2), zap.Int64("revision-2", rev2),
zap.Int64("compact-revision-2", crev2), zap.Int64("compact-revision-2", crev2),
zap.Uint32("hash-2", h2), zap.Uint32("hash-2", h2.Hash),
) )
mismatch(uint64(cm.hasher.MemberId())) mismatch(uint64(cm.hasher.MemberId()))
} }
@ -237,11 +230,11 @@ func (cm *corruptionMonitor) periodicCheck() error {
} }
// follower's compact revision is leader's old one, then hashes must match // follower's compact revision is leader's old one, then hashes must match
if p.resp.CompactRevision == crev && p.resp.Hash != h { if p.resp.CompactRevision == crev && p.resp.Hash != h.Hash {
cm.lg.Warn( cm.lg.Warn(
"same compact revision then hashes must match", "same compact revision then hashes must match",
zap.Int64("leader-compact-revision", crev2), zap.Int64("leader-compact-revision", crev2),
zap.Uint32("leader-hash", h), zap.Uint32("leader-hash", h.Hash),
zap.Int64("follower-compact-revision", p.resp.CompactRevision), zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash), zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", types.ID(id).String()), zap.String("follower-peer-id", types.ID(id).String()),
@ -401,7 +394,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev} resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash.Hash, CompactRevision: compactRev}
respBytes, err := json.Marshal(resp) respBytes, err := json.Marshal(resp)
if err != nil { if err != nil {
h.lg.Warn("failed to marshal hashKV response", zap.Error(err)) h.lg.Warn("failed to marshal hashKV response", zap.Error(err))

View File

@ -24,6 +24,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -70,18 +71,18 @@ func TestInitialCheck(t *testing.T) {
}, },
{ {
name: "Peer returned same hash", name: "Peer returned same hash",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}}, hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}},
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
}, },
{ {
name: "Peer returned different hash with same compaction rev", name: "Peer returned different hash with same compaction rev",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}}, hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}},
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
expectError: true, expectError: true,
}, },
{ {
name: "Peer returned different hash and compaction rev", name: "Peer returned different hash and compaction rev",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}},
expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
}, },
} }
@ -136,17 +137,17 @@ func TestPeriodicCheck(t *testing.T) {
}, },
{ {
name: "Different local hash and revision", name: "Different local hash and revision",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, revision: 1}, {hash: 2, revision: 2}}}, hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2}}},
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
}, },
{ {
name: "Different local hash and compaction revision", name: "Different local hash and compaction revision",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, compactRev: 1}, {hash: 2, compactRev: 2}}}, hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, compactRev: 2}}},
expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
}, },
{ {
name: "Different local hash and same revisions", name: "Different local hash and same revisions",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: 1, revision: 1, compactRev: 1}, {hash: 2, revision: 1, compactRev: 1}}}, hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 1, compactRev: 1}}},
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"},
expectCorrupt: true, expectCorrupt: true,
}, },
@ -176,7 +177,7 @@ func TestPeriodicCheck(t *testing.T) {
{ {
name: "Peer with same hash and compact revision", name: "Peer with same hash and compact revision",
hasher: fakeHasher{ hasher: fakeHasher{
hashByRevResponses: []hashByRev{{hash: 1, revision: 1, compactRev: 1}, {hash: 2, revision: 2, compactRev: 2}}, hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}},
}, },
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
@ -184,7 +185,7 @@ func TestPeriodicCheck(t *testing.T) {
{ {
name: "Peer with different hash and same compact revision as first local", name: "Peer with different hash and same compact revision as first local",
hasher: fakeHasher{ hasher: fakeHasher{
hashByRevResponses: []hashByRev{{hash: 1, revision: 1, compactRev: 1}, {hash: 2, revision: 2, compactRev: 2}}, hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1, compactRev: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2, compactRev: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 666}, CompactRevision: 1, Hash: 2}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1, MemberId: 666}, CompactRevision: 1, Hash: 2}}},
}, },
expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"}, expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(666)"},
@ -231,7 +232,7 @@ type fakeHasher struct {
} }
type hashByRev struct { type hashByRev struct {
hash uint32 hash mvcc.KeyValueHash
revision int64 revision int64
compactRev int64 compactRev int64
err error err error
@ -241,10 +242,10 @@ func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) {
panic("not implemented") panic("not implemented")
} }
func (f *fakeHasher) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) { func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, compactRev int64, err error) {
f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev)) f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev))
if len(f.hashByRevResponses) == 0 { if len(f.hashByRevResponses) == 0 {
return 0, 0, 0, nil return mvcc.KeyValueHash{}, 0, 0, nil
} }
hashByRev := f.hashByRevResponses[f.hashByRevIndex] hashByRev := f.hashByRevResponses[f.hashByRevIndex]
f.hashByRevIndex++ f.hashByRevIndex++

View File

@ -22,8 +22,8 @@ import (
"go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/mvcc/buckets"
) )
func unsafeHashByRev(tx backend.ReadTx, lower, upper int64, keep map[revision]struct{}) (uint32, error) { func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
h := newKVHasher(lower, upper, keep) h := newKVHasher(compactRevision, revision, keep)
err := tx.UnsafeForEach(buckets.Key, func(k, v []byte) error { err := tx.UnsafeForEach(buckets.Key, func(k, v []byte) error {
h.WriteKeyValue(k, v) h.WriteKeyValue(k, v)
return nil return nil
@ -32,29 +32,30 @@ func unsafeHashByRev(tx backend.ReadTx, lower, upper int64, keep map[revision]st
} }
type kvHasher struct { type kvHasher struct {
hash hash.Hash32 hash hash.Hash32
lower, upper int64 compactRevision int64
keep map[revision]struct{} revision int64
keep map[revision]struct{}
} }
func newKVHasher(lower, upper int64, keep map[revision]struct{}) kvHasher { func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(buckets.Key.Name()) h.Write(buckets.Key.Name())
return kvHasher{ return kvHasher{
hash: h, hash: h,
lower: lower, compactRevision: compactRev,
upper: upper, revision: rev,
keep: keep, keep: keep,
} }
} }
func (h *kvHasher) WriteKeyValue(k, v []byte) { func (h *kvHasher) WriteKeyValue(k, v []byte) {
kr := bytesToRev(k) kr := bytesToRev(k)
upper := revision{main: h.upper + 1} upper := revision{main: h.revision + 1}
if !upper.GreaterThan(kr) { if !upper.GreaterThan(kr) {
return return
} }
lower := revision{main: h.lower + 1} lower := revision{main: h.compactRevision + 1}
// skip revisions that are scheduled for deletion // skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one. // due to compacting; don't skip if there isn't one.
if lower.GreaterThan(kr) && len(h.keep) > 0 { if lower.GreaterThan(kr) && len(h.keep) > 0 {
@ -66,6 +67,12 @@ func (h *kvHasher) WriteKeyValue(k, v []byte) {
h.hash.Write(v) h.hash.Write(v)
} }
func (h *kvHasher) Hash() uint32 { func (h *kvHasher) Hash() KeyValueHash {
return h.hash.Sum32() return KeyValueHash{Hash: h.hash.Sum32(), CompactRevision: h.compactRevision, Revision: h.revision}
}
type KeyValueHash struct {
Hash uint32
CompactRevision int64
Revision int64
} }

View File

@ -41,7 +41,7 @@ func TestHashByRevValue(t *testing.T) {
assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions)
assert.Less(t, int64(compactionCycle*10), totalRevisions) assert.Less(t, int64(compactionCycle*10), totalRevisions)
var rev int64 var rev int64
var got []kvHash var got []KeyValueHash
for ; rev < totalRevisions; rev += compactionCycle { for ; rev < totalRevisions; rev += compactionCycle {
putKVs(s, rev, compactionCycle) putKVs(s, rev, compactionCycle)
hash := testHashByRev(t, s, rev+compactionCycle/2) hash := testHashByRev(t, s, rev+compactionCycle/2)
@ -50,7 +50,7 @@ func TestHashByRevValue(t *testing.T) {
putKVs(s, rev, totalRevisions) putKVs(s, rev, totalRevisions)
hash := testHashByRev(t, s, rev+totalRevisions/2) hash := testHashByRev(t, s, rev+totalRevisions/2)
got = append(got, hash) got = append(got, hash)
assert.Equal(t, []kvHash{ assert.Equal(t, []KeyValueHash{
{4082599214, -1, 35}, {4082599214, -1, 35},
{2279933401, 35, 106}, {2279933401, 35, 106},
{3284231217, 106, 177}, {3284231217, 106, 177},
@ -81,7 +81,7 @@ func TestHashByRevValueZero(t *testing.T) {
assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions)
assert.Less(t, int64(compactionCycle*10), totalRevisions) assert.Less(t, int64(compactionCycle*10), totalRevisions)
var rev int64 var rev int64
var got []kvHash var got []KeyValueHash
for ; rev < totalRevisions; rev += compactionCycle { for ; rev < totalRevisions; rev += compactionCycle {
putKVs(s, rev, compactionCycle) putKVs(s, rev, compactionCycle)
hash := testHashByRev(t, s, 0) hash := testHashByRev(t, s, 0)
@ -90,7 +90,7 @@ func TestHashByRevValueZero(t *testing.T) {
putKVs(s, rev, totalRevisions) putKVs(s, rev, totalRevisions)
hash := testHashByRev(t, s, 0) hash := testHashByRev(t, s, 0)
got = append(got, hash) got = append(got, hash)
assert.Equal(t, []kvHash{ assert.Equal(t, []KeyValueHash{
{1913897190, -1, 73}, {1913897190, -1, 73},
{224860069, 73, 145}, {224860069, 73, 145},
{1565167519, 145, 217}, {1565167519, 145, 217},
@ -119,8 +119,8 @@ func putKVs(s *store, rev, count int64) {
} }
} }
func testHashByRev(t *testing.T, s *store, rev int64) kvHash { func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash {
hash, currentRev, compactRev, err := s.HashByRev(rev) hash, currentRev, _, err := s.HashByRev(rev)
assert.NoError(t, err, "error on rev %v", rev) assert.NoError(t, err, "error on rev %v", rev)
if rev == 0 { if rev == 0 {
@ -128,13 +128,7 @@ func testHashByRev(t *testing.T, s *store, rev int64) kvHash {
} }
_, err = s.Compact(traceutil.TODO(), rev) _, err = s.Compact(traceutil.TODO(), rev)
assert.NoError(t, err, "error on compact %v", rev) assert.NoError(t, err, "error on compact %v", rev)
return kvHash{hash: hash, compactRevision: compactRev, revision: rev} return hash
}
type kvHash struct {
hash uint32
compactRevision int64
revision int64
} }
// TODO: Change this to fuzz test // TODO: Change this to fuzz test

View File

@ -136,7 +136,7 @@ type Hasher interface {
Hash() (hash uint32, revision int64, err error) Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision. // HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) HashByRev(rev int64) (hash KeyValueHash, revision int64, compactRev int64, err error)
} }
// WatchableKV is a KV that can be watched. // WatchableKV is a KV that can be watched.

View File

@ -164,7 +164,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
return h, s.currentRev, err return h, s.currentRev, err
} }
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { func (s *store) HashByRev(rev int64) (hash KeyValueHash, currentRev int64, compactRev int64, err error) {
start := time.Now() start := time.Now()
s.mu.RLock() s.mu.RLock()
@ -174,10 +174,10 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
if rev > 0 && rev <= compactRev { if rev > 0 && rev <= compactRev {
s.mu.RUnlock() s.mu.RUnlock()
return 0, 0, compactRev, ErrCompacted return KeyValueHash{}, 0, compactRev, ErrCompacted
} else if rev > 0 && rev > currentRev { } else if rev > 0 && rev > currentRev {
s.mu.RUnlock() s.mu.RUnlock()
return 0, currentRev, 0, ErrFutureRev return KeyValueHash{}, currentRev, 0, ErrFutureRev
} }
if rev == 0 { if rev == 0 {

View File

@ -23,7 +23,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32, error) { func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
totalStart := time.Now() totalStart := time.Now()
keep := s.kvindex.Compact(compactMainRev) keep := s.kvindex.Compact(compactMainRev)
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
@ -67,7 +67,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32
"finished scheduled compaction", "finished scheduled compaction",
zap.Int64("compact-revision", compactMainRev), zap.Int64("compact-revision", compactMainRev),
zap.Duration("took", time.Since(totalStart)), zap.Duration("took", time.Since(totalStart)),
zap.Uint32("hash", hash), zap.Uint32("hash", hash.Hash),
) )
return hash, nil return hash, nil
} }
@ -82,7 +82,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32
select { select {
case <-time.After(10 * time.Millisecond): case <-time.After(10 * time.Millisecond):
case <-s.stopc: case <-s.stopc:
return 0, fmt.Errorf("interrupted due to stop signal") return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
} }
} }
} }

View File

@ -559,7 +559,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
select { select {
case <-donec: case <-donec:
return return
case hashCompactc <- hashKVResult{hash, compactRev}: case hashCompactc <- hashKVResult{hash.Hash, compactRev}:
} }
} }
}() }()
@ -618,7 +618,7 @@ func TestHashKVZeroRevision(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
var hash2 uint32 var hash2 KeyValueHash
hash2, _, _, err = s.HashByRev(0) hash2, _, _, err = s.HashByRev(0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)