clientv3: call KV/Txn APIs with default gRPC call options

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee
2017-12-19 15:06:57 -08:00
parent e82f0557ac
commit c67e6d5f5e
5 changed files with 29 additions and 15 deletions

View File

@ -183,7 +183,7 @@ func TestDialForeignEndpoint(t *testing.T) {
// grpc can return a lazy connection that's not connected yet; confirm // grpc can return a lazy connection that's not connected yet; confirm
// that it can communicate with the cluster. // that it can communicate with the cluster.
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn)) kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), clus.Client(0))
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel() defer cancel()
if _, gerr := kvc.Get(ctx, "abc"); gerr != nil { if _, gerr := kvc.Get(ctx, "abc"); gerr != nil {

View File

@ -18,6 +18,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
type ( type (
@ -89,14 +90,23 @@ func (resp *TxnResponse) OpResponse() OpResponse {
type kv struct { type kv struct {
remote pb.KVClient remote pb.KVClient
callOpts []grpc.CallOption
} }
func NewKV(c *Client) KV { func NewKV(c *Client) KV {
return &kv{remote: RetryKVClient(c)} api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
} }
func NewKVFromKVClient(remote pb.KVClient) KV { func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
return &kv{remote: remote} api := &kv{remote: remote}
if c != nil {
api.callOpts = c.callOpts
}
return api
} }
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
@ -115,7 +125,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
} }
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()) resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
if err != nil { if err != nil {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
@ -126,6 +136,7 @@ func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{ return &txn{
kv: kv, kv: kv,
ctx: ctx, ctx: ctx,
callOpts: kv.callOpts,
} }
} }
@ -134,27 +145,27 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
switch op.t { switch op.t {
case tRange: case tRange:
var resp *pb.RangeResponse var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest()) resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil { if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil return OpResponse{get: (*GetResponse)(resp)}, nil
} }
case tPut: case tPut:
var resp *pb.PutResponse var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r) resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil { if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil return OpResponse{put: (*PutResponse)(resp)}, nil
} }
case tDeleteRange: case tDeleteRange:
var resp *pb.DeleteRangeResponse var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r) resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
if err == nil { if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil return OpResponse{del: (*DeleteResponse)(resp)}, nil
} }
case tTxn: case tTxn:
var resp *pb.TxnResponse var resp *pb.TxnResponse
resp, err = kv.remote.Txn(ctx, op.toTxnRequest()) resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
if err == nil { if err == nil {
return OpResponse{txn: (*TxnResponse)(resp)}, nil return OpResponse{txn: (*TxnResponse)(resp)}, nil
} }

View File

@ -20,6 +20,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
// Txn is the interface that wraps mini-transactions. // Txn is the interface that wraps mini-transactions.
@ -66,6 +67,8 @@ type txn struct {
sus []*pb.RequestOp sus []*pb.RequestOp
fas []*pb.RequestOp fas []*pb.RequestOp
callOpts []grpc.CallOption
} }
func (txn *txn) If(cs ...Cmp) Txn { func (txn *txn) If(cs ...Cmp) Txn {
@ -140,7 +143,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
var resp *pb.TxnResponse var resp *pb.TxnResponse
var err error var err error
resp, err = txn.kv.remote.Txn(txn.ctx, r) resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
if err != nil { if err != nil {
return nil, toErr(txn.ctx, err) return nil, toErr(txn.ctx, err)
} }

View File

@ -32,7 +32,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
c := clientv3.NewCtxClient(context.Background()) c := clientv3.NewCtxClient(context.Background())
kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s)) kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
c.KV = clientv3.NewKVFromKVClient(kvc) c.KV = clientv3.NewKVFromKVClient(kvc, c)
lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s)) lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second) c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)

View File

@ -99,7 +99,7 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
return nil, err return nil, err
} }
rpc := toGRPC(c) rpc := toGRPC(c)
c.KV = clientv3.NewKVFromKVClient(rpc.KV) c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
pmu.Lock() pmu.Lock()
lc := c.Lease lc := c.Lease
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout) c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)