clientv3: use default client kv

This commit is contained in:
Anthony Romano
2016-02-24 22:49:24 -08:00
parent d430c7baf7
commit 3e57bbf317
21 changed files with 65 additions and 103 deletions

View File

@ -24,7 +24,6 @@ import (
// Mutex implements the sync Locker interface with etcd // Mutex implements the sync Locker interface with etcd
type Mutex struct { type Mutex struct {
client *v3.Client client *v3.Client
kv v3.KV
ctx context.Context ctx context.Context
pfx string pfx string
@ -33,7 +32,7 @@ type Mutex struct {
} }
func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex { func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex {
return &Mutex{client, v3.NewKV(client), ctx, pfx, "", -1} return &Mutex{client, ctx, pfx, "", -1}
} }
// Lock locks the mutex with a cancellable context. If the context is cancelled // Lock locks the mutex with a cancellable context. If the context is cancelled
@ -44,12 +43,12 @@ func (m *Mutex) Lock(ctx context.Context) error {
return err return err
} }
// put self in lock waiters via myKey; oldest waiter holds lock // put self in lock waiters via myKey; oldest waiter holds lock
m.myKey, m.myRev, err = NewUniqueKey(ctx, m.kv, m.pfx, v3.WithLease(s.Lease())) m.myKey, m.myRev, err = NewUniqueKey(ctx, m.client, m.pfx, v3.WithLease(s.Lease()))
// wait for lock to become available // wait for lock to become available
for err == nil { for err == nil {
// find oldest element in waiters via revision of insertion // find oldest element in waiters via revision of insertion
var resp *v3.GetResponse var resp *v3.GetResponse
resp, err = m.kv.Get(ctx, m.pfx, v3.WithFirstRev()...) resp, err = m.client.Get(ctx, m.pfx, v3.WithFirstRev()...)
if err != nil { if err != nil {
break break
} }
@ -59,7 +58,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
} }
// otherwise myKey isn't lowest, so there must be a pfx prior to myKey // otherwise myKey isn't lowest, so there must be a pfx prior to myKey
opts := append(v3.WithLastRev(), v3.WithRev(m.myRev-1)) opts := append(v3.WithLastRev(), v3.WithRev(m.myRev-1))
resp, err = m.kv.Get(ctx, m.pfx, opts...) resp, err = m.client.Get(ctx, m.pfx, opts...)
if err != nil { if err != nil {
break break
} }
@ -80,7 +79,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
} }
func (m *Mutex) Unlock() error { func (m *Mutex) Unlock() error {
if _, err := m.kv.Delete(m.ctx, m.myKey); err != nil { if _, err := m.client.Delete(m.ctx, m.myKey); err != nil {
return err return err
} }
m.myKey = "\x00" m.myKey = "\x00"

View File

@ -32,10 +32,8 @@ func ExampleKV_put() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := kvc.Put(ctx, "sample_key", "sample_value") resp, err := cli.Put(ctx, "sample_key", "sample_value")
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -54,15 +52,13 @@ func ExampleKV_get() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli) _, err = cli.Put(context.TODO(), "foo", "bar")
_, err = kvc.Put(context.TODO(), "foo", "bar")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := kvc.Get(ctx, "foo") resp, err := cli.Get(ctx, "foo")
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -83,11 +79,9 @@ func ExampleKV_getSortedPrefix() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
for i := range make([]int, 3) { for i := range make([]int, 3) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = kvc.Put(ctx, fmt.Sprintf("key_%d", i), "value") _, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value")
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -95,7 +89,7 @@ func ExampleKV_getSortedPrefix() {
} }
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := kvc.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -118,10 +112,8 @@ func ExampleKV_delete() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := kvc.Delete(ctx, "key", clientv3.WithPrefix()) resp, err := cli.Delete(ctx, "key", clientv3.WithPrefix())
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -140,10 +132,8 @@ func ExampleKV_compact() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := kvc.Get(ctx, "foo") resp, err := cli.Get(ctx, "foo")
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -151,7 +141,7 @@ func ExampleKV_compact() {
compRev := resp.Header.Revision // specify compact revision of your choice compRev := resp.Header.Revision // specify compact revision of your choice
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
err = kvc.Compact(ctx, compRev) err = cli.Compact(ctx, compRev)
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -207,15 +197,13 @@ func ExampleKV_do() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
ops := []clientv3.Op{ ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"), clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"), clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")} clientv3.OpPut("put-key", "456")}
for _, op := range ops { for _, op := range ops {
if _, err := kvc.Do(context.TODO(), op); err != nil { if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }

View File

@ -33,7 +33,6 @@ func ExampleLease_create() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
lapi := clientv3.NewLease(cli) lapi := clientv3.NewLease(cli)
defer lapi.Close() defer lapi.Close()
@ -44,7 +43,7 @@ func ExampleLease_create() {
} }
// after 5 seconds, the key 'foo' will be removed // after 5 seconds, the key 'foo' will be removed
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -60,7 +59,6 @@ func ExampleLease_revoke() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
lapi := clientv3.NewLease(cli) lapi := clientv3.NewLease(cli)
defer lapi.Close() defer lapi.Close()
@ -69,7 +67,7 @@ func ExampleLease_revoke() {
log.Fatal(err) log.Fatal(err)
} }
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -80,7 +78,7 @@ func ExampleLease_revoke() {
log.Fatal(err) log.Fatal(err)
} }
gresp, err := kvc.Get(context.TODO(), "foo") gresp, err := cli.Get(context.TODO(), "foo")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -98,7 +96,6 @@ func ExampleLease_keepAlive() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
lapi := clientv3.NewLease(cli) lapi := clientv3.NewLease(cli)
defer lapi.Close() defer lapi.Close()
@ -107,7 +104,7 @@ func ExampleLease_keepAlive() {
log.Fatal(err) log.Fatal(err)
} }
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -129,7 +126,6 @@ func ExampleLease_keepAliveOnce() {
} }
defer cli.Close() defer cli.Close()
kvc := clientv3.NewKV(cli)
lapi := clientv3.NewLease(cli) lapi := clientv3.NewLease(cli)
defer lapi.Close() defer lapi.Close()
@ -138,7 +134,7 @@ func ExampleLease_keepAliveOnce() {
log.Fatal(err) log.Fatal(err)
} }
_, err = kvc.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID))) _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(lease.LeaseID(resp.ID)))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -38,9 +38,7 @@ func Example() {
} }
defer cli.Close() // make sure to close the client defer cli.Close() // make sure to close the client
kvc := clientv3.NewKV(cli) _, err = cli.Put(context.TODO(), "foo", "bar")
_, err = kvc.Put(context.TODO(), "foo", "bar")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -48,10 +48,9 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
respchan := make(chan clientv3.GetResponse, 1024) respchan := make(chan clientv3.GetResponse, 1024)
errchan := make(chan error, 1) errchan := make(chan error, 1)
kapi := clientv3.NewKV(s.c)
// if rev is not specified, we will choose the most recent revision. // if rev is not specified, we will choose the most recent revision.
if s.rev == 0 { if s.rev == 0 {
resp, err := kapi.Get(ctx, "foo") resp, err := s.c.Get(ctx, "foo")
if err != nil { if err != nil {
errchan <- err errchan <- err
close(respchan) close(respchan)
@ -83,7 +82,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
} }
for { for {
resp, err := kapi.Get(ctx, key, opts...) resp, err := s.c.Get(ctx, key, opts...)
if err != nil { if err != nil {
errchan <- err errchan <- err
return return

View File

@ -24,32 +24,31 @@ import (
// release all blocked processes. // release all blocked processes.
type Barrier struct { type Barrier struct {
client *v3.Client client *v3.Client
kv v3.KV
ctx context.Context ctx context.Context
key string key string
} }
func NewBarrier(client *v3.Client, key string) *Barrier { func NewBarrier(client *v3.Client, key string) *Barrier {
return &Barrier{client, v3.NewKV(client), context.TODO(), key} return &Barrier{client, context.TODO(), key}
} }
// Hold creates the barrier key causing processes to block on Wait. // Hold creates the barrier key causing processes to block on Wait.
func (b *Barrier) Hold() error { func (b *Barrier) Hold() error {
_, err := NewKey(b.kv, b.key, 0) _, err := NewKey(b.client, b.key, 0)
return err return err
} }
// Release deletes the barrier key to unblock all waiting processes. // Release deletes the barrier key to unblock all waiting processes.
func (b *Barrier) Release() error { func (b *Barrier) Release() error {
_, err := b.kv.Delete(b.ctx, b.key) _, err := b.client.Delete(b.ctx, b.key)
return err return err
} }
// Wait blocks on the barrier key until it is deleted. If there is no key, Wait // Wait blocks on the barrier key until it is deleted. If there is no key, Wait
// assumes Release has already been called and returns immediately. // assumes Release has already been called and returns immediately.
func (b *Barrier) Wait() error { func (b *Barrier) Wait() error {
resp, err := b.kv.Get(b.ctx, b.key, v3.WithFirstKey()...) resp, err := b.client.Get(b.ctx, b.key, v3.WithFirstKey()...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -24,7 +24,6 @@ import (
// blocks again on Leave until all processes have left. // blocks again on Leave until all processes have left.
type DoubleBarrier struct { type DoubleBarrier struct {
client *clientv3.Client client *clientv3.Client
kv clientv3.KV
ctx context.Context ctx context.Context
key string // key for the collective barrier key string // key for the collective barrier
@ -35,7 +34,6 @@ type DoubleBarrier struct {
func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier { func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
return &DoubleBarrier{ return &DoubleBarrier{
client: client, client: client,
kv: clientv3.NewKV(client),
ctx: context.TODO(), ctx: context.TODO(),
key: key, key: key,
count: count, count: count,
@ -50,7 +48,7 @@ func (b *DoubleBarrier) Enter() error {
} }
b.myKey = ek b.myKey = ek
resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
if err != nil { if err != nil {
return err return err
} }
@ -61,7 +59,7 @@ func (b *DoubleBarrier) Enter() error {
if len(resp.Kvs) == b.count { if len(resp.Kvs) == b.count {
// unblock waiters // unblock waiters
_, err = b.kv.Put(b.ctx, b.key+"/ready", "") _, err = b.client.Put(b.ctx, b.key+"/ready", "")
return err return err
} }
@ -75,7 +73,7 @@ func (b *DoubleBarrier) Enter() error {
// Leave waits for "count" processes to leave the barrier then returns // Leave waits for "count" processes to leave the barrier then returns
func (b *DoubleBarrier) Leave() error { func (b *DoubleBarrier) Leave() error {
resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
if len(resp.Kvs) == 0 { if len(resp.Kvs) == 0 {
return nil return nil
} }
@ -93,7 +91,7 @@ func (b *DoubleBarrier) Leave() error {
if len(resp.Kvs) == 1 { if len(resp.Kvs) == 1 {
// this is the only node in the barrier; finish up // this is the only node in the barrier; finish up
if _, err = b.kv.Delete(b.ctx, b.key+"/ready"); err != nil { if _, err = b.client.Delete(b.ctx, b.key+"/ready"); err != nil {
return err return err
} }
return b.myKey.Delete() return b.myKey.Delete()

View File

@ -22,7 +22,6 @@ import (
type Election struct { type Election struct {
client *v3.Client client *v3.Client
kv v3.KV
ctx context.Context ctx context.Context
keyPrefix string keyPrefix string
@ -31,7 +30,7 @@ type Election struct {
// NewElection returns a new election on a given key prefix. // NewElection returns a new election on a given key prefix.
func NewElection(client *v3.Client, keyPrefix string) *Election { func NewElection(client *v3.Client, keyPrefix string) *Election {
return &Election{client, v3.NewKV(client), context.TODO(), keyPrefix, nil} return &Election{client, context.TODO(), keyPrefix, nil}
} }
// Volunteer puts a value as eligible for the election. It blocks until // Volunteer puts a value as eligible for the election. It blocks until
@ -62,7 +61,7 @@ func (e *Election) Resign() (err error) {
// Leader returns the leader value for the current election. // Leader returns the leader value for the current election.
func (e *Election) Leader() (string, error) { func (e *Election) Leader() (string, error) {
resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil { if err != nil {
return "", err return "", err
} else if len(resp.Kvs) == 0 { } else if len(resp.Kvs) == 0 {
@ -74,7 +73,7 @@ func (e *Election) Leader() (string, error) {
// Wait waits for a leader to be elected, returning the leader value. // Wait waits for a leader to be elected, returning the leader value.
func (e *Election) Wait() (string, error) { func (e *Election) Wait() (string, error) {
resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil { if err != nil {
return "", err return "", err
} else if len(resp.Kvs) != 0 { } else if len(resp.Kvs) != 0 {
@ -94,7 +93,7 @@ func (e *Election) Wait() (string, error) {
func (e *Election) waitLeadership(tryKey *EphemeralKV) error { func (e *Election) waitLeadership(tryKey *EphemeralKV) error {
opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1)) opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1))
resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...) resp, err := e.client.Get(e.ctx, e.keyPrefix, opts...)
if err != nil { if err != nil {
return err return err
} else if len(resp.Kvs) == 0 { } else if len(resp.Kvs) == 0 {

View File

@ -166,7 +166,7 @@ func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
k, err := NewKV(v3.NewKV(client), key, val, s.Lease()) k, err := NewKV(client, key, val, s.Lease())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -25,20 +25,19 @@ import (
// PriorityQueue implements a multi-reader, multi-writer distributed queue. // PriorityQueue implements a multi-reader, multi-writer distributed queue.
type PriorityQueue struct { type PriorityQueue struct {
client *v3.Client client *v3.Client
kv v3.KV
ctx context.Context ctx context.Context
key string key string
} }
// NewPriorityQueue creates an etcd priority queue. // NewPriorityQueue creates an etcd priority queue.
func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue { func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
return &PriorityQueue{client, v3.NewKV(client), context.TODO(), key + "/"} return &PriorityQueue{client, context.TODO(), key + "/"}
} }
// Enqueue puts a value into a queue with a given priority. // Enqueue puts a value into a queue with a given priority.
func (q *PriorityQueue) Enqueue(val string, pr uint16) error { func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
prefix := fmt.Sprintf("%s%05d", q.key, pr) prefix := fmt.Sprintf("%s%05d", q.key, pr)
_, err := NewSequentialKV(q.kv, prefix, val) _, err := NewSequentialKV(q.client, prefix, val)
return err return err
} }
@ -46,12 +45,12 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
// queue is empty, Dequeue blocks until items are available. // queue is empty, Dequeue blocks until items are available.
func (q *PriorityQueue) Dequeue() (string, error) { func (q *PriorityQueue) Dequeue() (string, error) {
// TODO: fewer round trips by fetching more than one key // TODO: fewer round trips by fetching more than one key
resp, err := q.kv.Get(q.ctx, q.key, v3.WithFirstKey()...) resp, err := q.client.Get(q.ctx, q.key, v3.WithFirstKey()...)
if err != nil { if err != nil {
return "", err return "", err
} }
kv, err := claimFirstKey(q.kv, resp.Kvs) kv, err := claimFirstKey(q.client, resp.Kvs)
if err != nil { if err != nil {
return "", err return "", err
} else if kv != nil { } else if kv != nil {
@ -71,7 +70,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
return "", err return "", err
} }
ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
if err != nil { if err != nil {
return "", err return "", err
} else if !ok { } else if !ok {

View File

@ -23,18 +23,17 @@ import (
// Queue implements a multi-reader, multi-writer distributed queue. // Queue implements a multi-reader, multi-writer distributed queue.
type Queue struct { type Queue struct {
client *v3.Client client *v3.Client
kv v3.KV
ctx context.Context ctx context.Context
keyPrefix string keyPrefix string
} }
func NewQueue(client *v3.Client, keyPrefix string) *Queue { func NewQueue(client *v3.Client, keyPrefix string) *Queue {
return &Queue{client, v3.NewKV(client), context.TODO(), keyPrefix} return &Queue{client, context.TODO(), keyPrefix}
} }
func (q *Queue) Enqueue(val string) error { func (q *Queue) Enqueue(val string) error {
_, err := NewUniqueKV(q.kv, q.keyPrefix, val, 0) _, err := NewUniqueKV(q.client, q.keyPrefix, val, 0)
return err return err
} }
@ -42,12 +41,12 @@ func (q *Queue) Enqueue(val string) error {
// queue is empty, Dequeue blocks until elements are available. // queue is empty, Dequeue blocks until elements are available.
func (q *Queue) Dequeue() (string, error) { func (q *Queue) Dequeue() (string, error) {
// TODO: fewer round trips by fetching more than one key // TODO: fewer round trips by fetching more than one key
resp, err := q.kv.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...) resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
if err != nil { if err != nil {
return "", err return "", err
} }
kv, err := claimFirstKey(q.kv, resp.Kvs) kv, err := claimFirstKey(q.client, resp.Kvs)
if err != nil { if err != nil {
return "", err return "", err
} else if kv != nil { } else if kv != nil {
@ -67,7 +66,7 @@ func (q *Queue) Dequeue() (string, error) {
return "", err return "", err
} }
ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision) ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
if err != nil { if err != nil {
return "", err return "", err
} else if !ok { } else if !ok {

View File

@ -22,7 +22,6 @@ import (
type RWMutex struct { type RWMutex struct {
client *v3.Client client *v3.Client
kv v3.KV
ctx context.Context ctx context.Context
key string key string
@ -30,7 +29,7 @@ type RWMutex struct {
} }
func NewRWMutex(client *v3.Client, key string) *RWMutex { func NewRWMutex(client *v3.Client, key string) *RWMutex {
return &RWMutex{client, v3.NewKV(client), context.TODO(), key, nil} return &RWMutex{client, context.TODO(), key, nil}
} }
func (rwm *RWMutex) RLock() error { func (rwm *RWMutex) RLock() error {
@ -42,7 +41,7 @@ func (rwm *RWMutex) RLock() error {
// if there are nodes with "write-" and a lower // if there are nodes with "write-" and a lower
// revision number than us we must wait // revision number than us we must wait
resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) resp, err := rwm.client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
if err != nil { if err != nil {
return err return err
} }
@ -63,7 +62,7 @@ func (rwm *RWMutex) Lock() error {
for { for {
// find any key of lower rev number blocks the write lock // find any key of lower rev number blocks the write lock
opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1)) opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1))
resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) resp, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
if err != nil { if err != nil {
return err return err
} }
@ -83,7 +82,7 @@ func (rwm *RWMutex) Lock() error {
func (rwm *RWMutex) waitOnLowest() error { func (rwm *RWMutex) waitOnLowest() error {
// must block; get key before ek for waiting // must block; get key before ek for waiting
opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) lastKey, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -22,7 +22,6 @@ import (
// STM implements software transactional memory over etcd // STM implements software transactional memory over etcd
type STM struct { type STM struct {
client *v3.Client client *v3.Client
kv v3.KV
// rset holds the read key's value and revision of read // rset holds the read key's value and revision of read
rset map[string]*RemoteKV rset map[string]*RemoteKV
// wset holds the write key and its value // wset holds the write key and its value
@ -34,7 +33,7 @@ type STM struct {
// NewSTM creates new transaction loop for a given apply function. // NewSTM creates new transaction loop for a given apply function.
func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error { func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error {
s := &STM{client: client, kv: v3.NewKV(client), apply: apply} s := &STM{client: client, apply: apply}
errc := make(chan error, 1) errc := make(chan error, 1)
go func() { go func() {
var err error var err error
@ -64,7 +63,7 @@ func (s *STM) Get(key string) (string, error) {
if rk, ok := s.rset[key]; ok { if rk, ok := s.rset[key]; ok {
return rk.Value(), nil return rk.Value(), nil
} }
rk, err := GetRemoteKV(s.kv, key) rk, err := GetRemoteKV(s.client, key)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -91,7 +90,7 @@ func (s *STM) commit() (ok bool, rr error) {
for k, v := range s.wset { for k, v := range s.wset {
puts = append(puts, v3.OpPut(k, v)) puts = append(puts, v3.OpPut(k, v))
} }
txnresp, err := s.kv.Txn(context.TODO()).If(cmps...).Then(puts...).Commit() txnresp, err := s.client.Txn(context.TODO()).If(cmps...).Then(puts...).Commit()
return txnresp.Succeeded, err return txnresp.Succeeded, err
} }

View File

@ -20,7 +20,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
) )
// NewCompactionCommand returns the cobra command for "compaction". // NewCompactionCommand returns the cobra command for "compaction".
@ -44,7 +43,7 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) {
} }
c := mustClientFromCmd(cmd) c := mustClientFromCmd(cmd)
if cerr := clientv3.NewKV(c).Compact(context.TODO(), rev); cerr != nil { if cerr := c.Compact(context.TODO(), rev); cerr != nil {
ExitWithError(ExitError, cerr) ExitWithError(ExitError, cerr)
return return
} }

View File

@ -34,9 +34,7 @@ func NewDelCommand() *cobra.Command {
// delCommandFunc executes the "del" command. // delCommandFunc executes the "del" command.
func delCommandFunc(cmd *cobra.Command, args []string) { func delCommandFunc(cmd *cobra.Command, args []string) {
key, opts := getDelOp(cmd, args) key, opts := getDelOp(cmd, args)
c := mustClientFromCmd(cmd) resp, err := mustClientFromCmd(cmd).Delete(context.TODO(), key, opts...)
kvapi := clientv3.NewKV(c)
resp, err := kvapi.Delete(context.TODO(), key, opts...)
if err != nil { if err != nil {
ExitWithError(ExitError, err) ExitWithError(ExitError, err)
} }

View File

@ -49,9 +49,7 @@ func NewGetCommand() *cobra.Command {
// getCommandFunc executes the "get" command. // getCommandFunc executes the "get" command.
func getCommandFunc(cmd *cobra.Command, args []string) { func getCommandFunc(cmd *cobra.Command, args []string) {
key, opts := getGetOp(cmd, args) key, opts := getGetOp(cmd, args)
c := mustClientFromCmd(cmd) resp, err := mustClientFromCmd(cmd).Get(context.TODO(), key, opts...)
kvapi := clientv3.NewKV(c)
resp, err := kvapi.Get(context.TODO(), key, opts...)
if err != nil { if err != nil {
ExitWithError(ExitError, err) ExitWithError(ExitError, err)
} }

View File

@ -68,7 +68,7 @@ func lockUntilSignal(c *clientv3.Client, lockname string) error {
return err return err
} }
k, kerr := clientv3.NewKV(c).Get(ctx, m.Key()) k, kerr := c.Get(ctx, m.Key())
if kerr != nil { if kerr != nil {
return kerr return kerr
} }

View File

@ -75,15 +75,13 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
}() }()
// TODO: remove the prefix of the destination cluster? // TODO: remove the prefix of the destination cluster?
dkv := clientv3.NewKV(dc)
s := mirror.NewSyncer(c, mmprefix, 0) s := mirror.NewSyncer(c, mmprefix, 0)
rc, errc := s.SyncBase(ctx) rc, errc := s.SyncBase(ctx)
for r := range rc { for r := range rc {
for _, kv := range r.Kvs { for _, kv := range r.Kvs {
_, err := dkv.Put(ctx, string(kv.Key), string(kv.Value)) _, err := dc.Put(ctx, string(kv.Key), string(kv.Value))
if err != nil { if err != nil {
return err return err
} }
@ -109,7 +107,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
for _, ev := range wr.Events { for _, ev := range wr.Events {
nrev := ev.Kv.ModRevision nrev := ev.Kv.ModRevision
if rev != 0 && nrev > rev { if rev != 0 && nrev > rev {
_, err := dkv.Txn(ctx).Then(ops...).Commit() _, err := dc.Txn(ctx).Then(ops...).Commit()
if err != nil { if err != nil {
return err return err
} }
@ -128,7 +126,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
} }
if len(ops) != 0 { if len(ops) != 0 {
_, err := dkv.Txn(ctx).Then(ops...).Commit() _, err := dc.Txn(ctx).Then(ops...).Commit()
if err != nil { if err != nil {
return err return err
} }

View File

@ -58,9 +58,7 @@ will store the content of the file to <key>.
func putCommandFunc(cmd *cobra.Command, args []string) { func putCommandFunc(cmd *cobra.Command, args []string) {
key, value, opts := getPutOp(cmd, args) key, value, opts := getPutOp(cmd, args)
c := mustClientFromCmd(cmd) resp, err := mustClientFromCmd(cmd).Put(context.TODO(), key, value, opts...)
kvapi := clientv3.NewKV(c)
resp, err := kvapi.Put(context.TODO(), key, value, opts...)
if err != nil { if err != nil {
ExitWithError(ExitError, err) ExitWithError(ExitError, err)
} }

View File

@ -53,7 +53,7 @@ func txnCommandFunc(cmd *cobra.Command, args []string) {
reader := bufio.NewReader(os.Stdin) reader := bufio.NewReader(os.Stdin)
txn := clientv3.NewKV(mustClientFromCmd(cmd)).Txn(context.Background()) txn := mustClientFromCmd(cmd).Txn(context.Background())
fmt.Println("compares:") fmt.Println("compares:")
txn.If(readCompares(reader)...) txn.If(readCompares(reader)...)
fmt.Println("success requests (get, put, delete):") fmt.Println("success requests (get, put, delete):")

View File

@ -19,7 +19,6 @@ import (
"strconv" "strconv"
"testing" "testing"
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/contrib/recipes" "github.com/coreos/etcd/contrib/recipes"
) )
@ -31,7 +30,7 @@ func TestSTMConflict(t *testing.T) {
etcdc := clus.RandClient() etcdc := clus.RandClient()
keys := make([]*recipe.RemoteKV, 5) keys := make([]*recipe.RemoteKV, 5)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
rk, err := recipe.NewKV(v3.NewKV(etcdc), fmt.Sprintf("foo-%d", i), "100", 0) rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0)
if err != nil { if err != nil {
t.Fatalf("could not make key (%v)", err) t.Fatalf("could not make key (%v)", err)
} }
@ -76,7 +75,7 @@ func TestSTMConflict(t *testing.T) {
// ensure sum matches initial sum // ensure sum matches initial sum
sum := 0 sum := 0
for _, oldRK := range keys { for _, oldRK := range keys {
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), oldRK.Key()) rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key())
if err != nil { if err != nil {
t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err) t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err)
} }
@ -103,7 +102,7 @@ func TestSTMPutNewKey(t *testing.T) {
t.Fatalf("error on stm txn (%v)", err) t.Fatalf("error on stm txn (%v)", err)
} }
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") rk, err := recipe.GetRemoteKV(etcdc, "foo")
if err != nil { if err != nil {
t.Fatalf("error fetching key (%v)", err) t.Fatalf("error fetching key (%v)", err)
} }
@ -129,7 +128,7 @@ func TestSTMAbort(t *testing.T) {
t.Fatalf("error on stm txn (%v)", err) t.Fatalf("error on stm txn (%v)", err)
} }
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo") rk, err := recipe.GetRemoteKV(etcdc, "foo")
if err != nil { if err != nil {
t.Fatalf("error fetching key (%v)", err) t.Fatalf("error fetching key (%v)", err)
} }