diff --git a/clientv3/client.go b/clientv3/client.go index 71c64ef0f..5ef96f259 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -235,9 +235,9 @@ func dialEndpointList(c *Client) (*grpc.ClientConn, error) { return nil, err } -// isHalted returns true if the given error and context indicate no forward +// isHaltErr returns true if the given error and context indicate no forward // progress can be made, even after reconnecting. -func isHalted(ctx context.Context, err error) bool { +func isHaltErr(ctx context.Context, err error) bool { isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ") return isRPCError || ctx.Err() != nil } diff --git a/clientv3/client_test.go b/clientv3/client_test.go index 0787a874b..f128d7413 100644 --- a/clientv3/client_test.go +++ b/clientv3/client_test.go @@ -55,16 +55,16 @@ func TestDialTimeout(t *testing.T) { } } -func TestIsHalted(t *testing.T) { - if !isHalted(nil, fmt.Errorf("etcdserver: some etcdserver error")) { +func TestIsHaltErr(t *testing.T) { + if !isHaltErr(nil, fmt.Errorf("etcdserver: some etcdserver error")) { t.Errorf(`error prefixed with "etcdserver: " should be Halted`) } ctx, cancel := context.WithCancel(context.TODO()) - if isHalted(ctx, nil) { + if isHaltErr(ctx, nil) { t.Errorf("no error and active context should not be Halted") } cancel() - if !isHalted(ctx, nil) { + if !isHaltErr(ctx, nil) { t.Errorf("cancel on context should be Halted") } } diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 3acc63e5f..6ffee1a98 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -17,6 +17,7 @@ package clientv3 import ( "sync" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" "google.golang.org/grpc" @@ -70,12 +71,12 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd return (*MemberAddResponse)(resp), nil } - if isHalted(ctx, err) { - return nil, err + if isHaltErr(ctx, err) { + return nil, rpctypes.Error(err) } go c.switchRemote(err) - return nil, err + return nil, rpctypes.Error(err) } func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { @@ -85,12 +86,12 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes return (*MemberRemoveResponse)(resp), nil } - if isHalted(ctx, err) { - return nil, err + if isHaltErr(ctx, err) { + return nil, rpctypes.Error(err) } go c.switchRemote(err) - return nil, err + return nil, rpctypes.Error(err) } func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) { @@ -102,13 +103,13 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin return (*MemberUpdateResponse)(resp), nil } - if isHalted(ctx, err) { - return nil, err + if isHaltErr(ctx, err) { + return nil, rpctypes.Error(err) } err = c.switchRemote(err) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } } } @@ -121,13 +122,13 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { return (*MemberListResponse)(resp), nil } - if isHalted(ctx, err) { - return nil, err + if isHaltErr(ctx, err) { + return nil, rpctypes.Error(err) } err = c.switchRemote(err) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } } } diff --git a/clientv3/kv.go b/clientv3/kv.go index 04e33688b..26197a8ed 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -17,6 +17,7 @@ package clientv3 import ( "sync" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" "google.golang.org/grpc" @@ -96,17 +97,17 @@ func NewKV(c *Client) KV { func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { r, err := kv.Do(ctx, OpPut(key, val, opts...)) - return r.put, err + return r.put, rpctypes.Error(err) } func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) { r, err := kv.Do(ctx, OpGet(key, opts...)) - return r.get, err + return r.get, rpctypes.Error(err) } func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) { r, err := kv.Do(ctx, OpDelete(key, opts...)) - return r.del, err + return r.del, rpctypes.Error(err) } func (kv *kv) Compact(ctx context.Context, rev int64) error { @@ -116,12 +117,12 @@ func (kv *kv) Compact(ctx context.Context, rev int64) error { return nil } - if isHalted(ctx, err) { - return err + if isHaltErr(ctx, err) { + return rpctypes.Error(err) } go kv.switchRemote(err) - return err + return rpctypes.Error(err) } func (kv *kv) Txn(ctx context.Context) Txn { @@ -166,14 +167,14 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { panic("Unknown op") } - if isHalted(ctx, err) { - return OpResponse{}, err + if isHaltErr(ctx, err) { + return OpResponse{}, rpctypes.Error(err) } // do not retry on modifications if op.isWrite() { go kv.switchRemote(err) - return OpResponse{}, err + return OpResponse{}, rpctypes.Error(err) } if nerr := kv.switchRemote(err); nerr != nil { @@ -192,7 +193,7 @@ func (kv *kv) switchRemote(prevErr error) error { newConn, err := kv.c.retryConnection(kv.conn, prevErr) if err != nil { - return err + return rpctypes.Error(err) } kv.conn = newConn diff --git a/clientv3/lease.go b/clientv3/lease.go index e27e2b04a..c5776ea92 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -134,9 +134,10 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err } return gresp, nil } - if isHalted(cctx, err) { - return nil, err + if isHaltErr(cctx, err) { + return nil, rpctypes.Error(err) } + if nerr := l.switchRemoteAndStream(err); nerr != nil { return nil, nerr } @@ -155,8 +156,8 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, if err == nil { return (*LeaseRevokeResponse)(resp), nil } - if isHalted(ctx, err) { - return nil, err + if isHaltErr(ctx, err) { + return nil, rpctypes.Error(err) } if nerr := l.switchRemoteAndStream(err); nerr != nil { @@ -204,8 +205,8 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive } return resp, err } - if isHalted(ctx, err) { - return resp, err + if isHaltErr(ctx, err) { + return nil, rpctypes.Error(err) } nerr := l.switchRemoteAndStream(err) @@ -259,17 +260,17 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive stream, err := l.getRemote().LeaseKeepAlive(cctx) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } resp, rerr := stream.Recv() if rerr != nil { - return nil, rerr + return nil, rpctypes.Error(rerr) } karesp := &LeaseKeepAliveResponse{ @@ -296,7 +297,7 @@ func (l *lessor) recvKeepAliveLoop() { for serr == nil { resp, err := stream.Recv() if err != nil { - if isHalted(l.stopCtx, err) { + if isHaltErr(l.stopCtx, err) { return } stream, serr = l.resetRecv() @@ -411,7 +412,7 @@ func (l *lessor) switchRemoteAndStream(prevErr error) error { conn.Close() newConn, err = l.c.retryConnection(conn, prevErr) if err != nil { - return err + return rpctypes.Error(err) } } @@ -436,7 +437,7 @@ func (l *lessor) newStream() error { stream, err := l.getRemote().LeaseKeepAlive(sctx) if err != nil { cancel() - return err + return rpctypes.Error(err) } l.mu.Lock() diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 063426cdc..6c3e372dd 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -18,6 +18,7 @@ import ( "io" "sync" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" "google.golang.org/grpc" @@ -81,8 +82,8 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { if err == nil { return (*AlarmResponse)(resp), nil } - if isHalted(ctx, err) { - return nil, err + if isHaltErr(ctx, err) { + return nil, rpctypes.Error(err) } if err = m.switchRemote(err); err != nil { return nil, err @@ -100,13 +101,13 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE { ar, err := m.AlarmList(ctx) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } ret := AlarmResponse{} for _, am := range ar.Alarms { dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am)) if derr != nil { - return nil, derr + return nil, rpctypes.Error(derr) } ret.Alarms = append(ret.Alarms, dresp.Alarms...) } @@ -117,21 +118,21 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if err == nil { return (*AlarmResponse)(resp), nil } - if !isHalted(ctx, err) { + if isHaltErr(ctx, err) { go m.switchRemote(err) } - return nil, err + return nil, rpctypes.Error(err) } func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) { conn, err := m.c.Dial(endpoint) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } remote := pb.NewMaintenanceClient(conn) resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } return (*DefragmentResponse)(resp), nil } @@ -139,12 +140,12 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) { conn, err := m.c.Dial(endpoint) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } remote := pb.NewMaintenanceClient(conn) resp, err := remote.Status(ctx, &pb.StatusRequest{}) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } return (*StatusResponse)(resp), nil } @@ -152,7 +153,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { ss, err := m.getRemote().Snapshot(ctx, &pb.SnapshotRequest{}) if err != nil { - return nil, err + return nil, rpctypes.Error(err) } pr, pw := io.Pipe() @@ -187,7 +188,7 @@ func (m *maintenance) switchRemote(prevErr error) error { defer m.mu.Unlock() newConn, err := m.c.retryConnection(m.conn, prevErr) if err != nil { - return err + return rpctypes.Error(err) } m.conn = newConn m.remote = pb.NewMaintenanceClient(m.conn) diff --git a/clientv3/txn.go b/clientv3/txn.go index 14607f21a..84ec4464e 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -17,6 +17,7 @@ package clientv3 import ( "sync" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" ) @@ -146,13 +147,13 @@ func (txn *txn) Commit() (*TxnResponse, error) { return (*TxnResponse)(resp), nil } - if isHalted(txn.ctx, err) { - return nil, err + if isHaltErr(txn.ctx, err) { + return nil, rpctypes.Error(err) } if txn.isWrite { go kv.switchRemote(err) - return nil, err + return nil, rpctypes.Error(err) } if nerr := kv.switchRemote(err); nerr != nil { diff --git a/clientv3/watch.go b/clientv3/watch.go index b41168d16..34636f153 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -209,7 +209,7 @@ func (w *watcher) Close() error { case <-w.donec: } <-w.donec - return <-w.errc + return v3rpc.Error(<-w.errc) } func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { @@ -496,7 +496,7 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) { break } } - return ws, err + return ws, v3rpc.Error(err) } // openWatchClient retries opening a watchclient until retryConnection fails @@ -504,8 +504,8 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { if ws, err = w.remote.Watch(w.ctx); ws != nil { break - } else if isHalted(w.ctx, err) { - return nil, err + } else if isHaltErr(w.ctx, err) { + return nil, v3rpc.Error(err) } newConn, nerr := w.c.retryConnection(w.conn, nil) if nerr != nil {