Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
494c012659 | |||
4abc381ebe | |||
73c8fdac53 | |||
ee2717493a | |||
2435eb9ecd | |||
8fb533dabe | |||
2f0f5ac504 | |||
9ab811d478 | |||
e0a99fb4ba |
@ -22,7 +22,10 @@ import (
|
|||||||
"github.com/coreos/etcd/mvcc/backend"
|
"github.com/coreos/etcd/mvcc/backend"
|
||||||
)
|
)
|
||||||
|
|
||||||
// isSubset returns true if a is a subset of b
|
// isSubset returns true if a is a subset of b.
|
||||||
|
// If a is a prefix of b, then a is a subset of b.
|
||||||
|
// Given intervals [a1,a2) and [b1,b2), is
|
||||||
|
// the a interval a subset of b?
|
||||||
func isSubset(a, b *rangePerm) bool {
|
func isSubset(a, b *rangePerm) bool {
|
||||||
switch {
|
switch {
|
||||||
case len(a.end) == 0 && len(b.end) == 0:
|
case len(a.end) == 0 && len(b.end) == 0:
|
||||||
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
|
|||||||
// b is a key, a is a range
|
// b is a key, a is a range
|
||||||
return false
|
return false
|
||||||
case len(a.end) == 0:
|
case len(a.end) == 0:
|
||||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
|
// a is a key, b is a range. need b1 <= a1 and a1 < b2
|
||||||
|
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
|
||||||
default:
|
default:
|
||||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
|
// both are ranges. need b1 <= a1 and a2 <= b2
|
||||||
|
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
|
|||||||
i := 0
|
i := 0
|
||||||
for i < len(perms) {
|
for i < len(perms) {
|
||||||
begin, next := i, i
|
begin, next := i, i
|
||||||
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
|
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
|
||||||
next++
|
next++
|
||||||
}
|
}
|
||||||
|
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
|
||||||
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
if next != begin && len(perms[next].end) > 0 {
|
||||||
|
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
||||||
|
} else {
|
||||||
|
merged = append(merged, perms[begin])
|
||||||
|
if next != begin {
|
||||||
|
merged = append(merged, perms[next])
|
||||||
|
}
|
||||||
|
}
|
||||||
i = next + 1
|
i = next + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
|
|||||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||||
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
||||||
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
|
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||||
},
|
},
|
||||||
// duplicate ranges
|
// duplicate ranges
|
||||||
{
|
{
|
||||||
|
@ -45,6 +45,8 @@ type simpleBalancer struct {
|
|||||||
// pinAddr is the currently pinned address; set to the empty string on
|
// pinAddr is the currently pinned address; set to the empty string on
|
||||||
// intialization and shutdown.
|
// intialization and shutdown.
|
||||||
pinAddr string
|
pinAddr string
|
||||||
|
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSimpleBalancer(eps []string) *simpleBalancer {
|
func newSimpleBalancer(eps []string) *simpleBalancer {
|
||||||
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
|
|||||||
|
|
||||||
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
|
// gRPC might call Up after it called Close. We add this check
|
||||||
|
// to "fix" it up at application layer. Or our simplerBalancer
|
||||||
|
// might panic since b.upc is closed.
|
||||||
|
if b.closed {
|
||||||
|
return func(err error) {}
|
||||||
|
}
|
||||||
|
|
||||||
if len(b.upEps) == 0 {
|
if len(b.upEps) == 0 {
|
||||||
// notify waiting Get()s and pin first connected address
|
// notify waiting Get()s and pin first connected address
|
||||||
close(b.upc)
|
close(b.upc)
|
||||||
b.pinAddr = addr.Addr
|
b.pinAddr = addr.Addr
|
||||||
}
|
}
|
||||||
b.upEps[addr.Addr] = struct{}{}
|
b.upEps[addr.Addr] = struct{}{}
|
||||||
b.mu.Unlock()
|
|
||||||
// notify client that a connection is up
|
// notify client that a connection is up
|
||||||
b.readyOnce.Do(func() { close(b.readyc) })
|
b.readyOnce.Do(func() { close(b.readyc) })
|
||||||
|
|
||||||
return func(err error) {
|
return func(err error) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
delete(b.upEps, addr.Addr)
|
delete(b.upEps, addr.Addr)
|
||||||
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
|
|||||||
|
|
||||||
func (b *simpleBalancer) Close() error {
|
func (b *simpleBalancer) Close() error {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
// In case gRPC calls close twice. TODO: remove the checking
|
||||||
|
// when we are sure that gRPC wont call close twice.
|
||||||
|
if b.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b.closed = true
|
||||||
close(b.notifyCh)
|
close(b.notifyCh)
|
||||||
// terminate all waiting Get()s
|
// terminate all waiting Get()s
|
||||||
b.pinAddr = ""
|
b.pinAddr = ""
|
||||||
if len(b.upEps) == 0 {
|
if len(b.upEps) == 0 {
|
||||||
close(b.upc)
|
close(b.upc)
|
||||||
}
|
}
|
||||||
b.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -669,6 +669,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|||||||
w.mu.RUnlock()
|
w.mu.RUnlock()
|
||||||
|
|
||||||
for _, ws := range streams {
|
for _, ws := range streams {
|
||||||
|
// drain recvc so no old WatchResponses (e.g., Created messages)
|
||||||
|
// are processed while resuming
|
||||||
|
ws.drain()
|
||||||
|
|
||||||
// pause serveStream
|
// pause serveStream
|
||||||
ws.resumec <- -1
|
ws.resumec <- -1
|
||||||
|
|
||||||
@ -701,6 +705,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// drain removes all buffered WatchResponses from the stream's receive channel.
|
||||||
|
func (ws *watcherStream) drain() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ws.recvc:
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
||||||
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
||||||
req := &pb.WatchCreateRequest{
|
req := &pb.WatchCreateRequest{
|
||||||
|
@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
}
|
}
|
||||||
if len(line) == 1 {
|
|
||||||
|
// remove space from the line
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if len(line) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove trialling \n
|
|
||||||
line = line[:len(line)-1]
|
|
||||||
cmp, err := parseCompare(line)
|
cmp, err := parseCompare(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
}
|
}
|
||||||
if len(line) == 1 {
|
|
||||||
|
// remove space from the line
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if len(line) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove trialling \n
|
|
||||||
line = line[:len(line)-1]
|
|
||||||
op, err := parseRequestUnion(line)
|
op, err := parseRequestUnion(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
|
@ -466,19 +466,24 @@ func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() ==
|
|||||||
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||||
|
|
||||||
// checkBindURLs returns an error if any URL uses a domain name.
|
// checkBindURLs returns an error if any URL uses a domain name.
|
||||||
|
// TODO: return error in 3.2.0
|
||||||
func checkBindURLs(urls []url.URL) error {
|
func checkBindURLs(urls []url.URL) error {
|
||||||
for _, url := range urls {
|
for _, url := range urls {
|
||||||
if url.Scheme == "unix" || url.Scheme == "unixs" {
|
if url.Scheme == "unix" || url.Scheme == "unixs" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
host := strings.Split(url.Host, ":")[0]
|
host, _, err := net.SplitHostPort(url.Host)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if host == "localhost" {
|
if host == "localhost" {
|
||||||
// special case for local address
|
// special case for local address
|
||||||
// TODO: support /etc/hosts ?
|
// TODO: support /etc/hosts ?
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if net.ParseIP(host) == nil {
|
if net.ParseIP(host) == nil {
|
||||||
return fmt.Errorf("expected IP in URL for binding (%s)", url.String())
|
err := fmt.Errorf("expected IP in URL for binding (%s)", url.String())
|
||||||
|
plog.Warning(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -188,11 +188,8 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
|
|||||||
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
s, serr := concurrency.NewSession(cli)
|
|
||||||
if serr != nil {
|
e := concurrency.NewElection(cli, "test")
|
||||||
t.Fatal(serr)
|
|
||||||
}
|
|
||||||
e := concurrency.NewElection(s, "test")
|
|
||||||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||||
err := e.Campaign(ctx, "abc")
|
err := e.Campaign(ctx, "abc")
|
||||||
cancel()
|
cancel()
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "2.3.0"
|
MinClusterVersion = "2.3.0"
|
||||||
Version = "3.0.8"
|
Version = "3.0.9"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
GitSHA = "Not provided (use ./build instead of go build)"
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
|
Reference in New Issue
Block a user