Membership: Add additional methods to trim/manage membership data in backend.

This commit is contained in:
Piotr Tabor
2021-04-02 18:01:26 +02:00
parent aa6597384b
commit 7ae3d25f91
4 changed files with 42 additions and 14 deletions

View File

@ -49,27 +49,22 @@ type Member struct {
// NewMember creates a Member without an ID and generates one based on the // NewMember creates a Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for bootstrapping/adding new member. // cluster name, peer URLs, and time. This is used for bootstrapping/adding new member.
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
return newMember(name, peerURLs, clusterName, now, false) memberId := computeMemberId(peerURLs, clusterName, now)
return newMember(name, peerURLs, memberId, false)
} }
// NewMemberAsLearner creates a learner Member without an ID and generates one based on the // NewMemberAsLearner creates a learner Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for adding new learner member. // cluster name, peer URLs, and time. This is used for adding new learner member.
func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
return newMember(name, peerURLs, clusterName, now, true) memberId := computeMemberId(peerURLs, clusterName, now)
} return newMember(name, peerURLs, memberId, true)
func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time, isLearner bool) *Member {
m := &Member{
RaftAttributes: RaftAttributes{
PeerURLs: peerURLs.StringSlice(),
IsLearner: isLearner,
},
Attributes: Attributes{Name: name},
} }
func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
var b []byte var b []byte
sort.Strings(m.PeerURLs) peerURLstrs := peerURLs.StringSlice()
for _, p := range m.PeerURLs { sort.Strings(peerURLstrs)
for _, p := range peerURLstrs {
b = append(b, []byte(p)...) b = append(b, []byte(p)...)
} }
@ -79,7 +74,18 @@ func newMember(name string, peerURLs types.URLs, clusterName string, now *time.T
} }
hash := sha1.Sum(b) hash := sha1.Sum(b)
m.ID = types.ID(binary.BigEndian.Uint64(hash[:8])) return types.ID(binary.BigEndian.Uint64(hash[:8]))
}
func newMember(name string, peerURLs types.URLs, memberId types.ID, isLearner bool) *Member {
m := &Member{
RaftAttributes: RaftAttributes{
PeerURLs: peerURLs.StringSlice(),
IsLearner: isLearner,
},
Attributes: Attributes{Name: name},
ID: memberId,
}
return m return m
} }

View File

@ -57,6 +57,14 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx.UnsafePut(membersBucketName, mkey, mvalue) tx.UnsafePut(membersBucketName, mkey, mvalue)
} }
func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDeleteBucket(clusterBucketName)
return nil
}
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
mkey := backendMemberKey(id) mkey := backendMemberKey(id)

View File

@ -28,6 +28,7 @@ import (
type BatchTx interface { type BatchTx interface {
ReadTx ReadTx
UnsafeCreateBucket(name []byte) UnsafeCreateBucket(name []byte)
UnsafeDeleteBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte) UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte) UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte) UnsafeDelete(bucketName []byte, key []byte)
@ -80,6 +81,18 @@ func (t *batchTx) UnsafeCreateBucket(name []byte) {
t.pending++ t.pending++
} }
func (t *batchTx) UnsafeDeleteBucket(name []byte) {
err := t.tx.DeleteBucket(name)
if err != nil && err != bolt.ErrBucketNotFound {
t.backend.lg.Fatal(
"failed to delete a bucket",
zap.String("bucket-name", string(name)),
zap.Error(err),
)
}
t.pending++
}
// UnsafePut must be called holding the lock on the tx. // UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, false) t.unsafePut(bucketName, key, value, false)

View File

@ -875,6 +875,7 @@ func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {} func (b *fakeBatchTx) RLock() {}
func (b *fakeBatchTx) RUnlock() {} func (b *fakeBatchTx) RUnlock() {}
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {} func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
func (b *fakeBatchTx) UnsafeDeleteBucket(name []byte) {}
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}}) b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
} }