Compare commits

...

9 Commits

SHA1 Message Date
9efa00d103 version: bump to v3.0.6 2016-08-19 12:03:02 -07:00
72d30f4c34 *: minor cleanup for lease 2016-08-19 11:53:38 -07:00
2e92779777 mvcc: attach keys to leases after recover all state
The previous logic is wrong. When we have history like Put(foo, bar, lease1)
and Put(foo, bar, lease2), we will end up attaching foo to two leases, 1 and
2. A similar thing can happen for detach, by clearing the lease of a key.

Now we fix this by attaching leases only at the end of recovery, using a map
to keep the last lease attachment state (a sketch of this approach follows
the changed-files summary below).
2016-08-19 11:49:05 -07:00
404415b1e3 lease: do lease deletion in the kv txn 2016-08-19 11:49:05 -07:00
07e421d245 lease: delete kvs in a txn 2016-08-19 11:49:05 -07:00
a7d6e29275 etcdserver: always recover lessor first 2016-08-19 11:49:05 -07:00
1a8b295dab vendor: update grpc/grpc-go for clientconn patch 2016-08-19 11:46:51 -07:00
ffc45cc066 rafthttp: fix race between streamReader.stop() and connection closer 2016-08-19 11:45:39 -07:00
0db1ba8093 version: bump to v3.0.5+git 2016-08-19 11:11:10 -07:00
11 changed files with 198 additions and 95 deletions
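To make the recovery fix in 2e92779777 concrete, here is a minimal, hypothetical Go sketch of the last-attachment-wins replay the commit message describes. This is not etcd's code (revision and replayAttachments are invented names for illustration), but the mvcc/kvstore.go hunk further down implements the same pattern with its keyToLease map:

package main

import "fmt"

type LeaseID int64

const NoLease LeaseID = 0

// revision is a simplified stand-in for one historical kv entry.
type revision struct {
	key       string
	lease     LeaseID
	tombstone bool
}

// replayAttachments folds the whole history into a map so that only each
// key's final lease survives; actual attachment happens once, after the walk.
func replayAttachments(history []revision) map[string]LeaseID {
	keyToLease := make(map[string]LeaseID)
	for _, r := range history {
		if r.tombstone || r.lease == NoLease {
			delete(keyToLease, r.key) // key deleted or lease cleared: drop any earlier attachment
		} else {
			keyToLease[r.key] = r.lease // a later write overwrites any earlier lease
		}
	}
	return keyToLease
}

func main() {
	history := []revision{
		{key: "foo", lease: 1}, // Put(foo, bar, lease1)
		{key: "foo", lease: 2}, // Put(foo, bar, lease2)
	}
	// Attaching per revision would bind foo to leases 1 and 2;
	// replaying through the map binds it only to lease 2.
	fmt.Println(replayAttachments(history)) // map[foo:2]
}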

cmd/Godeps/Godeps.json generated (36 lines changed)
View File

@@ -237,48 +237,48 @@
 		},
 		{
 			"ImportPath": "google.golang.org/grpc",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/codes",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/credentials",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/grpclog",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/internal",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/metadata",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/naming",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/peer",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "google.golang.org/grpc/transport",
-			"Comment": "v1.0.0-174-gc278196",
-			"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
+			"Comment": "v1.0.0-183-g231b4cf",
+			"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
 		},
 		{
 			"ImportPath": "gopkg.in/cheggaaa/pb.v1",

View File

@@ -170,9 +170,9 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
 		if _, ok := err.(*rpcError); ok {
 			return err
 		}
-		if err == errConnClosing {
+		if err == errConnClosing || err == errConnUnavailable {
 			if c.failFast {
-				return Errorf(codes.Unavailable, "%v", errConnClosing)
+				return Errorf(codes.Unavailable, "%v", err)
 			}
 			continue
 		}
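The widened retry condition above interacts with the per-call failfast flag: a failfast RPC surfaces codes.Unavailable immediately, while a non-failfast RPC keeps looping for a usable transport. A hedged caller-side sketch (hypothetical client code, not part of this diff; grpc.FailFast is a real CallOption in grpc-go of this era, though its default behavior has varied across releases):

package main

import (
	"log"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:2379", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// With FailFast(false), Invoke retries through transient connection
	// errors (errConnClosing, errConnUnavailable) instead of returning
	// codes.Unavailable on the first failure.
	var req, reply interface{} // stand-ins; real calls pass generated protobuf messages
	if err := grpc.Invoke(context.Background(), "/some.Service/Method", req, reply, conn, grpc.FailFast(false)); err != nil {
		log.Printf("rpc failed: %v", err)
	}
}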

View File

@@ -73,7 +73,9 @@ var (
 	errConnDrain = errors.New("grpc: the connection is drained")
 	// errConnClosing indicates that the connection is closing.
 	errConnClosing = errors.New("grpc: the connection is closing")
-	errNoAddr      = errors.New("grpc: there is no address available to dial")
+	// errConnUnavailable indicates that the connection is unavailable.
+	errConnUnavailable = errors.New("grpc: the connection is unavailable")
+	errNoAddr          = errors.New("grpc: there is no address available to dial")
 	// minimum time to give a connection to complete
 	minConnectTimeout = 20 * time.Second
 )
@@ -213,9 +215,14 @@
 	}
 }

-// Dial creates a client connection the given target.
+// Dial creates a client connection to the given target.
 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
-	ctx := context.Background()
+	return DialContext(context.Background(), target, opts...)
+}
+
+// DialContext creates a client connection to the given target
+// using the supplied context.
+func DialContext(ctx context.Context, target string, opts ...DialOption) (*ClientConn, error) {
 	cc := &ClientConn{
 		target: target,
 		conns:  make(map[Address]*addrConn),
@@ -472,6 +479,10 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
 	if cc.dopts.balancer == nil {
 		// If balancer is nil, there should be only one addrConn available.
 		cc.mu.RLock()
+		if cc.conns == nil {
+			cc.mu.RUnlock()
+			return nil, nil, toRPCErr(ErrClientConnClosing)
+		}
 		for _, ac = range cc.conns {
 			// Break after the first iteration to get the first addrConn.
 			ok = true
@@ -501,11 +512,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
 		}
 		return nil, nil, errConnClosing
 	}
-	// ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
-	// - If RPC is failfast, ac.wait should not block.
-	// - If balancer is not nil, ac.wait should return errConnClosing on transient failure
-	//   so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
-	t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
+	t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
 	if err != nil {
 		if put != nil {
 			put()
@@ -757,36 +764,42 @@ func (ac *addrConn) transportMonitor() {
 }

 // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and blocking is false.
-func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
+// iv) transport is in TransientFailure and there's no balancer/failfast is true.
+func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
 	for {
 		ac.mu.Lock()
 		switch {
 		case ac.state == Shutdown:
-			err := ac.tearDownErr
+			if failfast || !hasBalancer {
+				// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
+				err := ac.tearDownErr
+				ac.mu.Unlock()
+				return nil, err
+			}
 			ac.mu.Unlock()
-			return nil, err
+			return nil, errConnClosing
 		case ac.state == Ready:
 			ct := ac.transport
 			ac.mu.Unlock()
 			return ct, nil
-		case ac.state == TransientFailure && !blocking:
-			ac.mu.Unlock()
-			return nil, errConnClosing
-		default:
-			ready := ac.ready
-			if ready == nil {
-				ready = make(chan struct{})
-				ac.ready = ready
-			}
-			ac.mu.Unlock()
-			select {
-			case <-ctx.Done():
-				return nil, toRPCErr(ctx.Err())
-			// Wait until the new transport is ready or failed.
-			case <-ready:
-			}
+		case ac.state == TransientFailure:
+			if failfast || hasBalancer {
+				ac.mu.Unlock()
+				return nil, errConnUnavailable
+			}
 		}
+		ready := ac.ready
+		if ready == nil {
+			ready = make(chan struct{})
+			ac.ready = ready
+		}
+		ac.mu.Unlock()
+		select {
+		case <-ctx.Done():
+			return nil, toRPCErr(ctx.Err())
+		// Wait until the new transport is ready or failed.
+		case <-ready:
+		}
 	}
 }
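DialContext is new public API in this dependency bump; Dial now just wraps it with context.Background(). A minimal usage sketch (hypothetical caller, assuming the vendored grpc-go API shown above; note that without WithBlock the dial is non-blocking, so the deadline would not bound the actual connection attempt):

package main

import (
	"log"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

func main() {
	// Bound the whole connection attempt with a context deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, "localhost:2379",
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("dial failed: %v", err) // e.g. a deadline-exceeded error
	}
	defer conn.Close()
}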

View File

@@ -146,9 +146,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 		if _, ok := err.(*rpcError); ok {
 			return nil, err
 		}
-		if err == errConnClosing {
+		if err == errConnClosing || err == errConnUnavailable {
 			if c.failFast {
-				return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
+				return nil, Errorf(codes.Unavailable, "%v", err)
 			}
 			continue
 		}

View File

@@ -405,6 +405,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	srv.be = be
-	srv.lessor = lease.NewLessor(srv.be)
+
+	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
+	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
+	srv.lessor = lease.NewLessor(srv.be)
 	srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
 	if beExist {
 		kvindex := srv.kv.ConsistentIndex()
@@ -413,6 +417,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		}
 	}
+	srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())

 	srv.authStore = auth.NewAuthStore(srv.be)
 	if h := cfg.AutoCompactionRetention; h != 0 {
 		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
@@ -658,6 +663,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	newbe := backend.NewDefaultBackend(fn)
+
+	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
+	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
+	if s.lessor != nil {
+		plog.Info("recovering lessor...")
+		s.lessor.Recover(newbe, s.kv)
+		plog.Info("finished recovering lessor")
+	}
 	plog.Info("restoring mvcc store...")
 	if err := s.kv.Restore(newbe); err != nil {
@@ -684,12 +697,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	s.be = newbe
 	s.bemu.Unlock()

-	if s.lessor != nil {
-		plog.Info("recovering lessor...")
-		s.lessor.Recover(newbe, s.kv)
-		plog.Info("finished recovering lessor")
-	}
-
 	plog.Info("recovering alarms...")
 	if err := s.restoreAlarms(); err != nil {
 		plog.Panicf("restore alarms error: %v", err)
View File

@@ -45,13 +45,18 @@
 type LeaseID int64

-// RangeDeleter defines an interface with DeleteRange method.
+// RangeDeleter defines an interface with Txn and DeleteRange method.
 // We define this interface only for lessor to limit the number
 // of methods of mvcc.KV to what lessor actually needs.
 //
 // Having a minimum interface makes testing easy.
 type RangeDeleter interface {
-	DeleteRange(key, end []byte) (int64, int64)
+	// TxnBegin see comments on mvcc.KV
+	TxnBegin() int64
+	// TxnEnd see comments on mvcc.KV
+	TxnEnd(txnID int64) error
+	// TxnDeleteRange see comments on mvcc.KV
+	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
 }

 // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
@@ -211,16 +216,30 @@ func (le *lessor) Revoke(id LeaseID) error {
 	// unlock before doing external work
 	le.mu.Unlock()

-	if le.rd != nil {
-		for item := range l.itemSet {
-			le.rd.DeleteRange([]byte(item.Key), nil)
+	if le.rd == nil {
+		return nil
+	}
+
+	tid := le.rd.TxnBegin()
+	for item := range l.itemSet {
+		_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
+		if err != nil {
+			panic(err)
 		}
 	}

 	le.mu.Lock()
 	defer le.mu.Unlock()
 	delete(le.leaseMap, l.ID)
-	l.removeFrom(le.b)
+	// lease deletion needs to be in the same backend transaction with the
+	// kv deletion. Or we might end up with not executing the revoke or not
+	// deleting the keys if etcdserver fails in between.
+	le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
+
+	err := le.rd.TxnEnd(tid)
+	if err != nil {
+		panic(err)
+	}

 	return nil
 }
@@ -443,16 +462,7 @@ func (l Lease) persistTo(b backend.Backend) {
 	b.BatchTx().Unlock()
 }

-func (l Lease) removeFrom(b backend.Backend) {
-	key := int64ToBytes(int64(l.ID))
-	b.BatchTx().Lock()
-	b.BatchTx().UnsafeDelete(leaseBucketName, key)
-	b.BatchTx().Unlock()
-}
-
-// refresh refreshes the expiry of the lease. It extends the expiry at least
-// minLeaseTTL second.
+// refresh refreshes the expiry of the lease.
 func (l *Lease) refresh(extend time.Duration) {
 	if l.TTL < minLeaseTTL {
 		l.TTL = minLeaseTTL
View File

@@ -223,9 +223,17 @@ type fakeDeleter struct {
 	deleted []string
 }

-func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
+func (fd *fakeDeleter) TxnBegin() int64 {
+	return 0
+}
+
+func (fd *fakeDeleter) TxnEnd(txnID int64) error {
+	return nil
+}
+
+func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
 	fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
-	return 0, 0
+	return 0, 0, nil
 }

 func NewTestBackend(t *testing.T) (string, backend.Backend) {
func NewTestBackend(t *testing.T) (string, backend.Backend) {

View File

@@ -367,6 +367,8 @@ func (s *store) restore() error {
 	revToBytes(revision{main: 1}, min)
 	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

+	keyToLease := make(map[string]lease.LeaseID)
+
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
@@ -390,26 +392,15 @@
 		switch {
 		case isTombstone(key):
 			s.kvindex.Tombstone(kv.Key, rev)
-			if lease.LeaseID(kv.Lease) != lease.NoLease {
-				err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
-				if err != nil && err != lease.ErrLeaseNotFound {
-					plog.Fatalf("unexpected Detach error %v", err)
-				}
-			}
+			delete(keyToLease, string(kv.Key))
 		default:
 			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
-			if lease.LeaseID(kv.Lease) != lease.NoLease {
-				if s.le == nil {
-					panic("no lessor to attach lease")
-				}
-				err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
-				// We are walking through the kv history here. It is possible that we attached a key to
-				// the lease and the lease was revoked later.
-				// Thus attaching an old version of key to a none existing lease is possible here, and
-				// we should just ignore the error.
-				if err != nil && err != lease.ErrLeaseNotFound {
-					panic("unexpected Attach error")
-				}
-			}
+			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
+				keyToLease[string(kv.Key)] = lid
+			} else {
+				delete(keyToLease, string(kv.Key))
+			}
 		}
@@ -417,6 +408,16 @@
 		s.currentRev = rev
 	}

+	for key, lid := range keyToLease {
+		if s.le == nil {
+			panic("no lessor to attach lease")
+		}
+		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
+		if err != nil {
+			plog.Errorf("unexpected Attach error: %v", err)
+		}
+	}
+
 	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	scheduledCompact := int64(0)
 	if len(scheduledCompactBytes) != 0 {
@@ -550,7 +551,7 @@
 		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
 		if err != nil {
-			panic("unexpected error from lease detach")
+			plog.Errorf("unexpected error from lease detach: %v", err)
 		}
 	}
@@ -619,7 +620,7 @@
 	if lease.LeaseID(kv.Lease) != lease.NoLease {
 		err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
 		if err != nil {
-			plog.Fatalf("cannot detach %v", err)
+			plog.Errorf("cannot detach %v", err)
 		}
 	}
 }

View File

@@ -332,7 +332,16 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 	default:
 		plog.Panicf("unhandled stream type %s", t)
 	}
-	cr.closer = rc
+	select {
+	case <-cr.stopc:
+		cr.mu.Unlock()
+		if err := rc.Close(); err != nil {
+			return err
+		}
+		return io.EOF
+	default:
+		cr.closer = rc
+	}
 	cr.mu.Unlock()

 	for {

View File

@@ -17,6 +17,7 @@ package rafthttp
 import (
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
 	"net/http/httptest"
 	"reflect"
@@ -180,6 +181,60 @@
 	}
 }

+// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
+func TestStreamReaderStopOnDial(t *testing.T) {
+	defer testutil.AfterTest(t)
+	h := http.Header{}
+	h.Add("X-Server-Version", version.Version)
+	tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
+	sr := &streamReader{
+		peerID: types.ID(2),
+		tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
+		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
+		errorc: make(chan error, 1),
+		typ:    streamTypeMessage,
+		status: newPeerStatus(types.ID(2)),
+	}
+	tr.onResp = func() {
+		// stop() waits for the run() goroutine to exit, but that exit
+		// needs a response from RoundTrip() first; use goroutine
+		go sr.stop()
+		// wait so that stop() is blocked on run() exiting
+		time.Sleep(10 * time.Millisecond)
+		// sr.run() completes dialing then begins decoding while stopped
+	}
+	sr.start()
+	select {
+	case <-sr.done:
+	case <-time.After(time.Second):
+		t.Fatal("streamReader did not stop in time")
+	}
+}
+
+type respWaitRoundTripper struct {
+	rrt    *respRoundTripper
+	onResp func()
+}
+
+func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	resp, err := t.rrt.RoundTrip(req)
+	resp.Body = newWaitReadCloser()
+	t.onResp()
+	return resp, err
+}
+
+type waitReadCloser struct{ closec chan struct{} }
+
+func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
+
+func (wrc *waitReadCloser) Read(p []byte) (int, error) {
+	<-wrc.closec
+	return 0, io.EOF
+}
+
+func (wrc *waitReadCloser) Close() error {
+	close(wrc.closec)
+	return nil
+}
+
 // TestStreamReaderDialDetectUnsupport tests that dial func could find
 // out that the stream type is not supported by the remote.
 func TestStreamReaderDialDetectUnsupport(t *testing.T) {

View File

@@ -29,7 +29,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "2.3.0"
-	Version           = "3.0.5"
+	Version           = "3.0.6"

 	// Git SHA Value will be set during build
 	GitSHA = "Not provided (use ./build instead of go build)"