Compare commits
41 Commits
Author | SHA1 | Date | |
---|---|---|---|
494c012659 | |||
4abc381ebe | |||
73c8fdac53 | |||
ee2717493a | |||
2435eb9ecd | |||
8fb533dabe | |||
2f0f5ac504 | |||
9ab811d478 | |||
e0a99fb4ba | |||
d40982fc91 | |||
fe3a1cc31b | |||
70713706a1 | |||
0054e7e89b | |||
97f718b504 | |||
202da9270e | |||
6e83ec0ed7 | |||
5c44cdfdaa | |||
09a239f040 | |||
3faff8b2e2 | |||
2345fda18e | |||
5695120efc | |||
183293e061 | |||
4b48876f0e | |||
5089bf58fb | |||
480a347179 | |||
59e560c7a7 | |||
0bd9bea2e9 | |||
bd7581ac59 | |||
db378c3d26 | |||
23740162dc | |||
96422a955f | |||
6fd996fdac | |||
9efa00d103 | |||
72d30f4c34 | |||
2e92779777 | |||
404415b1e3 | |||
07e421d245 | |||
a7d6e29275 | |||
1a8b295dab | |||
ffc45cc066 | |||
0db1ba8093 |
@ -22,7 +22,10 @@ import (
|
|||||||
"github.com/coreos/etcd/mvcc/backend"
|
"github.com/coreos/etcd/mvcc/backend"
|
||||||
)
|
)
|
||||||
|
|
||||||
// isSubset returns true if a is a subset of b
|
// isSubset returns true if a is a subset of b.
|
||||||
|
// If a is a prefix of b, then a is a subset of b.
|
||||||
|
// Given intervals [a1,a2) and [b1,b2), is
|
||||||
|
// the a interval a subset of b?
|
||||||
func isSubset(a, b *rangePerm) bool {
|
func isSubset(a, b *rangePerm) bool {
|
||||||
switch {
|
switch {
|
||||||
case len(a.end) == 0 && len(b.end) == 0:
|
case len(a.end) == 0 && len(b.end) == 0:
|
||||||
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
|
|||||||
// b is a key, a is a range
|
// b is a key, a is a range
|
||||||
return false
|
return false
|
||||||
case len(a.end) == 0:
|
case len(a.end) == 0:
|
||||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
|
// a is a key, b is a range. need b1 <= a1 and a1 < b2
|
||||||
|
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
|
||||||
default:
|
default:
|
||||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
|
// both are ranges. need b1 <= a1 and a2 <= b2
|
||||||
|
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
|
|||||||
i := 0
|
i := 0
|
||||||
for i < len(perms) {
|
for i < len(perms) {
|
||||||
begin, next := i, i
|
begin, next := i, i
|
||||||
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
|
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
|
||||||
next++
|
next++
|
||||||
}
|
}
|
||||||
|
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
|
||||||
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
if next != begin && len(perms[next].end) > 0 {
|
||||||
|
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
||||||
|
} else {
|
||||||
|
merged = append(merged, perms[begin])
|
||||||
|
if next != begin {
|
||||||
|
merged = append(merged, perms[next])
|
||||||
|
}
|
||||||
|
}
|
||||||
i = next + 1
|
i = next + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
|
|||||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||||
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
||||||
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
|
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||||
},
|
},
|
||||||
// duplicate ranges
|
// duplicate ranges
|
||||||
{
|
{
|
||||||
|
@ -45,6 +45,8 @@ type simpleBalancer struct {
|
|||||||
// pinAddr is the currently pinned address; set to the empty string on
|
// pinAddr is the currently pinned address; set to the empty string on
|
||||||
// intialization and shutdown.
|
// intialization and shutdown.
|
||||||
pinAddr string
|
pinAddr string
|
||||||
|
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSimpleBalancer(eps []string) *simpleBalancer {
|
func newSimpleBalancer(eps []string) *simpleBalancer {
|
||||||
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
|
|||||||
|
|
||||||
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
|
// gRPC might call Up after it called Close. We add this check
|
||||||
|
// to "fix" it up at application layer. Or our simplerBalancer
|
||||||
|
// might panic since b.upc is closed.
|
||||||
|
if b.closed {
|
||||||
|
return func(err error) {}
|
||||||
|
}
|
||||||
|
|
||||||
if len(b.upEps) == 0 {
|
if len(b.upEps) == 0 {
|
||||||
// notify waiting Get()s and pin first connected address
|
// notify waiting Get()s and pin first connected address
|
||||||
close(b.upc)
|
close(b.upc)
|
||||||
b.pinAddr = addr.Addr
|
b.pinAddr = addr.Addr
|
||||||
}
|
}
|
||||||
b.upEps[addr.Addr] = struct{}{}
|
b.upEps[addr.Addr] = struct{}{}
|
||||||
b.mu.Unlock()
|
|
||||||
// notify client that a connection is up
|
// notify client that a connection is up
|
||||||
b.readyOnce.Do(func() { close(b.readyc) })
|
b.readyOnce.Do(func() { close(b.readyc) })
|
||||||
|
|
||||||
return func(err error) {
|
return func(err error) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
delete(b.upEps, addr.Addr)
|
delete(b.upEps, addr.Addr)
|
||||||
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
|
|||||||
|
|
||||||
func (b *simpleBalancer) Close() error {
|
func (b *simpleBalancer) Close() error {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
// In case gRPC calls close twice. TODO: remove the checking
|
||||||
|
// when we are sure that gRPC wont call close twice.
|
||||||
|
if b.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b.closed = true
|
||||||
close(b.notifyCh)
|
close(b.notifyCh)
|
||||||
// terminate all waiting Get()s
|
// terminate all waiting Get()s
|
||||||
b.pinAddr = ""
|
b.pinAddr = ""
|
||||||
if len(b.upEps) == 0 {
|
if len(b.upEps) == 0 {
|
||||||
close(b.upc)
|
close(b.upc)
|
||||||
}
|
}
|
||||||
b.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ type Election struct {
|
|||||||
|
|
||||||
// NewElection returns a new election on a given key prefix.
|
// NewElection returns a new election on a given key prefix.
|
||||||
func NewElection(client *v3.Client, pfx string) *Election {
|
func NewElection(client *v3.Client, pfx string) *Election {
|
||||||
return &Election{client: client, keyPrefix: pfx}
|
return &Election{client: client, keyPrefix: pfx + "/"}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Campaign puts a value as eligible for the election. It blocks until
|
// Campaign puts a value as eligible for the election. It blocks until
|
||||||
@ -59,7 +59,6 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
|
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
|
||||||
if !resp.Succeeded {
|
if !resp.Succeeded {
|
||||||
kv := resp.Responses[0].GetResponseRange().Kvs[0]
|
kv := resp.Responses[0].GetResponseRange().Kvs[0]
|
||||||
|
@ -32,7 +32,7 @@ type Mutex struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewMutex(client *v3.Client, pfx string) *Mutex {
|
func NewMutex(client *v3.Client, pfx string) *Mutex {
|
||||||
return &Mutex{client, pfx, "", -1}
|
return &Mutex{client, pfx + "/", "", -1}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
||||||
@ -43,7 +43,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||||||
return serr
|
return serr
|
||||||
}
|
}
|
||||||
|
|
||||||
m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
|
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
|
||||||
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
|
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
|
||||||
// put self in lock waiters via myKey; oldest waiter holds lock
|
// put self in lock waiters via myKey; oldest waiter holds lock
|
||||||
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
|
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
|
||||||
|
@ -669,6 +669,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|||||||
w.mu.RUnlock()
|
w.mu.RUnlock()
|
||||||
|
|
||||||
for _, ws := range streams {
|
for _, ws := range streams {
|
||||||
|
// drain recvc so no old WatchResponses (e.g., Created messages)
|
||||||
|
// are processed while resuming
|
||||||
|
ws.drain()
|
||||||
|
|
||||||
// pause serveStream
|
// pause serveStream
|
||||||
ws.resumec <- -1
|
ws.resumec <- -1
|
||||||
|
|
||||||
@ -701,6 +705,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// drain removes all buffered WatchResponses from the stream's receive channel.
|
||||||
|
func (ws *watcherStream) drain() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ws.recvc:
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
||||||
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
||||||
req := &pb.WatchCreateRequest{
|
req := &pb.WatchCreateRequest{
|
||||||
|
36
cmd/Godeps/Godeps.json
generated
36
cmd/Godeps/Godeps.json
generated
@ -237,48 +237,48 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc",
|
"ImportPath": "google.golang.org/grpc",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/codes",
|
"ImportPath": "google.golang.org/grpc/codes",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/credentials",
|
"ImportPath": "google.golang.org/grpc/credentials",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/grpclog",
|
"ImportPath": "google.golang.org/grpc/grpclog",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/internal",
|
"ImportPath": "google.golang.org/grpc/internal",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/metadata",
|
"ImportPath": "google.golang.org/grpc/metadata",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/naming",
|
"ImportPath": "google.golang.org/grpc/naming",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/peer",
|
"ImportPath": "google.golang.org/grpc/peer",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "google.golang.org/grpc/transport",
|
"ImportPath": "google.golang.org/grpc/transport",
|
||||||
"Comment": "v1.0.0-174-gc278196",
|
"Comment": "v1.0.0-183-g231b4cf",
|
||||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "gopkg.in/cheggaaa/pb.v1",
|
"ImportPath": "gopkg.in/cheggaaa/pb.v1",
|
||||||
|
4
cmd/vendor/google.golang.org/grpc/call.go
generated
vendored
4
cmd/vendor/google.golang.org/grpc/call.go
generated
vendored
@ -170,9 +170,9 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||||||
if _, ok := err.(*rpcError); ok {
|
if _, ok := err.(*rpcError); ok {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err == errConnClosing {
|
if err == errConnClosing || err == errConnUnavailable {
|
||||||
if c.failFast {
|
if c.failFast {
|
||||||
return Errorf(codes.Unavailable, "%v", errConnClosing)
|
return Errorf(codes.Unavailable, "%v", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
67
cmd/vendor/google.golang.org/grpc/clientconn.go
generated
vendored
67
cmd/vendor/google.golang.org/grpc/clientconn.go
generated
vendored
@ -73,7 +73,9 @@ var (
|
|||||||
errConnDrain = errors.New("grpc: the connection is drained")
|
errConnDrain = errors.New("grpc: the connection is drained")
|
||||||
// errConnClosing indicates that the connection is closing.
|
// errConnClosing indicates that the connection is closing.
|
||||||
errConnClosing = errors.New("grpc: the connection is closing")
|
errConnClosing = errors.New("grpc: the connection is closing")
|
||||||
errNoAddr = errors.New("grpc: there is no address available to dial")
|
// errConnUnavailable indicates that the connection is unavailable.
|
||||||
|
errConnUnavailable = errors.New("grpc: the connection is unavailable")
|
||||||
|
errNoAddr = errors.New("grpc: there is no address available to dial")
|
||||||
// minimum time to give a connection to complete
|
// minimum time to give a connection to complete
|
||||||
minConnectTimeout = 20 * time.Second
|
minConnectTimeout = 20 * time.Second
|
||||||
)
|
)
|
||||||
@ -213,9 +215,14 @@ func WithUserAgent(s string) DialOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dial creates a client connection the given target.
|
// Dial creates a client connection to the given target.
|
||||||
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
||||||
ctx := context.Background()
|
return DialContext(context.Background(), target, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DialContext creates a client connection to the given target
|
||||||
|
// using the supplied context.
|
||||||
|
func DialContext(ctx context.Context, target string, opts ...DialOption) (*ClientConn, error) {
|
||||||
cc := &ClientConn{
|
cc := &ClientConn{
|
||||||
target: target,
|
target: target,
|
||||||
conns: make(map[Address]*addrConn),
|
conns: make(map[Address]*addrConn),
|
||||||
@ -472,6 +479,10 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
|||||||
if cc.dopts.balancer == nil {
|
if cc.dopts.balancer == nil {
|
||||||
// If balancer is nil, there should be only one addrConn available.
|
// If balancer is nil, there should be only one addrConn available.
|
||||||
cc.mu.RLock()
|
cc.mu.RLock()
|
||||||
|
if cc.conns == nil {
|
||||||
|
cc.mu.RUnlock()
|
||||||
|
return nil, nil, toRPCErr(ErrClientConnClosing)
|
||||||
|
}
|
||||||
for _, ac = range cc.conns {
|
for _, ac = range cc.conns {
|
||||||
// Break after the first iteration to get the first addrConn.
|
// Break after the first iteration to get the first addrConn.
|
||||||
ok = true
|
ok = true
|
||||||
@ -501,11 +512,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
|||||||
}
|
}
|
||||||
return nil, nil, errConnClosing
|
return nil, nil, errConnClosing
|
||||||
}
|
}
|
||||||
// ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
|
t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
|
||||||
// - If RPC is failfast, ac.wait should not block.
|
|
||||||
// - If balancer is not nil, ac.wait should return errConnClosing on transient failure
|
|
||||||
// so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
|
|
||||||
t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if put != nil {
|
if put != nil {
|
||||||
put()
|
put()
|
||||||
@ -757,36 +764,42 @@ func (ac *addrConn) transportMonitor() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
||||||
// iv) transport is in TransientFailure and blocking is false.
|
// iv) transport is in TransientFailure and there's no balancer/failfast is true.
|
||||||
func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
|
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
|
||||||
for {
|
for {
|
||||||
ac.mu.Lock()
|
ac.mu.Lock()
|
||||||
switch {
|
switch {
|
||||||
case ac.state == Shutdown:
|
case ac.state == Shutdown:
|
||||||
err := ac.tearDownErr
|
if failfast || !hasBalancer {
|
||||||
|
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
|
||||||
|
err := ac.tearDownErr
|
||||||
|
ac.mu.Unlock()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
return nil, err
|
return nil, errConnClosing
|
||||||
case ac.state == Ready:
|
case ac.state == Ready:
|
||||||
ct := ac.transport
|
ct := ac.transport
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
return ct, nil
|
return ct, nil
|
||||||
case ac.state == TransientFailure && !blocking:
|
case ac.state == TransientFailure:
|
||||||
ac.mu.Unlock()
|
if failfast || hasBalancer {
|
||||||
return nil, errConnClosing
|
ac.mu.Unlock()
|
||||||
default:
|
return nil, errConnUnavailable
|
||||||
ready := ac.ready
|
|
||||||
if ready == nil {
|
|
||||||
ready = make(chan struct{})
|
|
||||||
ac.ready = ready
|
|
||||||
}
|
|
||||||
ac.mu.Unlock()
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, toRPCErr(ctx.Err())
|
|
||||||
// Wait until the new transport is ready or failed.
|
|
||||||
case <-ready:
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ready := ac.ready
|
||||||
|
if ready == nil {
|
||||||
|
ready = make(chan struct{})
|
||||||
|
ac.ready = ready
|
||||||
|
}
|
||||||
|
ac.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, toRPCErr(ctx.Err())
|
||||||
|
// Wait until the new transport is ready or failed.
|
||||||
|
case <-ready:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
4
cmd/vendor/google.golang.org/grpc/stream.go
generated
vendored
4
cmd/vendor/google.golang.org/grpc/stream.go
generated
vendored
@ -146,9 +146,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
if _, ok := err.(*rpcError); ok {
|
if _, ok := err.(*rpcError); ok {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err == errConnClosing {
|
if err == errConnClosing || err == errConnUnavailable {
|
||||||
if c.failFast {
|
if c.failFast {
|
||||||
return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
|
return nil, Errorf(codes.Unavailable, "%v", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -53,8 +53,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, srv := range addrs {
|
for _, srv := range addrs {
|
||||||
target := strings.TrimSuffix(srv.Target, ".")
|
port := fmt.Sprintf("%d", srv.Port)
|
||||||
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
|
host := net.JoinHostPort(srv.Target, port)
|
||||||
tcpAddr, err := resolveTCPAddr("tcp", host)
|
tcpAddr, err := resolveTCPAddr("tcp", host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Warningf("couldn't resolve host %s during SRV discovery", host)
|
plog.Warningf("couldn't resolve host %s during SRV discovery", host)
|
||||||
@ -70,8 +70,11 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
|||||||
n = fmt.Sprintf("%d", tempName)
|
n = fmt.Sprintf("%d", tempName)
|
||||||
tempName += 1
|
tempName += 1
|
||||||
}
|
}
|
||||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
|
// SRV records have a trailing dot but URL shouldn't.
|
||||||
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, prefix, host)
|
shortHost := strings.TrimSuffix(srv.Target, ".")
|
||||||
|
urlHost := net.JoinHostPort(shortHost, port)
|
||||||
|
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, urlHost))
|
||||||
|
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, prefix, urlHost)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ package discovery
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
@ -29,11 +30,22 @@ func TestSRVGetCluster(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
name := "dnsClusterTest"
|
name := "dnsClusterTest"
|
||||||
|
dns := map[string]string{
|
||||||
|
"1.example.com.:2480": "10.0.0.1:2480",
|
||||||
|
"2.example.com.:2480": "10.0.0.2:2480",
|
||||||
|
"3.example.com.:2480": "10.0.0.3:2480",
|
||||||
|
"4.example.com.:2380": "10.0.0.3:2380",
|
||||||
|
}
|
||||||
|
srvAll := []*net.SRV{
|
||||||
|
{Target: "1.example.com.", Port: 2480},
|
||||||
|
{Target: "2.example.com.", Port: 2480},
|
||||||
|
{Target: "3.example.com.", Port: 2480},
|
||||||
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
withSSL []*net.SRV
|
withSSL []*net.SRV
|
||||||
withoutSSL []*net.SRV
|
withoutSSL []*net.SRV
|
||||||
urls []string
|
urls []string
|
||||||
dns map[string]string
|
|
||||||
|
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
@ -41,61 +53,50 @@ func TestSRVGetCluster(t *testing.T) {
|
|||||||
[]*net.SRV{},
|
[]*net.SRV{},
|
||||||
[]*net.SRV{},
|
[]*net.SRV{},
|
||||||
nil,
|
nil,
|
||||||
nil,
|
|
||||||
|
|
||||||
"",
|
"",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]*net.SRV{
|
srvAll,
|
||||||
{Target: "10.0.0.1", Port: 2480},
|
|
||||||
{Target: "10.0.0.2", Port: 2480},
|
|
||||||
{Target: "10.0.0.3", Port: 2480},
|
|
||||||
},
|
|
||||||
[]*net.SRV{},
|
[]*net.SRV{},
|
||||||
nil,
|
nil,
|
||||||
|
|
||||||
|
"0=https://1.example.com:2480,1=https://2.example.com:2480,2=https://3.example.com:2480",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
srvAll,
|
||||||
|
[]*net.SRV{{Target: "4.example.com.", Port: 2380}},
|
||||||
nil,
|
nil,
|
||||||
|
|
||||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
|
"0=https://1.example.com:2480,1=https://2.example.com:2480,2=https://3.example.com:2480,3=http://4.example.com:2380",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]*net.SRV{
|
srvAll,
|
||||||
{Target: "10.0.0.1", Port: 2480},
|
[]*net.SRV{{Target: "4.example.com.", Port: 2380}},
|
||||||
{Target: "10.0.0.2", Port: 2480},
|
|
||||||
{Target: "10.0.0.3", Port: 2480},
|
|
||||||
},
|
|
||||||
[]*net.SRV{
|
|
||||||
{Target: "10.0.0.1", Port: 2380},
|
|
||||||
},
|
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:2380",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
[]*net.SRV{
|
|
||||||
{Target: "10.0.0.1", Port: 2480},
|
|
||||||
{Target: "10.0.0.2", Port: 2480},
|
|
||||||
{Target: "10.0.0.3", Port: 2480},
|
|
||||||
},
|
|
||||||
[]*net.SRV{
|
|
||||||
{Target: "10.0.0.1", Port: 2380},
|
|
||||||
},
|
|
||||||
[]string{"https://10.0.0.1:2480"},
|
[]string{"https://10.0.0.1:2480"},
|
||||||
nil,
|
|
||||||
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:2380",
|
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480,2=http://4.example.com:2380",
|
||||||
},
|
},
|
||||||
// matching local member with resolved addr and return unresolved hostnames
|
// matching local member with resolved addr and return unresolved hostnames
|
||||||
{
|
{
|
||||||
[]*net.SRV{
|
srvAll,
|
||||||
{Target: "1.example.com.", Port: 2480},
|
|
||||||
{Target: "2.example.com.", Port: 2480},
|
|
||||||
{Target: "3.example.com.", Port: 2480},
|
|
||||||
},
|
|
||||||
nil,
|
nil,
|
||||||
[]string{"https://10.0.0.1:2480"},
|
[]string{"https://10.0.0.1:2480"},
|
||||||
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
|
|
||||||
|
|
||||||
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
|
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
|
||||||
},
|
},
|
||||||
|
// invalid
|
||||||
|
}
|
||||||
|
|
||||||
|
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
||||||
|
if strings.Contains(addr, "10.0.0.") {
|
||||||
|
// accept IP addresses when resolving apurls
|
||||||
|
return net.ResolveTCPAddr(network, addr)
|
||||||
|
}
|
||||||
|
if dns[addr] == "" {
|
||||||
|
return nil, errors.New("missing dns record")
|
||||||
|
}
|
||||||
|
return net.ResolveTCPAddr(network, dns[addr])
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
@ -108,12 +109,6 @@ func TestSRVGetCluster(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return "", nil, errors.New("Unknown service in mock")
|
return "", nil, errors.New("Unknown service in mock")
|
||||||
}
|
}
|
||||||
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
|
||||||
if tt.dns == nil || tt.dns[addr] == "" {
|
|
||||||
return net.ResolveTCPAddr(network, addr)
|
|
||||||
}
|
|
||||||
return net.ResolveTCPAddr(network, tt.dns[addr])
|
|
||||||
}
|
|
||||||
urls := testutil.MustNewURLs(t, tt.urls)
|
urls := testutil.MustNewURLs(t, tt.urls)
|
||||||
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
|
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -39,15 +39,23 @@ func txnTestSuccess(cx ctlCtx) {
|
|||||||
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
|
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
|
||||||
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
|
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
|
||||||
}
|
}
|
||||||
|
rqs := []txnRequests{
|
||||||
rqs := txnRequests{
|
{
|
||||||
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
||||||
ifSucess: []string{"get key1", "get key2"},
|
ifSucess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`},
|
||||||
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
||||||
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
compare: []string{`version("key \"with\" space") = "1"`},
|
||||||
|
ifSucess: []string{`get "key \"with\" space"`},
|
||||||
|
results: []string{"SUCCESS", `key "with" space`, "value \x23"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
if err := ctlV3Txn(cx, rqs); err != nil {
|
for _, rq := range rqs {
|
||||||
cx.t.Fatal(err)
|
if err := ctlV3Txn(cx, rq); err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +243,7 @@ func authCfgFromCmd(cmd *cobra.Command) *authCfg {
|
|||||||
var cfg authCfg
|
var cfg authCfg
|
||||||
|
|
||||||
splitted := strings.SplitN(userFlag, ":", 2)
|
splitted := strings.SplitN(userFlag, ":", 2)
|
||||||
if len(splitted) == 0 {
|
if len(splitted) < 2 {
|
||||||
cfg.username = userFlag
|
cfg.username = userFlag
|
||||||
cfg.password, err = speakeasy.Ask("Password: ")
|
cfg.password, err = speakeasy.Ask("Password: ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -36,7 +36,10 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
|
"github.com/coreos/etcd/snap"
|
||||||
|
"github.com/coreos/etcd/store"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
@ -186,8 +189,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
|
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
|
||||||
}
|
}
|
||||||
|
|
||||||
makeDB(snapdir, args[0])
|
makeDB(snapdir, args[0], len(cl.Members()))
|
||||||
makeWAL(waldir, cl)
|
makeWALAndSnap(waldir, snapdir, cl)
|
||||||
}
|
}
|
||||||
|
|
||||||
func initialClusterFromName(name string) string {
|
func initialClusterFromName(name string) string {
|
||||||
@ -199,11 +202,18 @@ func initialClusterFromName(name string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// makeWAL creates a WAL for the initial cluster
|
// makeWAL creates a WAL for the initial cluster
|
||||||
func makeWAL(waldir string, cl *membership.RaftCluster) {
|
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
|
||||||
if err := fileutil.CreateDirAll(waldir); err != nil {
|
if err := fileutil.CreateDirAll(waldir); err != nil {
|
||||||
ExitWithError(ExitIO, err)
|
ExitWithError(ExitIO, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add members again to persist them to the store we create.
|
||||||
|
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
|
||||||
|
cl.SetStore(st)
|
||||||
|
for _, m := range cl.Members() {
|
||||||
|
cl.AddMember(m)
|
||||||
|
}
|
||||||
|
|
||||||
m := cl.MemberByName(restoreName)
|
m := cl.MemberByName(restoreName)
|
||||||
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
|
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
|
||||||
metadata, merr := md.Marshal()
|
metadata, merr := md.Marshal()
|
||||||
@ -227,7 +237,9 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ents := make([]raftpb.Entry, len(peers))
|
ents := make([]raftpb.Entry, len(peers))
|
||||||
|
nodeIDs := make([]uint64, len(peers))
|
||||||
for i, p := range peers {
|
for i, p := range peers {
|
||||||
|
nodeIDs[i] = p.ID
|
||||||
cc := raftpb.ConfChange{
|
cc := raftpb.ConfChange{
|
||||||
Type: raftpb.ConfChangeAddNode,
|
Type: raftpb.ConfChangeAddNode,
|
||||||
NodeID: p.ID,
|
NodeID: p.ID,
|
||||||
@ -245,20 +257,48 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
|||||||
ents[i] = e
|
ents[i] = e
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Save(raftpb.HardState{
|
commit, term := uint64(len(ents)), uint64(1)
|
||||||
Term: 1,
|
|
||||||
|
if err := w.Save(raftpb.HardState{
|
||||||
|
Term: term,
|
||||||
Vote: peers[0].ID,
|
Vote: peers[0].ID,
|
||||||
Commit: uint64(len(ents))}, ents)
|
Commit: commit}, ents); err != nil {
|
||||||
|
ExitWithError(ExitIO, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, berr := st.Save()
|
||||||
|
if berr != nil {
|
||||||
|
ExitWithError(ExitError, berr)
|
||||||
|
}
|
||||||
|
|
||||||
|
raftSnap := raftpb.Snapshot{
|
||||||
|
Data: b,
|
||||||
|
Metadata: raftpb.SnapshotMetadata{
|
||||||
|
Index: commit,
|
||||||
|
Term: term,
|
||||||
|
ConfState: raftpb.ConfState{
|
||||||
|
Nodes: nodeIDs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
snapshotter := snap.New(snapdir)
|
||||||
|
if err := snapshotter.SaveSnap(raftSnap); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
|
||||||
|
ExitWithError(ExitIO, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initIndex implements ConsistentIndexGetter so the snapshot won't block
|
// initIndex implements ConsistentIndexGetter so the snapshot won't block
|
||||||
// the new raft instance by waiting for a future raft index.
|
// the new raft instance by waiting for a future raft index.
|
||||||
type initIndex struct{}
|
type initIndex int
|
||||||
|
|
||||||
func (*initIndex) ConsistentIndex() uint64 { return 1 }
|
func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
|
||||||
|
|
||||||
// makeDB copies the database snapshot to the snapshot directory
|
// makeDB copies the database snapshot to the snapshot directory
|
||||||
func makeDB(snapdir, dbfile string) {
|
func makeDB(snapdir, dbfile string, commit int) {
|
||||||
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
|
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
|
||||||
if ferr != nil {
|
if ferr != nil {
|
||||||
ExitWithError(ExitInvalidInput, ferr)
|
ExitWithError(ExitInvalidInput, ferr)
|
||||||
@ -329,7 +369,7 @@ func makeDB(snapdir, dbfile string) {
|
|||||||
// update consistentIndex so applies go through on etcdserver despite
|
// update consistentIndex so applies go through on etcdserver despite
|
||||||
// having a new raft instance
|
// having a new raft instance
|
||||||
be := backend.NewDefaultBackend(dbpath)
|
be := backend.NewDefaultBackend(dbpath)
|
||||||
s := mvcc.NewStore(be, nil, &initIndex{})
|
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
|
||||||
id := s.TxnBegin()
|
id := s.TxnBegin()
|
||||||
btx := be.BatchTx()
|
btx := be.BatchTx()
|
||||||
del := func(k, v []byte) error {
|
del := func(k, v []byte) error {
|
||||||
@ -339,6 +379,7 @@ func makeDB(snapdir, dbfile string) {
|
|||||||
|
|
||||||
// delete stored members from old cluster since using new members
|
// delete stored members from old cluster since using new members
|
||||||
btx.UnsafeForEach([]byte("members"), del)
|
btx.UnsafeForEach([]byte("members"), del)
|
||||||
|
// todo: add back new members when we start to deprecate old snap file.
|
||||||
btx.UnsafeForEach([]byte("members_removed"), del)
|
btx.UnsafeForEach([]byte("members_removed"), del)
|
||||||
// trigger write-out of new consistent index
|
// trigger write-out of new consistent index
|
||||||
s.TxnEnd(id)
|
s.TxnEnd(id)
|
||||||
|
@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
}
|
}
|
||||||
if len(line) == 1 {
|
|
||||||
|
// remove space from the line
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if len(line) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove trialling \n
|
|
||||||
line = line[:len(line)-1]
|
|
||||||
cmp, err := parseCompare(line)
|
cmp, err := parseCompare(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
}
|
}
|
||||||
if len(line) == 1 {
|
|
||||||
|
// remove space from the line
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if len(line) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove trialling \n
|
|
||||||
line = line[:len(line)-1]
|
|
||||||
op, err := parseRequestUnion(line)
|
op, err := parseRequestUnion(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
|
@ -46,8 +46,23 @@ func addHexPrefix(s string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func argify(s string) []string {
|
func argify(s string) []string {
|
||||||
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
|
r := regexp.MustCompile(`"(?:[^"\\]|\\.)*"|'[^']*'|[^'"\s]\S*[^'"\s]?`)
|
||||||
return r.FindAllString(s, -1)
|
args := r.FindAllString(s, -1)
|
||||||
|
for i := range args {
|
||||||
|
if len(args[i]) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if args[i][0] == '\'' {
|
||||||
|
// 'single-quoted string'
|
||||||
|
args[i] = args[i][1 : len(args)-1]
|
||||||
|
} else if args[i][0] == '"' {
|
||||||
|
// "double quoted string"
|
||||||
|
if _, err := fmt.Sscanf(args[i], "%q", &args[i]); err != nil {
|
||||||
|
ExitWithError(ExitInvalidInput, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
|
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
|
||||||
|
@ -52,30 +52,18 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
watchInteractiveFunc(cmd, args)
|
watchInteractiveFunc(cmd, args)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(args) < 1 || len(args) > 2 {
|
|
||||||
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires one or two arguments as key or prefix, with range end"))
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
|
||||||
key := args[0]
|
|
||||||
if len(args) == 2 {
|
|
||||||
if watchPrefix {
|
|
||||||
ExitWithError(ExitBadArgs, fmt.Errorf("`range_end` and `--prefix` cannot be set at the same time, choose one"))
|
|
||||||
}
|
|
||||||
opts = append(opts, clientv3.WithRange(args[1]))
|
|
||||||
}
|
|
||||||
|
|
||||||
if watchPrefix {
|
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
|
||||||
}
|
|
||||||
c := mustClientFromCmd(cmd)
|
c := mustClientFromCmd(cmd)
|
||||||
wc := c.Watch(context.TODO(), key, opts...)
|
wc, err := getWatchChan(c, args)
|
||||||
printWatchCh(wc)
|
if err != nil {
|
||||||
err := c.Close()
|
ExitWithError(ExitBadArgs, err)
|
||||||
if err == nil {
|
|
||||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
|
||||||
}
|
}
|
||||||
ExitWithError(ExitBadConnection, err)
|
|
||||||
|
printWatchCh(wc)
|
||||||
|
if err = c.Close(); err != nil {
|
||||||
|
ExitWithError(ExitBadConnection, err)
|
||||||
|
}
|
||||||
|
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
||||||
@ -107,32 +95,33 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
|||||||
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
moreargs := flagset.Args()
|
ch, err := getWatchChan(c, flagset.Args())
|
||||||
if len(moreargs) < 1 || len(moreargs) > 2 {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "Invalid command %s (Too few or many arguments)\n", l)
|
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var key string
|
|
||||||
_, err = fmt.Sscanf(moreargs[0], "%q", &key)
|
|
||||||
if err != nil {
|
|
||||||
key = moreargs[0]
|
|
||||||
}
|
|
||||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
|
||||||
if len(moreargs) == 2 {
|
|
||||||
if watchPrefix {
|
|
||||||
fmt.Fprintf(os.Stderr, "`range_end` and `--prefix` cannot be set at the same time, choose one\n")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
opts = append(opts, clientv3.WithRange(moreargs[1]))
|
|
||||||
}
|
|
||||||
if watchPrefix {
|
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
|
||||||
}
|
|
||||||
ch := c.Watch(context.TODO(), key, opts...)
|
|
||||||
go printWatchCh(ch)
|
go printWatchCh(ch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
|
||||||
|
if len(args) < 1 || len(args) > 2 {
|
||||||
|
return nil, fmt.Errorf("bad number of arguments")
|
||||||
|
}
|
||||||
|
key := args[0]
|
||||||
|
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||||
|
if len(args) == 2 {
|
||||||
|
if watchPrefix {
|
||||||
|
return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
|
||||||
|
}
|
||||||
|
opts = append(opts, clientv3.WithRange(args[1]))
|
||||||
|
}
|
||||||
|
if watchPrefix {
|
||||||
|
opts = append(opts, clientv3.WithPrefix())
|
||||||
|
}
|
||||||
|
return c.Watch(context.TODO(), key, opts...), nil
|
||||||
|
}
|
||||||
|
|
||||||
func printWatchCh(ch clientv3.WatchChan) {
|
func printWatchCh(ch clientv3.WatchChan) {
|
||||||
for resp := range ch {
|
for resp := range ch {
|
||||||
display.Watch(resp)
|
display.Watch(resp)
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -410,6 +411,13 @@ func (cfg *config) configFromFile() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cfg *config) validateConfig(isSet func(field string) bool) error {
|
func (cfg *config) validateConfig(isSet func(field string) bool) error {
|
||||||
|
if err := checkBindURLs(cfg.lpurls); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := checkBindURLs(cfg.lcurls); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
|
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
|
||||||
// TODO(yichengq): check this for joining through discovery service case
|
// TODO(yichengq): check this for joining through discovery service case
|
||||||
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
|
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
|
||||||
@ -456,3 +464,27 @@ func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == pr
|
|||||||
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
|
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
|
||||||
|
|
||||||
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||||
|
|
||||||
|
// checkBindURLs returns an error if any URL uses a domain name.
|
||||||
|
// TODO: return error in 3.2.0
|
||||||
|
func checkBindURLs(urls []url.URL) error {
|
||||||
|
for _, url := range urls {
|
||||||
|
if url.Scheme == "unix" || url.Scheme == "unixs" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
host, _, err := net.SplitHostPort(url.Host)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if host == "localhost" {
|
||||||
|
// special case for local address
|
||||||
|
// TODO: support /etc/hosts ?
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if net.ParseIP(host) == nil {
|
||||||
|
err := fmt.Errorf("expected IP in URL for binding (%s)", url.String())
|
||||||
|
plog.Warning(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -405,14 +405,24 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
|
|
||||||
srv.be = be
|
srv.be = be
|
||||||
srv.lessor = lease.NewLessor(srv.be)
|
srv.lessor = lease.NewLessor(srv.be)
|
||||||
|
|
||||||
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
|
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||||
|
srv.lessor = lease.NewLessor(srv.be)
|
||||||
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
||||||
if beExist {
|
if beExist {
|
||||||
kvindex := srv.kv.ConsistentIndex()
|
kvindex := srv.kv.ConsistentIndex()
|
||||||
|
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||||
|
// etcd from pre-3.0 release.
|
||||||
if snapshot != nil && kvindex < snapshot.Metadata.Index {
|
if snapshot != nil && kvindex < snapshot.Metadata.Index {
|
||||||
return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
|
if kvindex != 0 {
|
||||||
|
return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
|
||||||
|
}
|
||||||
|
plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||||
|
|
||||||
srv.authStore = auth.NewAuthStore(srv.be)
|
srv.authStore = auth.NewAuthStore(srv.be)
|
||||||
if h := cfg.AutoCompactionRetention; h != 0 {
|
if h := cfg.AutoCompactionRetention; h != 0 {
|
||||||
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
||||||
@ -658,6 +668,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
|||||||
|
|
||||||
newbe := backend.NewDefaultBackend(fn)
|
newbe := backend.NewDefaultBackend(fn)
|
||||||
|
|
||||||
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
|
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||||
|
if s.lessor != nil {
|
||||||
|
plog.Info("recovering lessor...")
|
||||||
|
s.lessor.Recover(newbe, s.kv)
|
||||||
|
plog.Info("finished recovering lessor")
|
||||||
|
}
|
||||||
|
|
||||||
plog.Info("restoring mvcc store...")
|
plog.Info("restoring mvcc store...")
|
||||||
|
|
||||||
if err := s.kv.Restore(newbe); err != nil {
|
if err := s.kv.Restore(newbe); err != nil {
|
||||||
@ -684,12 +702,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
|||||||
s.be = newbe
|
s.be = newbe
|
||||||
s.bemu.Unlock()
|
s.bemu.Unlock()
|
||||||
|
|
||||||
if s.lessor != nil {
|
|
||||||
plog.Info("recovering lessor...")
|
|
||||||
s.lessor.Recover(newbe, s.kv)
|
|
||||||
plog.Info("finished recovering lessor")
|
|
||||||
}
|
|
||||||
|
|
||||||
plog.Info("recovering alarms...")
|
plog.Info("recovering alarms...")
|
||||||
if err := s.restoreAlarms(); err != nil {
|
if err := s.restoreAlarms(); err != nil {
|
||||||
plog.Panicf("restore alarms error: %v", err)
|
plog.Panicf("restore alarms error: %v", err)
|
||||||
|
@ -174,3 +174,28 @@ func TestElectionSessionRecampaign(t *testing.T) {
|
|||||||
t.Fatalf("expected value=%q, got response %v", "def", resp)
|
t.Fatalf("expected value=%q, got response %v", "def", resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestElectionOnPrefixOfExistingKey checks that a single
|
||||||
|
// candidate can be elected on a new key that is a prefix
|
||||||
|
// of an existing key. To wit, check for regression
|
||||||
|
// of bug #6278. https://github.com/coreos/etcd/issues/6278
|
||||||
|
//
|
||||||
|
func TestElectionOnPrefixOfExistingKey(t *testing.T) {
|
||||||
|
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
cli := clus.RandClient()
|
||||||
|
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e := concurrency.NewElection(cli, "test")
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||||
|
err := e.Campaign(ctx, "abc")
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
// after 5 seconds, deadlock results in
|
||||||
|
// 'context deadline exceeded' here.
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -45,13 +45,18 @@ var (
|
|||||||
|
|
||||||
type LeaseID int64
|
type LeaseID int64
|
||||||
|
|
||||||
// RangeDeleter defines an interface with DeleteRange method.
|
// RangeDeleter defines an interface with Txn and DeleteRange method.
|
||||||
// We define this interface only for lessor to limit the number
|
// We define this interface only for lessor to limit the number
|
||||||
// of methods of mvcc.KV to what lessor actually needs.
|
// of methods of mvcc.KV to what lessor actually needs.
|
||||||
//
|
//
|
||||||
// Having a minimum interface makes testing easy.
|
// Having a minimum interface makes testing easy.
|
||||||
type RangeDeleter interface {
|
type RangeDeleter interface {
|
||||||
DeleteRange(key, end []byte) (int64, int64)
|
// TxnBegin see comments on mvcc.KV
|
||||||
|
TxnBegin() int64
|
||||||
|
// TxnEnd see comments on mvcc.KV
|
||||||
|
TxnEnd(txnID int64) error
|
||||||
|
// TxnDeleteRange see comments on mvcc.KV
|
||||||
|
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
|
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
|
||||||
@ -211,16 +216,30 @@ func (le *lessor) Revoke(id LeaseID) error {
|
|||||||
// unlock before doing external work
|
// unlock before doing external work
|
||||||
le.mu.Unlock()
|
le.mu.Unlock()
|
||||||
|
|
||||||
if le.rd != nil {
|
if le.rd == nil {
|
||||||
for item := range l.itemSet {
|
return nil
|
||||||
le.rd.DeleteRange([]byte(item.Key), nil)
|
}
|
||||||
|
|
||||||
|
tid := le.rd.TxnBegin()
|
||||||
|
for item := range l.itemSet {
|
||||||
|
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
le.mu.Lock()
|
le.mu.Lock()
|
||||||
defer le.mu.Unlock()
|
defer le.mu.Unlock()
|
||||||
delete(le.leaseMap, l.ID)
|
delete(le.leaseMap, l.ID)
|
||||||
l.removeFrom(le.b)
|
// lease deletion needs to be in the same backend transaction with the
|
||||||
|
// kv deletion. Or we might end up with not executing the revoke or not
|
||||||
|
// deleting the keys if etcdserver fails in between.
|
||||||
|
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
|
||||||
|
|
||||||
|
err := le.rd.TxnEnd(tid)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -443,16 +462,7 @@ func (l Lease) persistTo(b backend.Backend) {
|
|||||||
b.BatchTx().Unlock()
|
b.BatchTx().Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l Lease) removeFrom(b backend.Backend) {
|
// refresh refreshes the expiry of the lease.
|
||||||
key := int64ToBytes(int64(l.ID))
|
|
||||||
|
|
||||||
b.BatchTx().Lock()
|
|
||||||
b.BatchTx().UnsafeDelete(leaseBucketName, key)
|
|
||||||
b.BatchTx().Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// refresh refreshes the expiry of the lease. It extends the expiry at least
|
|
||||||
// minLeaseTTL second.
|
|
||||||
func (l *Lease) refresh(extend time.Duration) {
|
func (l *Lease) refresh(extend time.Duration) {
|
||||||
if l.TTL < minLeaseTTL {
|
if l.TTL < minLeaseTTL {
|
||||||
l.TTL = minLeaseTTL
|
l.TTL = minLeaseTTL
|
||||||
|
@ -223,9 +223,17 @@ type fakeDeleter struct {
|
|||||||
deleted []string
|
deleted []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
|
func (fd *fakeDeleter) TxnBegin() int64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fd *fakeDeleter) TxnEnd(txnID int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
|
||||||
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
|
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
|
||||||
return 0, 0
|
return 0, 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestBackend(t *testing.T) (string, backend.Backend) {
|
func NewTestBackend(t *testing.T) (string, backend.Backend) {
|
||||||
|
@ -367,6 +367,8 @@ func (s *store) restore() error {
|
|||||||
revToBytes(revision{main: 1}, min)
|
revToBytes(revision{main: 1}, min)
|
||||||
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
|
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
|
||||||
|
|
||||||
|
keyToLease := make(map[string]lease.LeaseID)
|
||||||
|
|
||||||
// restore index
|
// restore index
|
||||||
tx := s.b.BatchTx()
|
tx := s.b.BatchTx()
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
@ -390,26 +392,15 @@ func (s *store) restore() error {
|
|||||||
switch {
|
switch {
|
||||||
case isTombstone(key):
|
case isTombstone(key):
|
||||||
s.kvindex.Tombstone(kv.Key, rev)
|
s.kvindex.Tombstone(kv.Key, rev)
|
||||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
delete(keyToLease, string(kv.Key))
|
||||||
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
|
||||||
if err != nil && err != lease.ErrLeaseNotFound {
|
|
||||||
plog.Fatalf("unexpected Detach error %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
|
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
|
||||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
|
||||||
if s.le == nil {
|
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
|
||||||
panic("no lessor to attach lease")
|
keyToLease[string(kv.Key)] = lid
|
||||||
}
|
} else {
|
||||||
err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
delete(keyToLease, string(kv.Key))
|
||||||
// We are walking through the kv history here. It is possible that we attached a key to
|
|
||||||
// the lease and the lease was revoked later.
|
|
||||||
// Thus attaching an old version of key to a none existing lease is possible here, and
|
|
||||||
// we should just ignore the error.
|
|
||||||
if err != nil && err != lease.ErrLeaseNotFound {
|
|
||||||
panic("unexpected Attach error")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -417,6 +408,16 @@ func (s *store) restore() error {
|
|||||||
s.currentRev = rev
|
s.currentRev = rev
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for key, lid := range keyToLease {
|
||||||
|
if s.le == nil {
|
||||||
|
panic("no lessor to attach lease")
|
||||||
|
}
|
||||||
|
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
|
||||||
|
if err != nil {
|
||||||
|
plog.Errorf("unexpected Attach error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
|
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
|
||||||
scheduledCompact := int64(0)
|
scheduledCompact := int64(0)
|
||||||
if len(scheduledCompactBytes) != 0 {
|
if len(scheduledCompactBytes) != 0 {
|
||||||
@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
|||||||
|
|
||||||
err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("unexpected error from lease detach")
|
plog.Errorf("unexpected error from lease detach: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
|
|||||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||||
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Fatalf("cannot detach %v", err)
|
plog.Errorf("cannot detach %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
22
pkg/fileutil/dir_unix.go
Normal file
22
pkg/fileutil/dir_unix.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
// OpenDir opens a directory for syncing.
|
||||||
|
func OpenDir(path string) (*os.File, error) { return os.Open(path) }
|
46
pkg/fileutil/dir_windows.go
Normal file
46
pkg/fileutil/dir_windows.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build windows
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OpenDir opens a directory in windows with write access for syncing.
|
||||||
|
func OpenDir(path string) (*os.File, error) {
|
||||||
|
fd, err := openDir(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return os.NewFile(uintptr(fd), path), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func openDir(path string) (fd syscall.Handle, err error) {
|
||||||
|
if len(path) == 0 {
|
||||||
|
return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
|
||||||
|
}
|
||||||
|
pathp, err := syscall.UTF16PtrFromString(path)
|
||||||
|
if err != nil {
|
||||||
|
return syscall.InvalidHandle, err
|
||||||
|
}
|
||||||
|
access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE)
|
||||||
|
sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE)
|
||||||
|
createmode := uint32(syscall.OPEN_EXISTING)
|
||||||
|
fl := uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS)
|
||||||
|
return syscall.CreateFile(pathp, access, sharemode, nil, createmode, fl, 0)
|
||||||
|
}
|
@ -96,3 +96,26 @@ func Exist(name string) bool {
|
|||||||
_, err := os.Stat(name)
|
_, err := os.Stat(name)
|
||||||
return err == nil
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ZeroToEnd zeros a file starting from SEEK_CUR to its SEEK_END. May temporarily
|
||||||
|
// shorten the length of the file.
|
||||||
|
func ZeroToEnd(f *os.File) error {
|
||||||
|
// TODO: support FALLOC_FL_ZERO_RANGE
|
||||||
|
off, err := f.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lenf, lerr := f.Seek(0, os.SEEK_END)
|
||||||
|
if lerr != nil {
|
||||||
|
return lerr
|
||||||
|
}
|
||||||
|
if err = f.Truncate(off); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// make sure blocks remain allocated
|
||||||
|
if err = Preallocate(f, lenf, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = f.Seek(off, os.SEEK_SET)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
@ -118,3 +118,42 @@ func TestExist(t *testing.T) {
|
|||||||
t.Errorf("exist = %v, want false", g)
|
t.Errorf("exist = %v, want false", g)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestZeroToEnd(t *testing.T) {
|
||||||
|
f, err := ioutil.TempFile(os.TempDir(), "fileutil")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
b := make([]byte, 1024)
|
||||||
|
for i := range b {
|
||||||
|
b[i] = 12
|
||||||
|
}
|
||||||
|
if _, err = f.Write(b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err = f.Seek(512, os.SEEK_SET); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err = ZeroToEnd(f); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
off, serr := f.Seek(0, os.SEEK_CUR)
|
||||||
|
if serr != nil {
|
||||||
|
t.Fatal(serr)
|
||||||
|
}
|
||||||
|
if off != 512 {
|
||||||
|
t.Fatalf("expected offset 512, got %d", off)
|
||||||
|
}
|
||||||
|
|
||||||
|
b = make([]byte, 512)
|
||||||
|
if _, err = f.Read(b); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for i := range b {
|
||||||
|
if b[i] != 0 {
|
||||||
|
t.Errorf("expected b[%d] = 0, got %d", i, b[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
103
pkg/ioutil/pagewriter.go
Normal file
103
pkg/ioutil/pagewriter.go
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package ioutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
var defaultBufferBytes = 128 * 1024
|
||||||
|
|
||||||
|
// PageWriter implements the io.Writer interface so that writes will
|
||||||
|
// either be in page chunks or from flushing.
|
||||||
|
type PageWriter struct {
|
||||||
|
w io.Writer
|
||||||
|
// pageOffset tracks the page offset of the base of the buffer
|
||||||
|
pageOffset int
|
||||||
|
// pageBytes is the number of bytes per page
|
||||||
|
pageBytes int
|
||||||
|
// bufferedBytes counts the number of bytes pending for write in the buffer
|
||||||
|
bufferedBytes int
|
||||||
|
// buf holds the write buffer
|
||||||
|
buf []byte
|
||||||
|
// bufWatermarkBytes is the number of bytes the buffer can hold before it needs
|
||||||
|
// to be flushed. It is less than len(buf) so there is space for slack writes
|
||||||
|
// to bring the writer to page alignment.
|
||||||
|
bufWatermarkBytes int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
|
||||||
|
return &PageWriter{
|
||||||
|
w: w,
|
||||||
|
pageBytes: pageBytes,
|
||||||
|
buf: make([]byte, defaultBufferBytes+pageBytes),
|
||||||
|
bufWatermarkBytes: defaultBufferBytes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pw *PageWriter) Write(p []byte) (n int, err error) {
|
||||||
|
if len(p)+pw.bufferedBytes <= pw.bufWatermarkBytes {
|
||||||
|
// no overflow
|
||||||
|
copy(pw.buf[pw.bufferedBytes:], p)
|
||||||
|
pw.bufferedBytes += len(p)
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
// complete the slack page in the buffer if unaligned
|
||||||
|
slack := pw.pageBytes - ((pw.pageOffset + pw.bufferedBytes) % pw.pageBytes)
|
||||||
|
if slack != pw.pageBytes {
|
||||||
|
partial := slack > len(p)
|
||||||
|
if partial {
|
||||||
|
// not enough data to complete the slack page
|
||||||
|
slack = len(p)
|
||||||
|
}
|
||||||
|
// special case: writing to slack page in buffer
|
||||||
|
copy(pw.buf[pw.bufferedBytes:], p[:slack])
|
||||||
|
pw.bufferedBytes += slack
|
||||||
|
n = slack
|
||||||
|
p = p[slack:]
|
||||||
|
if partial {
|
||||||
|
// avoid forcing an unaligned flush
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// buffer contents are now page-aligned; clear out
|
||||||
|
if err = pw.Flush(); err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
// directly write all complete pages without copying
|
||||||
|
if len(p) > pw.pageBytes {
|
||||||
|
pages := len(p) / pw.pageBytes
|
||||||
|
c, werr := pw.w.Write(p[:pages*pw.pageBytes])
|
||||||
|
n += c
|
||||||
|
if werr != nil {
|
||||||
|
return n, werr
|
||||||
|
}
|
||||||
|
p = p[pages*pw.pageBytes:]
|
||||||
|
}
|
||||||
|
// write remaining tail to buffer
|
||||||
|
c, werr := pw.Write(p)
|
||||||
|
n += c
|
||||||
|
return n, werr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pw *PageWriter) Flush() error {
|
||||||
|
if pw.bufferedBytes == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
|
||||||
|
pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
|
||||||
|
pw.bufferedBytes = 0
|
||||||
|
return err
|
||||||
|
}
|
100
pkg/ioutil/pagewriter_test.go
Normal file
100
pkg/ioutil/pagewriter_test.go
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package ioutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPageWriterRandom(t *testing.T) {
|
||||||
|
// smaller buffer for stress testing
|
||||||
|
defaultBufferBytes = 8 * 1024
|
||||||
|
pageBytes := 128
|
||||||
|
buf := make([]byte, 4*defaultBufferBytes)
|
||||||
|
cw := &checkPageWriter{pageBytes: pageBytes, t: t}
|
||||||
|
w := NewPageWriter(cw, pageBytes)
|
||||||
|
n := 0
|
||||||
|
for i := 0; i < 4096; i++ {
|
||||||
|
c, err := w.Write(buf[:rand.Intn(len(buf))])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
n += c
|
||||||
|
}
|
||||||
|
if cw.writeBytes > n {
|
||||||
|
t.Fatalf("wrote %d bytes to io.Writer, but only wrote %d bytes", cw.writeBytes, n)
|
||||||
|
}
|
||||||
|
if cw.writeBytes-n > pageBytes {
|
||||||
|
t.Fatalf("got %d bytes pending, expected less than %d bytes", cw.writeBytes-n, pageBytes)
|
||||||
|
}
|
||||||
|
t.Logf("total writes: %d", cw.writes)
|
||||||
|
t.Logf("total write bytes: %d (of %d)", cw.writeBytes, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPageWriterPariallack tests the case where a write overflows the buffer
|
||||||
|
// but there is not enough data to complete the slack write.
|
||||||
|
func TestPageWriterPartialSlack(t *testing.T) {
|
||||||
|
defaultBufferBytes = 1024
|
||||||
|
pageBytes := 128
|
||||||
|
buf := make([]byte, defaultBufferBytes)
|
||||||
|
cw := &checkPageWriter{pageBytes: 64, t: t}
|
||||||
|
w := NewPageWriter(cw, pageBytes)
|
||||||
|
// put writer in non-zero page offset
|
||||||
|
if _, err := w.Write(buf[:64]); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := w.Flush(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if cw.writes != 1 {
|
||||||
|
t.Fatalf("got %d writes, expected 1", cw.writes)
|
||||||
|
}
|
||||||
|
// nearly fill buffer
|
||||||
|
if _, err := w.Write(buf[:1022]); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// overflow buffer, but without enough to write as aligned
|
||||||
|
if _, err := w.Write(buf[:8]); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if cw.writes != 1 {
|
||||||
|
t.Fatalf("got %d writes, expected 1", cw.writes)
|
||||||
|
}
|
||||||
|
// finish writing slack space
|
||||||
|
if _, err := w.Write(buf[:128]); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if cw.writes != 2 {
|
||||||
|
t.Fatalf("got %d writes, expected 2", cw.writes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkPageWriter implements an io.Writer that fails a test on unaligned writes.
|
||||||
|
type checkPageWriter struct {
|
||||||
|
pageBytes int
|
||||||
|
writes int
|
||||||
|
writeBytes int
|
||||||
|
t *testing.T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cw *checkPageWriter) Write(p []byte) (int, error) {
|
||||||
|
if len(p)%cw.pageBytes != 0 {
|
||||||
|
cw.t.Fatalf("got write len(p) = %d, expected len(p) == k*cw.pageBytes", len(p))
|
||||||
|
}
|
||||||
|
cw.writes++
|
||||||
|
cw.writeBytes += len(p)
|
||||||
|
return len(p), nil
|
||||||
|
}
|
@ -332,7 +332,16 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
|
|||||||
default:
|
default:
|
||||||
plog.Panicf("unhandled stream type %s", t)
|
plog.Panicf("unhandled stream type %s", t)
|
||||||
}
|
}
|
||||||
cr.closer = rc
|
select {
|
||||||
|
case <-cr.stopc:
|
||||||
|
cr.mu.Unlock()
|
||||||
|
if err := rc.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return io.EOF
|
||||||
|
default:
|
||||||
|
cr.closer = rc
|
||||||
|
}
|
||||||
cr.mu.Unlock()
|
cr.mu.Unlock()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -17,6 +17,7 @@ package rafthttp
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -180,6 +181,60 @@ func TestStreamReaderDialResult(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
|
||||||
|
func TestStreamReaderStopOnDial(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
h := http.Header{}
|
||||||
|
h.Add("X-Server-Version", version.Version)
|
||||||
|
tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
|
||||||
|
sr := &streamReader{
|
||||||
|
peerID: types.ID(2),
|
||||||
|
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
|
||||||
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
||||||
|
errorc: make(chan error, 1),
|
||||||
|
typ: streamTypeMessage,
|
||||||
|
status: newPeerStatus(types.ID(2)),
|
||||||
|
}
|
||||||
|
tr.onResp = func() {
|
||||||
|
// stop() waits for the run() goroutine to exit, but that exit
|
||||||
|
// needs a response from RoundTrip() first; use goroutine
|
||||||
|
go sr.stop()
|
||||||
|
// wait so that stop() is blocked on run() exiting
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
// sr.run() completes dialing then begins decoding while stopped
|
||||||
|
}
|
||||||
|
sr.start()
|
||||||
|
select {
|
||||||
|
case <-sr.done:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("streamReader did not stop in time")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type respWaitRoundTripper struct {
|
||||||
|
rrt *respRoundTripper
|
||||||
|
onResp func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
|
resp, err := t.rrt.RoundTrip(req)
|
||||||
|
resp.Body = newWaitReadCloser()
|
||||||
|
t.onResp()
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
type waitReadCloser struct{ closec chan struct{} }
|
||||||
|
|
||||||
|
func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
|
||||||
|
func (wrc *waitReadCloser) Read(p []byte) (int, error) {
|
||||||
|
<-wrc.closec
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
func (wrc *waitReadCloser) Close() error {
|
||||||
|
close(wrc.closec)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// TestStreamReaderDialDetectUnsupport tests that dial func could find
|
// TestStreamReaderDialDetectUnsupport tests that dial func could find
|
||||||
// out that the stream type is not supported by the remote.
|
// out that the stream type is not supported by the remote.
|
||||||
func TestStreamReaderDialDetectUnsupport(t *testing.T) {
|
func TestStreamReaderDialDetectUnsupport(t *testing.T) {
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "2.3.0"
|
MinClusterVersion = "2.3.0"
|
||||||
Version = "3.0.5"
|
Version = "3.0.9"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
GitSHA = "Not provided (use ./build instead of go build)"
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
|
@ -15,19 +15,24 @@
|
|||||||
package wal
|
package wal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"hash"
|
"hash"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/crc"
|
"github.com/coreos/etcd/pkg/crc"
|
||||||
|
"github.com/coreos/etcd/pkg/ioutil"
|
||||||
"github.com/coreos/etcd/wal/walpb"
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// walPageBytes is the alignment for flushing records to the backing Writer.
|
||||||
|
// It should be a multiple of the minimum sector size so that WAL repair can
|
||||||
|
// safely between torn writes and ordinary data corruption.
|
||||||
|
const walPageBytes = 8 * minSectorSize
|
||||||
|
|
||||||
type encoder struct {
|
type encoder struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
bw *bufio.Writer
|
bw *ioutil.PageWriter
|
||||||
|
|
||||||
crc hash.Hash32
|
crc hash.Hash32
|
||||||
buf []byte
|
buf []byte
|
||||||
@ -36,7 +41,7 @@ type encoder struct {
|
|||||||
|
|
||||||
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
|
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
|
||||||
return &encoder{
|
return &encoder{
|
||||||
bw: bufio.NewWriter(w),
|
bw: ioutil.NewPageWriter(w, walPageBytes),
|
||||||
crc: crc.New(prevCrc, crcTable),
|
crc: crc.New(prevCrc, crcTable),
|
||||||
// 1MB buffer
|
// 1MB buffer
|
||||||
buf: make([]byte, 1024*1024),
|
buf: make([]byte, 1024*1024),
|
||||||
|
69
wal/wal.go
69
wal/wal.go
@ -67,7 +67,11 @@ var (
|
|||||||
// A just opened WAL is in read mode, and ready for reading records.
|
// A just opened WAL is in read mode, and ready for reading records.
|
||||||
// The WAL will be ready for appending after reading out all the previous records.
|
// The WAL will be ready for appending after reading out all the previous records.
|
||||||
type WAL struct {
|
type WAL struct {
|
||||||
dir string // the living directory of the underlay files
|
dir string // the living directory of the underlay files
|
||||||
|
|
||||||
|
// dirFile is a fd for the wal directory for syncing on Rename
|
||||||
|
dirFile *os.File
|
||||||
|
|
||||||
metadata []byte // metadata recorded at the head of each WAL
|
metadata []byte // metadata recorded at the head of each WAL
|
||||||
state raftpb.HardState // hardstate recorded at the head of WAL
|
state raftpb.HardState // hardstate recorded at the head of WAL
|
||||||
|
|
||||||
@ -106,10 +110,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if _, err := f.Seek(0, os.SEEK_END); err != nil {
|
if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
if err = fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,32 +123,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
encoder: newEncoder(f, 0),
|
encoder: newEncoder(f, 0),
|
||||||
}
|
}
|
||||||
w.locks = append(w.locks, f)
|
w.locks = append(w.locks, f)
|
||||||
if err := w.saveCrc(0); err != nil {
|
if err = w.saveCrc(0); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// rename of directory with locked files doesn't work on windows; close
|
if w, err = w.renameWal(tmpdirpath); err != nil {
|
||||||
// the WAL to release the locks so the directory can be renamed
|
|
||||||
w.Close()
|
|
||||||
if err := os.Rename(tmpdirpath, dirpath); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// reopen and relock
|
|
||||||
newWAL, oerr := Open(dirpath, walpb.Snapshot{})
|
// directory was renamed; sync parent dir to persist rename
|
||||||
if oerr != nil {
|
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
|
||||||
return nil, oerr
|
if perr != nil {
|
||||||
|
return nil, perr
|
||||||
}
|
}
|
||||||
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
if perr = fileutil.Fsync(pdir); perr != nil {
|
||||||
newWAL.Close()
|
return nil, perr
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
return newWAL, nil
|
if perr = pdir.Close(); err != nil {
|
||||||
|
return nil, perr
|
||||||
|
}
|
||||||
|
|
||||||
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the WAL at the given snap.
|
// Open opens the WAL at the given snap.
|
||||||
@ -154,7 +159,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
// the given snap. The WAL cannot be appended to before reading out all of its
|
// the given snap. The WAL cannot be appended to before reading out all of its
|
||||||
// previous records.
|
// previous records.
|
||||||
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||||
return openAtIndex(dirpath, snap, true)
|
w, err := openAtIndex(dirpath, snap, true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenForRead only opens the wal files for read.
|
// OpenForRead only opens the wal files for read.
|
||||||
@ -299,6 +311,18 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
state.Reset()
|
state.Reset()
|
||||||
return nil, state, nil, err
|
return nil, state, nil, err
|
||||||
}
|
}
|
||||||
|
// decodeRecord() will return io.EOF if it detects a zero record,
|
||||||
|
// but this zero record may be followed by non-zero records from
|
||||||
|
// a torn write. Overwriting some of these non-zero records, but
|
||||||
|
// not all, will cause CRC errors on WAL open. Since the records
|
||||||
|
// were never fully synced to disk in the first place, it's safe
|
||||||
|
// to zero them out to avoid any CRC errors from new writes.
|
||||||
|
if _, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET); err != nil {
|
||||||
|
return nil, state, nil, err
|
||||||
|
}
|
||||||
|
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
|
||||||
|
return nil, state, nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = nil
|
err = nil
|
||||||
@ -317,7 +341,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
|
|
||||||
if w.tail() != nil {
|
if w.tail() != nil {
|
||||||
// create encoder (chain crc with the decoder), enable appending
|
// create encoder (chain crc with the decoder), enable appending
|
||||||
_, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET)
|
|
||||||
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
|
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
|
||||||
}
|
}
|
||||||
w.decoder = nil
|
w.decoder = nil
|
||||||
@ -375,6 +398,10 @@ func (w *WAL) cut() error {
|
|||||||
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err = fileutil.Fsync(w.dirFile); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
newTail.Close()
|
newTail.Close()
|
||||||
|
|
||||||
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
||||||
@ -477,7 +504,7 @@ func (w *WAL) Close() error {
|
|||||||
plog.Errorf("failed to unlock during closing wal: %s", err)
|
plog.Errorf("failed to unlock during closing wal: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return w.dirFile.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
||||||
|
@ -636,3 +636,89 @@ func TestRestartCreateWal(t *testing.T) {
|
|||||||
t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
|
t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestOpenOnTornWrite ensures that entries past the torn write are truncated.
|
||||||
|
func TestOpenOnTornWrite(t *testing.T) {
|
||||||
|
maxEntries := 40
|
||||||
|
clobberIdx := 20
|
||||||
|
overwriteEntries := 5
|
||||||
|
|
||||||
|
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(p)
|
||||||
|
w, err := Create(p, nil)
|
||||||
|
defer w.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// get offset of end of each saved entry
|
||||||
|
offsets := make([]int64, maxEntries)
|
||||||
|
for i := range offsets {
|
||||||
|
es := []raftpb.Entry{{Index: uint64(i)}}
|
||||||
|
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if offsets[i], err = w.tail().Seek(0, os.SEEK_CUR); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := path.Join(p, path.Base(w.tail().Name()))
|
||||||
|
w.Close()
|
||||||
|
|
||||||
|
// clobber some entry with 0's to simulate a torn write
|
||||||
|
f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
|
||||||
|
if ferr != nil {
|
||||||
|
t.Fatal(ferr)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
_, err = f.Seek(offsets[clobberIdx], os.SEEK_SET)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
|
||||||
|
_, err = f.Write(zeros)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
w, err = Open(p, walpb.Snapshot{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// seek up to clobbered entry
|
||||||
|
_, _, _, err = w.ReadAll()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// write a few entries past the clobbered entry
|
||||||
|
for i := 0; i < overwriteEntries; i++ {
|
||||||
|
// Index is different from old, truncated entries
|
||||||
|
es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
|
||||||
|
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Close()
|
||||||
|
|
||||||
|
// read back the entries, confirm number of entries matches expectation
|
||||||
|
w, err = OpenForRead(p, walpb.Snapshot{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, ents, rerr := w.ReadAll()
|
||||||
|
if rerr != nil {
|
||||||
|
// CRC error? the old entries were likely never truncated away
|
||||||
|
t.Fatal(rerr)
|
||||||
|
}
|
||||||
|
wEntries := (clobberIdx - 1) + overwriteEntries
|
||||||
|
if len(ents) != wEntries {
|
||||||
|
t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
44
wal/wal_unix.go
Normal file
44
wal/wal_unix.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package wal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||||
|
// On non-Windows platforms, hold the lock while renaming. Releasing
|
||||||
|
// the lock and trying to reacquire it quickly can be flaky because
|
||||||
|
// it's possible the process will fork to spawn a process while this is
|
||||||
|
// happening. The fds are set up as close-on-exec by the Go runtime,
|
||||||
|
// but there is a window between the fork and the exec where another
|
||||||
|
// process holds the lock.
|
||||||
|
|
||||||
|
if err := os.RemoveAll(w.dir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
|
||||||
|
df, err := fileutil.OpenDir(w.dir)
|
||||||
|
w.dirFile = df
|
||||||
|
return w, err
|
||||||
|
}
|
41
wal/wal_windows.go
Normal file
41
wal/wal_windows.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package wal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||||
|
// rename of directory with locked files doesn't work on
|
||||||
|
// windows; close the WAL to release the locks so the directory
|
||||||
|
// can be renamed
|
||||||
|
w.Close()
|
||||||
|
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// reopen and relock
|
||||||
|
newWAL, oerr := Open(w.dir, walpb.Snapshot{})
|
||||||
|
if oerr != nil {
|
||||||
|
return nil, oerr
|
||||||
|
}
|
||||||
|
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||||
|
newWAL.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return newWAL, nil
|
||||||
|
}
|
Reference in New Issue
Block a user