Compare commits

9 Commits

- 9efa00d103
- 72d30f4c34
- 2e92779777
- 404415b1e3
- 07e421d245
- a7d6e29275
- 1a8b295dab
- ffc45cc066
- 0db1ba8093
cmd/Godeps/Godeps.json (36 changes; generated)

```diff
@@ -237,48 +237,48 @@
 		},
 		{
 			"ImportPath": "google.golang.org/grpc",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/codes",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/credentials",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/grpclog",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/internal",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/metadata",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/naming",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/peer",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/transport",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "gopkg.in/cheggaaa/pb.v1",
```
cmd/vendor/google.golang.org/grpc/call.go (4 changes; generated, vendored)

```diff
@@ -170,9 +170,9 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
 		if _, ok := err.(*rpcError); ok {
 			return err
 		}
-		if err == errConnClosing {
+		if err == errConnClosing || err == errConnUnavailable {
 			if c.failFast {
-				return Errorf(codes.Unavailable, "%v", errConnClosing)
+				return Errorf(codes.Unavailable, "%v", err)
 			}
 			continue
 		}
```
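For context on how this branch behaves: failFast is a per-call setting, and a non-fail-fast call now retries on either sentinel error instead of surfacing codes.Unavailable. A minimal caller-side sketch, assuming the grpc.FailFast call option of this grpc-go vintage; the target, method name, and empty-struct messages are placeholders, not part of this change:

```go
package main

import (
	"log"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder endpoint; WithInsecure matches the era's plaintext setup.
	conn, err := grpc.Dial("localhost:2379", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	var in, out struct{} // stand-ins for real protobuf request/response types
	// With FailFast(false), Invoke retries while the connection is merely
	// unavailable; with fail-fast, it returns codes.Unavailable immediately.
	err = grpc.Invoke(context.Background(), "/pkg.Service/Method", &in, &out,
		conn, grpc.FailFast(false))
	log.Println(err)
}
```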
cmd/vendor/google.golang.org/grpc/clientconn.go (67 changes; generated, vendored)

```diff
@@ -73,7 +73,9 @@ var (
 	errConnDrain = errors.New("grpc: the connection is drained")
 	// errConnClosing indicates that the connection is closing.
 	errConnClosing = errors.New("grpc: the connection is closing")
-	errNoAddr = errors.New("grpc: there is no address available to dial")
+	// errConnUnavailable indicates that the connection is unavailable.
+	errConnUnavailable = errors.New("grpc: the connection is unavailable")
+	errNoAddr = errors.New("grpc: there is no address available to dial")
 	// minimum time to give a connection to complete
 	minConnectTimeout = 20 * time.Second
 )
```
```diff
@@ -213,9 +215,14 @@ func WithUserAgent(s string) DialOption {
 	}
 }
 
-// Dial creates a client connection the given target.
+// Dial creates a client connection to the given target.
 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
-	ctx := context.Background()
+	return DialContext(context.Background(), target, opts...)
+}
+
+// DialContext creates a client connection to the given target
+// using the supplied context.
+func DialContext(ctx context.Context, target string, opts ...DialOption) (*ClientConn, error) {
 	cc := &ClientConn{
 		target: target,
 		conns:  make(map[Address]*addrConn),
```
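DialContext is the new context-aware entry point, and Dial is now a thin wrapper over it. A usage sketch; the endpoint is a placeholder, and grpc.WithBlock() is assumed so the deadline actually bounds connection establishment (without it, DialContext returns without waiting):

```go
package main

import (
	"log"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

func main() {
	// Give connection establishment at most five seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, "localhost:2379",
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}
```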
```diff
@@ -472,6 +479,10 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
 	if cc.dopts.balancer == nil {
 		// If balancer is nil, there should be only one addrConn available.
 		cc.mu.RLock()
+		if cc.conns == nil {
+			cc.mu.RUnlock()
+			return nil, nil, toRPCErr(ErrClientConnClosing)
+		}
 		for _, ac = range cc.conns {
 			// Break after the first iteration to get the first addrConn.
 			ok = true
```
```diff
@@ -501,11 +512,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
 		}
 		return nil, nil, errConnClosing
 	}
-	// ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
-	// - If RPC is failfast, ac.wait should not block.
-	// - If balancer is not nil, ac.wait should return errConnClosing on transient failure
-	// so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
-	t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
+	t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
 	if err != nil {
 		if put != nil {
 			put()
```
```diff
@@ -757,36 +764,42 @@ func (ac *addrConn) transportMonitor() {
 }
 
 // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and blocking is false.
-func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
+// iv) transport is in TransientFailure and there's no balancer/failfast is true.
+func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
 	for {
 		ac.mu.Lock()
 		switch {
 		case ac.state == Shutdown:
-			err := ac.tearDownErr
+			if failfast || !hasBalancer {
+				// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
+				err := ac.tearDownErr
+				ac.mu.Unlock()
+				return nil, err
+			}
 			ac.mu.Unlock()
-			return nil, err
+			return nil, errConnClosing
 		case ac.state == Ready:
 			ct := ac.transport
 			ac.mu.Unlock()
 			return ct, nil
-		case ac.state == TransientFailure && !blocking:
-			ac.mu.Unlock()
-			return nil, errConnClosing
-		default:
-			ready := ac.ready
-			if ready == nil {
-				ready = make(chan struct{})
-				ac.ready = ready
-			}
-			ac.mu.Unlock()
-			select {
-			case <-ctx.Done():
-				return nil, toRPCErr(ctx.Err())
-			// Wait until the new transport is ready or failed.
-			case <-ready:
-			}
+		case ac.state == TransientFailure:
+			if failfast || hasBalancer {
+				ac.mu.Unlock()
+				return nil, errConnUnavailable
+			}
 		}
+		ready := ac.ready
+		if ready == nil {
+			ready = make(chan struct{})
+			ac.ready = ready
+		}
+		ac.mu.Unlock()
+		select {
+		case <-ctx.Done():
+			return nil, toRPCErr(ctx.Err())
+		// Wait until the new transport is ready or failed.
+		case <-ready:
+		}
 	}
 }
```
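The reworked wait reduces to a small decision table over the connectivity state and the (hasBalancer, failfast) pair. A standalone sketch of just that decision logic, with hypothetical names, purely for readability:

```go
package main

import "fmt"

type state int

const (
	shutdown state = iota
	ready
	transientFailure
	connecting
)

// decide mirrors the branch structure of addrConn.wait above.
func decide(s state, hasBalancer, failfast bool) string {
	switch {
	case s == shutdown:
		if failfast || !hasBalancer {
			return "fail with ac.tearDownErr"
		}
		return "return errConnClosing (caller retries on another addrConn)"
	case s == ready:
		return "use the current transport"
	case s == transientFailure:
		if failfast || hasBalancer {
			return "return errConnUnavailable"
		}
	}
	return "block until the transport is ready or ctx is done"
}

func main() {
	for _, s := range []state{shutdown, ready, transientFailure, connecting} {
		fmt.Printf("state=%d -> %s\n", s, decide(s, true, true))
	}
}
```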
cmd/vendor/google.golang.org/grpc/stream.go (4 changes; generated, vendored)

```diff
@@ -146,9 +146,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 		if _, ok := err.(*rpcError); ok {
 			return nil, err
 		}
-		if err == errConnClosing {
+		if err == errConnClosing || err == errConnUnavailable {
 			if c.failFast {
-				return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
+				return nil, Errorf(codes.Unavailable, "%v", err)
 			}
 			continue
 		}
```
etcdserver/server.go

```diff
@@ -405,6 +405,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 
 	srv.be = be
 	srv.lessor = lease.NewLessor(srv.be)
+
+	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
+	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
+	srv.lessor = lease.NewLessor(srv.be)
 	srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
 	if beExist {
 		kvindex := srv.kv.ConsistentIndex()
@@ -413,6 +417,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		}
 	}
 	srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
+
 	srv.authStore = auth.NewAuthStore(srv.be)
 	if h := cfg.AutoCompactionRetention; h != 0 {
 		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
@@ -658,6 +663,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 
 	newbe := backend.NewDefaultBackend(fn)
 
+	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
+	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
+	if s.lessor != nil {
+		plog.Info("recovering lessor...")
+		s.lessor.Recover(newbe, s.kv)
+		plog.Info("finished recovering lessor")
+	}
+
 	plog.Info("restoring mvcc store...")
 
 	if err := s.kv.Restore(newbe); err != nil {
@@ -684,12 +697,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	s.be = newbe
 	s.bemu.Unlock()
 
-	if s.lessor != nil {
-		plog.Info("recovering lessor...")
-		s.lessor.Recover(newbe, s.kv)
-		plog.Info("finished recovering lessor")
-	}
-
 	plog.Info("recovering alarms...")
 	if err := s.restoreAlarms(); err != nil {
 		plog.Panicf("restore alarms error: %v", err)
```
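The recovery-order comment carries the reasoning: mvcc.KV restore re-attaches keys to leases, so the lessor must already hold its lease records, or the attach targets state that is not there yet. A toy sketch of that ordering dependency; the types here are illustrative, not etcd's:

```go
package main

import "fmt"

type lessor struct{ leases map[int64][]string }

// recover loads lease records from the backend (here, a hard-coded lease 5).
func (l *lessor) recover() { l.leases = map[int64][]string{5: nil} }

func (l *lessor) attach(id int64, key string) error {
	if _, ok := l.leases[id]; !ok {
		return fmt.Errorf("lease %d not found", id)
	}
	l.leases[id] = append(l.leases[id], key)
	return nil
}

// restoreKV stands in for mvcc.KV restore, which re-attaches leased keys.
func restoreKV(l *lessor) error { return l.attach(5, "foo") }

func main() {
	wrong := &lessor{}
	fmt.Println(restoreKV(wrong)) // lease 5 not found: kv restored first

	right := &lessor{}
	right.recover()               // lessor first, as the comment requires
	fmt.Println(restoreKV(right)) // <nil>: key re-attached correctly
}
```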
lease/lessor.go

```diff
@@ -45,13 +45,18 @@ var (
 
 type LeaseID int64
 
-// RangeDeleter defines an interface with DeleteRange method.
+// RangeDeleter defines an interface with Txn and DeleteRange method.
 // We define this interface only for lessor to limit the number
 // of methods of mvcc.KV to what lessor actually needs.
 //
 // Having a minimum interface makes testing easy.
 type RangeDeleter interface {
-	DeleteRange(key, end []byte) (int64, int64)
+	// TxnBegin see comments on mvcc.KV
+	TxnBegin() int64
+	// TxnEnd see comments on mvcc.KV
+	TxnEnd(txnID int64) error
+	// TxnDeleteRange see comments on mvcc.KV
+	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
 }
 
 // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
```
```diff
@@ -211,16 +216,30 @@ func (le *lessor) Revoke(id LeaseID) error {
 	// unlock before doing external work
 	le.mu.Unlock()
 
-	if le.rd != nil {
-		for item := range l.itemSet {
-			le.rd.DeleteRange([]byte(item.Key), nil)
+	if le.rd == nil {
+		return nil
+	}
+
+	tid := le.rd.TxnBegin()
+	for item := range l.itemSet {
+		_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
+		if err != nil {
+			panic(err)
 		}
 	}
 
 	le.mu.Lock()
 	defer le.mu.Unlock()
 	delete(le.leaseMap, l.ID)
-	l.removeFrom(le.b)
+	// lease deletion needs to be in the same backend transaction with the
+	// kv deletion. Or we might end up with not executing the revoke or not
+	// deleting the keys if etcdserver fails in between.
+	le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
+
+	err := le.rd.TxnEnd(tid)
+	if err != nil {
+		panic(err)
+	}
 
 	return nil
 }
```
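The rewritten Revoke scopes every key deletion and the lease-record deletion to one backend transaction, so a crash in between can no longer revoke the lease without deleting its keys, or the reverse. A toy sketch of the pattern against a hypothetical in-memory RangeDeleter (names are illustrative):

```go
package main

import "fmt"

type rangeDeleter interface {
	TxnBegin() int64
	TxnEnd(txnID int64) error
	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
}

type memDeleter struct{ deleted []string }

func (d *memDeleter) TxnBegin() int64       { return 1 }
func (d *memDeleter) TxnEnd(id int64) error { return nil }
func (d *memDeleter) TxnDeleteRange(id int64, key, end []byte) (int64, int64, error) {
	d.deleted = append(d.deleted, string(key))
	return 1, 1, nil
}

// revoke deletes all keys attached to a lease and drops the lease record
// before ending the transaction, mirroring the hunk above.
func revoke(rd rangeDeleter, keys []string, dropLeaseRecord func()) error {
	tid := rd.TxnBegin()
	for _, k := range keys {
		if _, _, err := rd.TxnDeleteRange(tid, []byte(k), nil); err != nil {
			return err
		}
	}
	dropLeaseRecord() // in etcd: UnsafeDelete on the lease bucket, same batch tx
	return rd.TxnEnd(tid)
}

func main() {
	d := &memDeleter{}
	if err := revoke(d, []string{"foo", "bar"}, func() {}); err != nil {
		panic(err)
	}
	fmt.Println(d.deleted) // [foo bar]
}
```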
```diff
@@ -443,16 +462,7 @@ func (l Lease) persistTo(b backend.Backend) {
 	b.BatchTx().Unlock()
 }
 
-func (l Lease) removeFrom(b backend.Backend) {
-	key := int64ToBytes(int64(l.ID))
-
-	b.BatchTx().Lock()
-	b.BatchTx().UnsafeDelete(leaseBucketName, key)
-	b.BatchTx().Unlock()
-}
-
-// refresh refreshes the expiry of the lease. It extends the expiry at least
-// minLeaseTTL second.
+// refresh refreshes the expiry of the lease.
 func (l *Lease) refresh(extend time.Duration) {
 	if l.TTL < minLeaseTTL {
 		l.TTL = minLeaseTTL
```
lease/lessor_test.go

```diff
@@ -223,9 +223,17 @@ type fakeDeleter struct {
 	deleted []string
 }
 
-func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
+func (fd *fakeDeleter) TxnBegin() int64 {
+	return 0
+}
+
+func (fd *fakeDeleter) TxnEnd(txnID int64) error {
+	return nil
+}
+
+func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
 	fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
-	return 0, 0
+	return 0, 0, nil
 }
 
 func NewTestBackend(t *testing.T) (string, backend.Backend) {
```
mvcc/kvstore.go

```diff
@@ -367,6 +367,8 @@ func (s *store) restore() error {
 	revToBytes(revision{main: 1}, min)
 	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
 
+	keyToLease := make(map[string]lease.LeaseID)
+
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
@@ -390,26 +392,15 @@ func (s *store) restore() error {
 		switch {
 		case isTombstone(key):
 			s.kvindex.Tombstone(kv.Key, rev)
-			if lease.LeaseID(kv.Lease) != lease.NoLease {
-				err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
-				if err != nil && err != lease.ErrLeaseNotFound {
-					plog.Fatalf("unexpected Detach error %v", err)
-				}
-			}
+			delete(keyToLease, string(kv.Key))
 		default:
 			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
-			if lease.LeaseID(kv.Lease) != lease.NoLease {
-				if s.le == nil {
-					panic("no lessor to attach lease")
-				}
-				err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
-				// We are walking through the kv history here. It is possible that we attached a key to
-				// the lease and the lease was revoked later.
-				// Thus attaching an old version of key to a none existing lease is possible here, and
-				// we should just ignore the error.
-				if err != nil && err != lease.ErrLeaseNotFound {
-					panic("unexpected Attach error")
-				}
+			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
+				keyToLease[string(kv.Key)] = lid
+			} else {
+				delete(keyToLease, string(kv.Key))
 			}
 		}
 
@@ -417,6 +408,16 @@ func (s *store) restore() error {
 		s.currentRev = rev
 	}
 
+	for key, lid := range keyToLease {
+		if s.le == nil {
+			panic("no lessor to attach lease")
+		}
+		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
+		if err != nil {
+			plog.Errorf("unexpected Attach error: %v", err)
+		}
+	}
+
 	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	scheduledCompact := int64(0)
 	if len(scheduledCompactBytes) != 0 {
```
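Collecting keyToLease while walking the key history means only each key's final revision decides its lease attachment; tombstoned keys simply drop out of the map, which is why the per-revision Attach/Detach calls and their ErrLeaseNotFound special-casing disappear. A small sketch of the replay idea, with hypothetical types:

```go
package main

import "fmt"

type event struct {
	key       string
	lease     int64 // 0 stands in for lease.NoLease
	tombstone bool
}

// replay walks history in order; the last event per key wins, so a key
// attached to a lease and later deleted ends up with no attachment.
func replay(history []event) map[string]int64 {
	keyToLease := make(map[string]int64)
	for _, ev := range history {
		switch {
		case ev.tombstone:
			delete(keyToLease, ev.key)
		case ev.lease != 0:
			keyToLease[ev.key] = ev.lease
		default:
			delete(keyToLease, ev.key)
		}
	}
	return keyToLease // attach these to the lessor after the walk
}

func main() {
	h := []event{
		{key: "a", lease: 5},
		{key: "a", tombstone: true}, // "a" deleted later; its lease may be gone
		{key: "b", lease: 7},
	}
	fmt.Println(replay(h)) // map[b:7]
}
```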
```diff
@@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 
 		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
 		if err != nil {
-			panic("unexpected error from lease detach")
+			plog.Errorf("unexpected error from lease detach: %v", err)
 		}
 	}
 
```
```diff
@@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
 	if lease.LeaseID(kv.Lease) != lease.NoLease {
 		err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
 		if err != nil {
-			plog.Fatalf("cannot detach %v", err)
+			plog.Errorf("cannot detach %v", err)
 		}
 	}
 }
```
rafthttp/stream.go

```diff
@@ -332,7 +332,16 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 	default:
 		plog.Panicf("unhandled stream type %s", t)
 	}
-	cr.closer = rc
+	select {
+	case <-cr.stopc:
+		cr.mu.Unlock()
+		if err := rc.Close(); err != nil {
+			return err
+		}
+		return io.EOF
+	default:
+		cr.closer = rc
+	}
 	cr.mu.Unlock()
 
 	for {
```
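This closes a race between decodeLoop registering a fresh connection and stop() tearing the reader down: the stop channel is checked inside the same critical section that publishes the closer, so a stopped reader closes the connection instead of leaking it. A self-contained sketch of the pattern, with hypothetical names:

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

type reader struct {
	mu     sync.Mutex
	stopc  chan struct{}
	closer io.Closer
}

// register publishes rc for later teardown, unless stop already happened,
// in which case rc is closed immediately, mirroring the hunk above.
func (r *reader) register(rc io.Closer) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	select {
	case <-r.stopc:
		if err := rc.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		r.closer = rc
	}
	return nil
}

type nopCloser struct{}

func (nopCloser) Close() error { return nil }

func main() {
	r := &reader{stopc: make(chan struct{})}
	close(r.stopc)                       // simulate stop() having run first
	fmt.Println(r.register(nopCloser{})) // EOF: connection closed, not kept
}
```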
rafthttp/stream_test.go

```diff
@@ -17,6 +17,7 @@ package rafthttp
 import (
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
 	"net/http/httptest"
 	"reflect"
@@ -180,6 +181,60 @@ func TestStreamReaderDialResult(t *testing.T) {
 	}
 }
 
+// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
+func TestStreamReaderStopOnDial(t *testing.T) {
+	defer testutil.AfterTest(t)
+	h := http.Header{}
+	h.Add("X-Server-Version", version.Version)
+	tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
+	sr := &streamReader{
+		peerID: types.ID(2),
+		tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
+		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
+		errorc: make(chan error, 1),
+		typ:    streamTypeMessage,
+		status: newPeerStatus(types.ID(2)),
+	}
+	tr.onResp = func() {
+		// stop() waits for the run() goroutine to exit, but that exit
+		// needs a response from RoundTrip() first; use goroutine
+		go sr.stop()
+		// wait so that stop() is blocked on run() exiting
+		time.Sleep(10 * time.Millisecond)
+		// sr.run() completes dialing then begins decoding while stopped
+	}
+	sr.start()
+	select {
+	case <-sr.done:
+	case <-time.After(time.Second):
+		t.Fatal("streamReader did not stop in time")
+	}
+}
+
+type respWaitRoundTripper struct {
+	rrt    *respRoundTripper
+	onResp func()
+}
+
+func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	resp, err := t.rrt.RoundTrip(req)
+	resp.Body = newWaitReadCloser()
+	t.onResp()
+	return resp, err
+}
+
+type waitReadCloser struct{ closec chan struct{} }
+
+func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
+
+func (wrc *waitReadCloser) Read(p []byte) (int, error) {
+	<-wrc.closec
+	return 0, io.EOF
+}
+
+func (wrc *waitReadCloser) Close() error {
+	close(wrc.closec)
+	return nil
+}
+
 // TestStreamReaderDialDetectUnsupport tests that dial func could find
 // out that the stream type is not supported by the remote.
 func TestStreamReaderDialDetectUnsupport(t *testing.T) {
```
version/version.go

```diff
@@ -29,7 +29,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "2.3.0"
-	Version           = "3.0.5"
+	Version           = "3.0.6"
 
 	// Git SHA Value will be set during build
 	GitSHA = "Not provided (use ./build instead of go build)"
```