Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
9efa00d103 | |||
72d30f4c34 | |||
2e92779777 | |||
404415b1e3 | |||
07e421d245 | |||
a7d6e29275 | |||
1a8b295dab | |||
ffc45cc066 | |||
0db1ba8093 |
36
cmd/Godeps/Godeps.json
generated
36
cmd/Godeps/Godeps.json
generated
@ -237,48 +237,48 @@
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/codes",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/credentials",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/grpclog",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/internal",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/metadata",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/naming",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/peer",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "google.golang.org/grpc/transport",
|
||||
"Comment": "v1.0.0-174-gc278196",
|
||||
"Rev": "c2781963b3af261a37e0f14fdcb7c1fa13259e1f"
|
||||
"Comment": "v1.0.0-183-g231b4cf",
|
||||
"Rev": "231b4cfea0e79843053a33f5fe90bd4d84b23cd3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "gopkg.in/cheggaaa/pb.v1",
|
||||
|
4
cmd/vendor/google.golang.org/grpc/call.go
generated
vendored
4
cmd/vendor/google.golang.org/grpc/call.go
generated
vendored
@ -170,9 +170,9 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
||||
if _, ok := err.(*rpcError); ok {
|
||||
return err
|
||||
}
|
||||
if err == errConnClosing {
|
||||
if err == errConnClosing || err == errConnUnavailable {
|
||||
if c.failFast {
|
||||
return Errorf(codes.Unavailable, "%v", errConnClosing)
|
||||
return Errorf(codes.Unavailable, "%v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
67
cmd/vendor/google.golang.org/grpc/clientconn.go
generated
vendored
67
cmd/vendor/google.golang.org/grpc/clientconn.go
generated
vendored
@ -73,7 +73,9 @@ var (
|
||||
errConnDrain = errors.New("grpc: the connection is drained")
|
||||
// errConnClosing indicates that the connection is closing.
|
||||
errConnClosing = errors.New("grpc: the connection is closing")
|
||||
errNoAddr = errors.New("grpc: there is no address available to dial")
|
||||
// errConnUnavailable indicates that the connection is unavailable.
|
||||
errConnUnavailable = errors.New("grpc: the connection is unavailable")
|
||||
errNoAddr = errors.New("grpc: there is no address available to dial")
|
||||
// minimum time to give a connection to complete
|
||||
minConnectTimeout = 20 * time.Second
|
||||
)
|
||||
@ -213,9 +215,14 @@ func WithUserAgent(s string) DialOption {
|
||||
}
|
||||
}
|
||||
|
||||
// Dial creates a client connection the given target.
|
||||
// Dial creates a client connection to the given target.
|
||||
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
||||
ctx := context.Background()
|
||||
return DialContext(context.Background(), target, opts...)
|
||||
}
|
||||
|
||||
// DialContext creates a client connection to the given target
|
||||
// using the supplied context.
|
||||
func DialContext(ctx context.Context, target string, opts ...DialOption) (*ClientConn, error) {
|
||||
cc := &ClientConn{
|
||||
target: target,
|
||||
conns: make(map[Address]*addrConn),
|
||||
@ -472,6 +479,10 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
||||
if cc.dopts.balancer == nil {
|
||||
// If balancer is nil, there should be only one addrConn available.
|
||||
cc.mu.RLock()
|
||||
if cc.conns == nil {
|
||||
cc.mu.RUnlock()
|
||||
return nil, nil, toRPCErr(ErrClientConnClosing)
|
||||
}
|
||||
for _, ac = range cc.conns {
|
||||
// Break after the first iteration to get the first addrConn.
|
||||
ok = true
|
||||
@ -501,11 +512,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
||||
}
|
||||
return nil, nil, errConnClosing
|
||||
}
|
||||
// ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
|
||||
// - If RPC is failfast, ac.wait should not block.
|
||||
// - If balancer is not nil, ac.wait should return errConnClosing on transient failure
|
||||
// so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
|
||||
t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
|
||||
t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
|
||||
if err != nil {
|
||||
if put != nil {
|
||||
put()
|
||||
@ -757,36 +764,42 @@ func (ac *addrConn) transportMonitor() {
|
||||
}
|
||||
|
||||
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
||||
// iv) transport is in TransientFailure and blocking is false.
|
||||
func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
|
||||
// iv) transport is in TransientFailure and there's no balancer/failfast is true.
|
||||
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
|
||||
for {
|
||||
ac.mu.Lock()
|
||||
switch {
|
||||
case ac.state == Shutdown:
|
||||
err := ac.tearDownErr
|
||||
if failfast || !hasBalancer {
|
||||
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
|
||||
err := ac.tearDownErr
|
||||
ac.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
return nil, err
|
||||
return nil, errConnClosing
|
||||
case ac.state == Ready:
|
||||
ct := ac.transport
|
||||
ac.mu.Unlock()
|
||||
return ct, nil
|
||||
case ac.state == TransientFailure && !blocking:
|
||||
ac.mu.Unlock()
|
||||
return nil, errConnClosing
|
||||
default:
|
||||
ready := ac.ready
|
||||
if ready == nil {
|
||||
ready = make(chan struct{})
|
||||
ac.ready = ready
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, toRPCErr(ctx.Err())
|
||||
// Wait until the new transport is ready or failed.
|
||||
case <-ready:
|
||||
case ac.state == TransientFailure:
|
||||
if failfast || hasBalancer {
|
||||
ac.mu.Unlock()
|
||||
return nil, errConnUnavailable
|
||||
}
|
||||
}
|
||||
ready := ac.ready
|
||||
if ready == nil {
|
||||
ready = make(chan struct{})
|
||||
ac.ready = ready
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, toRPCErr(ctx.Err())
|
||||
// Wait until the new transport is ready or failed.
|
||||
case <-ready:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
4
cmd/vendor/google.golang.org/grpc/stream.go
generated
vendored
4
cmd/vendor/google.golang.org/grpc/stream.go
generated
vendored
@ -146,9 +146,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
if _, ok := err.(*rpcError); ok {
|
||||
return nil, err
|
||||
}
|
||||
if err == errConnClosing {
|
||||
if err == errConnClosing || err == errConnUnavailable {
|
||||
if c.failFast {
|
||||
return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
|
||||
return nil, Errorf(codes.Unavailable, "%v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -405,6 +405,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
|
||||
srv.be = be
|
||||
srv.lessor = lease.NewLessor(srv.be)
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
srv.lessor = lease.NewLessor(srv.be)
|
||||
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
||||
if beExist {
|
||||
kvindex := srv.kv.ConsistentIndex()
|
||||
@ -413,6 +417,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}
|
||||
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||
|
||||
srv.authStore = auth.NewAuthStore(srv.be)
|
||||
if h := cfg.AutoCompactionRetention; h != 0 {
|
||||
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
||||
@ -658,6 +663,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
|
||||
newbe := backend.NewDefaultBackend(fn)
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
if s.lessor != nil {
|
||||
plog.Info("recovering lessor...")
|
||||
s.lessor.Recover(newbe, s.kv)
|
||||
plog.Info("finished recovering lessor")
|
||||
}
|
||||
|
||||
plog.Info("restoring mvcc store...")
|
||||
|
||||
if err := s.kv.Restore(newbe); err != nil {
|
||||
@ -684,12 +697,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
s.be = newbe
|
||||
s.bemu.Unlock()
|
||||
|
||||
if s.lessor != nil {
|
||||
plog.Info("recovering lessor...")
|
||||
s.lessor.Recover(newbe, s.kv)
|
||||
plog.Info("finished recovering lessor")
|
||||
}
|
||||
|
||||
plog.Info("recovering alarms...")
|
||||
if err := s.restoreAlarms(); err != nil {
|
||||
plog.Panicf("restore alarms error: %v", err)
|
||||
|
@ -45,13 +45,18 @@ var (
|
||||
|
||||
type LeaseID int64
|
||||
|
||||
// RangeDeleter defines an interface with DeleteRange method.
|
||||
// RangeDeleter defines an interface with Txn and DeleteRange method.
|
||||
// We define this interface only for lessor to limit the number
|
||||
// of methods of mvcc.KV to what lessor actually needs.
|
||||
//
|
||||
// Having a minimum interface makes testing easy.
|
||||
type RangeDeleter interface {
|
||||
DeleteRange(key, end []byte) (int64, int64)
|
||||
// TxnBegin see comments on mvcc.KV
|
||||
TxnBegin() int64
|
||||
// TxnEnd see comments on mvcc.KV
|
||||
TxnEnd(txnID int64) error
|
||||
// TxnDeleteRange see comments on mvcc.KV
|
||||
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
|
||||
}
|
||||
|
||||
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
|
||||
@ -211,16 +216,30 @@ func (le *lessor) Revoke(id LeaseID) error {
|
||||
// unlock before doing external work
|
||||
le.mu.Unlock()
|
||||
|
||||
if le.rd != nil {
|
||||
for item := range l.itemSet {
|
||||
le.rd.DeleteRange([]byte(item.Key), nil)
|
||||
if le.rd == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
tid := le.rd.TxnBegin()
|
||||
for item := range l.itemSet {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
delete(le.leaseMap, l.ID)
|
||||
l.removeFrom(le.b)
|
||||
// lease deletion needs to be in the same backend transaction with the
|
||||
// kv deletion. Or we might end up with not executing the revoke or not
|
||||
// deleting the keys if etcdserver fails in between.
|
||||
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
|
||||
|
||||
err := le.rd.TxnEnd(tid)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -443,16 +462,7 @@ func (l Lease) persistTo(b backend.Backend) {
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
func (l Lease) removeFrom(b backend.Backend) {
|
||||
key := int64ToBytes(int64(l.ID))
|
||||
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().UnsafeDelete(leaseBucketName, key)
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
// refresh refreshes the expiry of the lease. It extends the expiry at least
|
||||
// minLeaseTTL second.
|
||||
// refresh refreshes the expiry of the lease.
|
||||
func (l *Lease) refresh(extend time.Duration) {
|
||||
if l.TTL < minLeaseTTL {
|
||||
l.TTL = minLeaseTTL
|
||||
|
@ -223,9 +223,17 @@ type fakeDeleter struct {
|
||||
deleted []string
|
||||
}
|
||||
|
||||
func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
|
||||
func (fd *fakeDeleter) TxnBegin() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (fd *fakeDeleter) TxnEnd(txnID int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
|
||||
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
|
||||
return 0, 0
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
func NewTestBackend(t *testing.T) (string, backend.Backend) {
|
||||
|
@ -367,6 +367,8 @@ func (s *store) restore() error {
|
||||
revToBytes(revision{main: 1}, min)
|
||||
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
|
||||
|
||||
keyToLease := make(map[string]lease.LeaseID)
|
||||
|
||||
// restore index
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
@ -390,26 +392,15 @@ func (s *store) restore() error {
|
||||
switch {
|
||||
case isTombstone(key):
|
||||
s.kvindex.Tombstone(kv.Key, rev)
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
if err != nil && err != lease.ErrLeaseNotFound {
|
||||
plog.Fatalf("unexpected Detach error %v", err)
|
||||
}
|
||||
}
|
||||
delete(keyToLease, string(kv.Key))
|
||||
|
||||
default:
|
||||
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
if s.le == nil {
|
||||
panic("no lessor to attach lease")
|
||||
}
|
||||
err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
// We are walking through the kv history here. It is possible that we attached a key to
|
||||
// the lease and the lease was revoked later.
|
||||
// Thus attaching an old version of key to a none existing lease is possible here, and
|
||||
// we should just ignore the error.
|
||||
if err != nil && err != lease.ErrLeaseNotFound {
|
||||
panic("unexpected Attach error")
|
||||
}
|
||||
|
||||
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
|
||||
keyToLease[string(kv.Key)] = lid
|
||||
} else {
|
||||
delete(keyToLease, string(kv.Key))
|
||||
}
|
||||
}
|
||||
|
||||
@ -417,6 +408,16 @@ func (s *store) restore() error {
|
||||
s.currentRev = rev
|
||||
}
|
||||
|
||||
for key, lid := range keyToLease {
|
||||
if s.le == nil {
|
||||
panic("no lessor to attach lease")
|
||||
}
|
||||
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
|
||||
if err != nil {
|
||||
plog.Errorf("unexpected Attach error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
|
||||
scheduledCompact := int64(0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
|
||||
err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
||||
if err != nil {
|
||||
panic("unexpected error from lease detach")
|
||||
plog.Errorf("unexpected error from lease detach: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot detach %v", err)
|
||||
plog.Errorf("cannot detach %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -332,7 +332,16 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
|
||||
default:
|
||||
plog.Panicf("unhandled stream type %s", t)
|
||||
}
|
||||
cr.closer = rc
|
||||
select {
|
||||
case <-cr.stopc:
|
||||
cr.mu.Unlock()
|
||||
if err := rc.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return io.EOF
|
||||
default:
|
||||
cr.closer = rc
|
||||
}
|
||||
cr.mu.Unlock()
|
||||
|
||||
for {
|
||||
|
@ -17,6 +17,7 @@ package rafthttp
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
@ -180,6 +181,60 @@ func TestStreamReaderDialResult(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
|
||||
func TestStreamReaderStopOnDial(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
h := http.Header{}
|
||||
h.Add("X-Server-Version", version.Version)
|
||||
tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
|
||||
sr := &streamReader{
|
||||
peerID: types.ID(2),
|
||||
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
|
||||
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
||||
errorc: make(chan error, 1),
|
||||
typ: streamTypeMessage,
|
||||
status: newPeerStatus(types.ID(2)),
|
||||
}
|
||||
tr.onResp = func() {
|
||||
// stop() waits for the run() goroutine to exit, but that exit
|
||||
// needs a response from RoundTrip() first; use goroutine
|
||||
go sr.stop()
|
||||
// wait so that stop() is blocked on run() exiting
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
// sr.run() completes dialing then begins decoding while stopped
|
||||
}
|
||||
sr.start()
|
||||
select {
|
||||
case <-sr.done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("streamReader did not stop in time")
|
||||
}
|
||||
}
|
||||
|
||||
type respWaitRoundTripper struct {
|
||||
rrt *respRoundTripper
|
||||
onResp func()
|
||||
}
|
||||
|
||||
func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
resp, err := t.rrt.RoundTrip(req)
|
||||
resp.Body = newWaitReadCloser()
|
||||
t.onResp()
|
||||
return resp, err
|
||||
}
|
||||
|
||||
type waitReadCloser struct{ closec chan struct{} }
|
||||
|
||||
func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
|
||||
func (wrc *waitReadCloser) Read(p []byte) (int, error) {
|
||||
<-wrc.closec
|
||||
return 0, io.EOF
|
||||
}
|
||||
func (wrc *waitReadCloser) Close() error {
|
||||
close(wrc.closec)
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestStreamReaderDialDetectUnsupport tests that dial func could find
|
||||
// out that the stream type is not supported by the remote.
|
||||
func TestStreamReaderDialDetectUnsupport(t *testing.T) {
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "2.3.0"
|
||||
Version = "3.0.5"
|
||||
Version = "3.0.6"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
|
Reference in New Issue
Block a user