etcdserver: fix data race in Cluster struct
This commit is contained in:
@ -27,6 +27,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
etcdErr "github.com/coreos/etcd/error"
|
etcdErr "github.com/coreos/etcd/error"
|
||||||
"github.com/coreos/etcd/pkg/flags"
|
"github.com/coreos/etcd/pkg/flags"
|
||||||
@ -58,6 +59,7 @@ type Cluster struct {
|
|||||||
// removed id cannot be reused.
|
// removed id cannot be reused.
|
||||||
removed map[types.ID]bool
|
removed map[types.ID]bool
|
||||||
store store.Store
|
store store.Store
|
||||||
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClusterFromString returns Cluster through given cluster token and parsing
|
// NewClusterFromString returns Cluster through given cluster token and parsing
|
||||||
@ -112,9 +114,11 @@ func newCluster(token string) *Cluster {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Cluster) ID() types.ID { return c.id }
|
func (c *Cluster) ID() types.ID { return c.id }
|
||||||
|
|
||||||
func (c Cluster) Members() []*Member {
|
func (c *Cluster) Members() []*Member {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
var sms SortableMemberSlice
|
var sms SortableMemberSlice
|
||||||
for _, m := range c.members {
|
for _, m := range c.members {
|
||||||
sms = append(sms, m)
|
sms = append(sms, m)
|
||||||
@ -130,12 +134,16 @@ func (s SortableMemberSlice) Less(i, j int) bool { return s[i].ID < s[j].ID }
|
|||||||
func (s SortableMemberSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
func (s SortableMemberSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||||
|
|
||||||
func (c *Cluster) Member(id types.ID) *Member {
|
func (c *Cluster) Member(id types.ID) *Member {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
return c.members[id]
|
return c.members[id]
|
||||||
}
|
}
|
||||||
|
|
||||||
// MemberByName returns a Member with the given name if exists.
|
// MemberByName returns a Member with the given name if exists.
|
||||||
// If more than one member has the given name, it will panic.
|
// If more than one member has the given name, it will panic.
|
||||||
func (c *Cluster) MemberByName(name string) *Member {
|
func (c *Cluster) MemberByName(name string) *Member {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
var memb *Member
|
var memb *Member
|
||||||
for _, m := range c.members {
|
for _, m := range c.members {
|
||||||
if m.Name == name {
|
if m.Name == name {
|
||||||
@ -148,7 +156,9 @@ func (c *Cluster) MemberByName(name string) *Member {
|
|||||||
return memb
|
return memb
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Cluster) MemberIDs() []types.ID {
|
func (c *Cluster) MemberIDs() []types.ID {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
var ids []types.ID
|
var ids []types.ID
|
||||||
for _, m := range c.members {
|
for _, m := range c.members {
|
||||||
ids = append(ids, m.ID)
|
ids = append(ids, m.ID)
|
||||||
@ -158,13 +168,17 @@ func (c Cluster) MemberIDs() []types.ID {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) IsIDRemoved(id types.ID) bool {
|
func (c *Cluster) IsIDRemoved(id types.ID) bool {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
return c.removed[id]
|
return c.removed[id]
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerURLs returns a list of all peer addresses. Each address is prefixed
|
// PeerURLs returns a list of all peer addresses. Each address is prefixed
|
||||||
// with the scheme (currently "http://"). The returned list is sorted in
|
// with the scheme (currently "http://"). The returned list is sorted in
|
||||||
// ascending lexicographical order.
|
// ascending lexicographical order.
|
||||||
func (c Cluster) PeerURLs() []string {
|
func (c *Cluster) PeerURLs() []string {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
endpoints := make([]string, 0)
|
endpoints := make([]string, 0)
|
||||||
for _, p := range c.members {
|
for _, p := range c.members {
|
||||||
for _, addr := range p.PeerURLs {
|
for _, addr := range p.PeerURLs {
|
||||||
@ -178,7 +192,9 @@ func (c Cluster) PeerURLs() []string {
|
|||||||
// ClientURLs returns a list of all client addresses. Each address is prefixed
|
// ClientURLs returns a list of all client addresses. Each address is prefixed
|
||||||
// with the scheme (currently "http://"). The returned list is sorted in
|
// with the scheme (currently "http://"). The returned list is sorted in
|
||||||
// ascending lexicographical order.
|
// ascending lexicographical order.
|
||||||
func (c Cluster) ClientURLs() []string {
|
func (c *Cluster) ClientURLs() []string {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
urls := make([]string, 0)
|
urls := make([]string, 0)
|
||||||
for _, p := range c.members {
|
for _, p := range c.members {
|
||||||
for _, url := range p.ClientURLs {
|
for _, url := range p.ClientURLs {
|
||||||
@ -189,7 +205,9 @@ func (c Cluster) ClientURLs() []string {
|
|||||||
return urls
|
return urls
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Cluster) String() string {
|
func (c *Cluster) String() string {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
sl := []string{}
|
sl := []string{}
|
||||||
for _, m := range c.members {
|
for _, m := range c.members {
|
||||||
for _, u := range m.PeerURLs {
|
for _, u := range m.PeerURLs {
|
||||||
@ -279,6 +297,8 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
|||||||
// AddMember puts a new Member into the store.
|
// AddMember puts a new Member into the store.
|
||||||
// A Member with a matching id must not exist.
|
// A Member with a matching id must not exist.
|
||||||
func (c *Cluster) AddMember(m *Member) {
|
func (c *Cluster) AddMember(m *Member) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
b, err := json.Marshal(m.RaftAttributes)
|
b, err := json.Marshal(m.RaftAttributes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("marshal raftAttributes should never fail: %v", err)
|
log.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||||
@ -301,6 +321,8 @@ func (c *Cluster) AddMember(m *Member) {
|
|||||||
// RemoveMember removes a member from the store.
|
// RemoveMember removes a member from the store.
|
||||||
// The given id MUST exist, or the function panics.
|
// The given id MUST exist, or the function panics.
|
||||||
func (c *Cluster) RemoveMember(id types.ID) {
|
func (c *Cluster) RemoveMember(id types.ID) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
|
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
|
||||||
log.Panicf("delete member should never fail: %v", err)
|
log.Panicf("delete member should never fail: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1382,31 +1382,6 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
|
|||||||
}
|
}
|
||||||
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
|
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
|
||||||
|
|
||||||
type clusterStoreRecorder struct {
|
|
||||||
recorder
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *clusterStoreRecorder) Add(m Member) {
|
|
||||||
cs.record(action{name: "Add", params: []interface{}{m}})
|
|
||||||
}
|
|
||||||
func (cs *clusterStoreRecorder) Get() Cluster {
|
|
||||||
cs.record(action{name: "Get"})
|
|
||||||
return Cluster{}
|
|
||||||
}
|
|
||||||
func (cs *clusterStoreRecorder) Remove(id uint64) {
|
|
||||||
cs.record(action{name: "Remove", params: []interface{}{id}})
|
|
||||||
}
|
|
||||||
func (cs *clusterStoreRecorder) IsRemoved(id uint64) bool { return false }
|
|
||||||
|
|
||||||
type removedClusterStore struct {
|
|
||||||
removed map[uint64]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cs *removedClusterStore) Add(m Member) {}
|
|
||||||
func (cs *removedClusterStore) Get() Cluster { return Cluster{} }
|
|
||||||
func (cs *removedClusterStore) Remove(id uint64) {}
|
|
||||||
func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] }
|
|
||||||
|
|
||||||
type nopSender struct{}
|
type nopSender struct{}
|
||||||
|
|
||||||
func (s *nopSender) Send(m []raftpb.Message) {}
|
func (s *nopSender) Send(m []raftpb.Message) {}
|
||||||
|
Reference in New Issue
Block a user