clientv3: use grpc reconnection logic

This commit is contained in:
Anthony Romano
2016-06-07 17:03:48 -07:00
parent 1823702cc6
commit 62f8ec25c0
15 changed files with 171 additions and 594 deletions

View File

@ -21,7 +21,6 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type (
@ -76,7 +75,6 @@ type lessor struct {
// donec is closed when recvKeepAliveLoop stops
donec chan struct{}
rc *remoteClient
remote pb.LeaseClient
stream pb.Lease_LeaseKeepAliveClient
@ -102,14 +100,10 @@ func NewLease(c *Client) Lease {
l := &lessor{
donec: make(chan struct{}),
keepAlives: make(map[LeaseID]*keepAlive),
remote: pb.NewLeaseClient(c.conn),
}
f := func(conn *grpc.ClientConn) { l.remote = pb.NewLeaseClient(conn) }
l.rc = newRemoteClient(c, f)
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
go l.recvKeepAliveLoop()
return l
}
@ -120,7 +114,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
for {
r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.getRemote().LeaseGrant(cctx, r)
resp, err := l.remote.LeaseGrant(cctx, r)
if err == nil {
gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(),
@ -131,10 +125,9 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
return gresp, nil
}
if isHaltErr(cctx, err) {
return nil, rpctypes.Error(err)
return nil, toErr(ctx, err)
}
if nerr := l.switchRemoteAndStream(err); nerr != nil {
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
@ -147,16 +140,15 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
for {
r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.getRemote().LeaseRevoke(cctx, r)
resp, err := l.remote.LeaseRevoke(cctx, r)
if err == nil {
return (*LeaseRevokeResponse)(resp), nil
}
if isHaltErr(ctx, err) {
return nil, rpctypes.Error(err)
return nil, toErr(ctx, err)
}
if nerr := l.switchRemoteAndStream(err); nerr != nil {
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
@ -202,11 +194,10 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
return resp, err
}
if isHaltErr(ctx, err) {
return nil, rpctypes.Error(err)
return nil, toErr(ctx, err)
}
nerr := l.switchRemoteAndStream(err)
if nerr != nil {
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
@ -254,19 +245,19 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
cctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := l.getRemote().LeaseKeepAlive(cctx)
stream, err := l.remote.LeaseKeepAlive(cctx)
if err != nil {
return nil, rpctypes.Error(err)
return nil, toErr(ctx, err)
}
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
if err != nil {
return nil, rpctypes.Error(err)
return nil, toErr(ctx, err)
}
resp, rerr := stream.Recv()
if rerr != nil {
return nil, rpctypes.Error(rerr)
return nil, toErr(ctx, rerr)
}
karesp := &LeaseKeepAliveResponse{
@ -304,7 +295,7 @@ func (l *lessor) recvKeepAliveLoop() {
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
if err := l.switchRemoteAndStream(nil); err != nil {
if err := l.newStream(); err != nil {
return nil, err
}
stream := l.getKeepAliveStream()
@ -380,38 +371,18 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
}
}
func (l *lessor) getRemote() pb.LeaseClient {
l.rc.mu.Lock()
defer l.rc.mu.Unlock()
return l.remote
}
func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
l.mu.Lock()
defer l.mu.Unlock()
return l.stream
}
func (l *lessor) switchRemoteAndStream(prevErr error) error {
for {
if prevErr != nil {
err := l.rc.reconnectWait(l.stopCtx, prevErr)
if err != nil {
return rpctypes.Error(err)
}
}
if prevErr = l.newStream(); prevErr == nil {
return nil
}
}
}
func (l *lessor) newStream() error {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.getRemote().LeaseKeepAlive(sctx)
stream, err := l.remote.LeaseKeepAlive(sctx)
if err != nil {
cancel()
return rpctypes.Error(err)
return toErr(sctx, err)
}
l.mu.Lock()