lease: guard 'Lease.itemSet' from concurrent writes
Fix https://github.com/coreos/etcd/issues/7448. Affected if etcd builds with Go 1.8+. Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
@ -252,10 +252,7 @@ func (le *lessor) Revoke(id LeaseID) error {
|
|||||||
|
|
||||||
// sort keys so deletes are in same order among all members,
|
// sort keys so deletes are in same order among all members,
|
||||||
// otherwise the backened hashes will be different
|
// otherwise the backened hashes will be different
|
||||||
keys := make([]string, 0, len(l.itemSet))
|
keys := l.Keys()
|
||||||
for item := range l.itemSet {
|
|
||||||
keys = append(keys, item.Key)
|
|
||||||
}
|
|
||||||
sort.StringSlice(keys).Sort()
|
sort.StringSlice(keys).Sort()
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
|
_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
|
||||||
@ -367,10 +364,12 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
|
|||||||
return ErrLeaseNotFound
|
return ErrLeaseNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.mu.Lock()
|
||||||
for _, it := range items {
|
for _, it := range items {
|
||||||
l.itemSet[it] = struct{}{}
|
l.itemSet[it] = struct{}{}
|
||||||
le.itemMap[it] = id
|
le.itemMap[it] = id
|
||||||
}
|
}
|
||||||
|
l.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -392,10 +391,12 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
|
|||||||
return ErrLeaseNotFound
|
return ErrLeaseNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l.mu.Lock()
|
||||||
for _, it := range items {
|
for _, it := range items {
|
||||||
delete(l.itemSet, it)
|
delete(l.itemSet, it)
|
||||||
delete(le.itemMap, it)
|
delete(le.itemMap, it)
|
||||||
}
|
}
|
||||||
|
l.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -506,6 +507,8 @@ type Lease struct {
|
|||||||
// expiry is time when lease should expire; must be 64-bit aligned.
|
// expiry is time when lease should expire; must be 64-bit aligned.
|
||||||
expiry monotime.Time
|
expiry monotime.Time
|
||||||
|
|
||||||
|
// mu protects concurrent accesses to itemSet
|
||||||
|
mu sync.RWMutex
|
||||||
itemSet map[LeaseItem]struct{}
|
itemSet map[LeaseItem]struct{}
|
||||||
revokec chan struct{}
|
revokec chan struct{}
|
||||||
}
|
}
|
||||||
@ -544,10 +547,12 @@ func (l *Lease) forever() { atomic.StoreUint64((*uint64)(&l.expiry), uint64(fore
|
|||||||
|
|
||||||
// Keys returns all the keys attached to the lease.
|
// Keys returns all the keys attached to the lease.
|
||||||
func (l *Lease) Keys() []string {
|
func (l *Lease) Keys() []string {
|
||||||
|
l.mu.RLock()
|
||||||
keys := make([]string, 0, len(l.itemSet))
|
keys := make([]string, 0, len(l.itemSet))
|
||||||
for k := range l.itemSet {
|
for k := range l.itemSet {
|
||||||
keys = append(keys, k.Key)
|
keys = append(keys, k.Key)
|
||||||
}
|
}
|
||||||
|
l.mu.RUnlock()
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,11 +15,13 @@
|
|||||||
package lease
|
package lease
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -77,6 +79,53 @@ func TestLessorGrant(t *testing.T) {
|
|||||||
be.BatchTx().Unlock()
|
be.BatchTx().Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
|
||||||
|
// from concurrent map writes on 'itemSet'.
|
||||||
|
func TestLeaseConcurrentKeys(t *testing.T) {
|
||||||
|
dir, be := NewTestBackend(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer be.Close()
|
||||||
|
|
||||||
|
fd := &fakeDeleter{}
|
||||||
|
|
||||||
|
le := newLessor(be, minLeaseTTL)
|
||||||
|
le.SetRangeDeleter(fd)
|
||||||
|
|
||||||
|
// grant a lease with long term (100 seconds) to
|
||||||
|
// avoid early termination during the test.
|
||||||
|
l, err := le.Grant(1, 100)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
itemn := 10
|
||||||
|
items := make([]LeaseItem, itemn)
|
||||||
|
for i := 0; i < itemn; i++ {
|
||||||
|
items[i] = LeaseItem{Key: fmt.Sprintf("foo%d", i)}
|
||||||
|
}
|
||||||
|
if err = le.Attach(l.ID, items); err != nil {
|
||||||
|
t.Fatalf("failed to attach items to the lease: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
donec := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
le.Detach(l.ID, items)
|
||||||
|
close(donec)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(itemn)
|
||||||
|
for i := 0; i < itemn; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
l.Keys()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
<-donec
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// TestLessorRevoke ensures Lessor can revoke a lease.
|
// TestLessorRevoke ensures Lessor can revoke a lease.
|
||||||
// The items in the revoked lease should be removed from
|
// The items in the revoked lease should be removed from
|
||||||
// the backend.
|
// the backend.
|
||||||
|
Reference in New Issue
Block a user