contrib/recipes: use clientv3 kv API
This commit is contained in:
@ -16,38 +16,40 @@ package recipe
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/clientv3"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Barrier creates a key in etcd to block processes, then deletes the key to
|
// Barrier creates a key in etcd to block processes, then deletes the key to
|
||||||
// release all blocked processes.
|
// release all blocked processes.
|
||||||
type Barrier struct {
|
type Barrier struct {
|
||||||
client *clientv3.Client
|
client *v3.Client
|
||||||
|
kv v3.KV
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBarrier(client *clientv3.Client, key string) *Barrier {
|
func NewBarrier(client *v3.Client, key string) *Barrier {
|
||||||
return &Barrier{client, key}
|
return &Barrier{client, v3.NewKV(client), context.TODO(), key}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hold creates the barrier key causing processes to block on Wait.
|
// Hold creates the barrier key causing processes to block on Wait.
|
||||||
func (b *Barrier) Hold() error {
|
func (b *Barrier) Hold() error {
|
||||||
_, err := NewKey(b.client, b.key, 0)
|
_, err := NewKey(b.kv, b.key, 0)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release deletes the barrier key to unblock all waiting processes.
|
// Release deletes the barrier key to unblock all waiting processes.
|
||||||
func (b *Barrier) Release() error {
|
func (b *Barrier) Release() error {
|
||||||
_, err := b.client.KV.DeleteRange(context.TODO(), &pb.DeleteRangeRequest{Key: []byte(b.key)})
|
_, err := b.kv.Delete(b.ctx, b.key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait blocks on the barrier key until it is deleted. If there is no key, Wait
|
// Wait blocks on the barrier key until it is deleted. If there is no key, Wait
|
||||||
// assumes Release has already been called and returns immediately.
|
// assumes Release has already been called and returns immediately.
|
||||||
func (b *Barrier) Wait() error {
|
func (b *Barrier) Wait() error {
|
||||||
resp, err := NewRange(b.client, b.key).FirstKey()
|
resp, err := b.kv.Get(b.ctx, b.key, withFirstKey()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
spb "github.com/coreos/etcd/storage/storagepb"
|
spb "github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -30,22 +30,10 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// deleteRevKey deletes a key by revision, returning false if key is missing
|
// deleteRevKey deletes a key by revision, returning false if key is missing
|
||||||
func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) {
|
func deleteRevKey(kv v3.KV, key string, rev int64) (bool, error) {
|
||||||
cmp := &pb.Compare{
|
cmp := v3.Compare(v3.ModifiedRevision(key), "=", rev)
|
||||||
Result: pb.Compare_EQUAL,
|
req := v3.OpDelete(key)
|
||||||
Target: pb.Compare_MOD,
|
txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
|
||||||
Key: []byte(key),
|
|
||||||
TargetUnion: &pb.Compare_ModRevision{ModRevision: rev},
|
|
||||||
}
|
|
||||||
req := &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{
|
|
||||||
RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}
|
|
||||||
txnresp, err := kvc.Txn(
|
|
||||||
context.TODO(),
|
|
||||||
&pb.TxnRequest{
|
|
||||||
Compare: []*pb.Compare{cmp},
|
|
||||||
Success: []*pb.RequestUnion{req},
|
|
||||||
Failure: nil,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
} else if txnresp.Succeeded == false {
|
} else if txnresp.Succeeded == false {
|
||||||
@ -54,27 +42,14 @@ func deleteRevKey(kvc pb.KVClient, key string, rev int64) (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func claimFirstKey(kvc pb.KVClient, kvs []*spb.KeyValue) (*spb.KeyValue, error) {
|
func claimFirstKey(kv v3.KV, kvs []*spb.KeyValue) (*spb.KeyValue, error) {
|
||||||
for _, kv := range kvs {
|
for _, k := range kvs {
|
||||||
ok, err := deleteRevKey(kvc, string(kv.Key), kv.ModRevision)
|
ok, err := deleteRevKey(kv, string(k.Key), k.ModRevision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if ok {
|
} else if ok {
|
||||||
return kv, nil
|
return k, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func putEmptyKey(kv pb.KVClient, key string) (*pb.PutResponse, error) {
|
|
||||||
return kv.Put(context.TODO(), &pb.PutRequest{Key: []byte(key), Value: []byte{}})
|
|
||||||
}
|
|
||||||
|
|
||||||
// deletePrefix performs a RangeRequest to get keys on a given prefix
|
|
||||||
func deletePrefix(kv pb.KVClient, prefix string) (*pb.DeleteRangeResponse, error) {
|
|
||||||
return kv.DeleteRange(
|
|
||||||
context.TODO(),
|
|
||||||
&pb.DeleteRangeRequest{
|
|
||||||
Key: []byte(prefix),
|
|
||||||
RangeEnd: []byte(prefixEnd(prefix))})
|
|
||||||
}
|
|
||||||
|
@ -17,7 +17,6 @@ package recipe
|
|||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -25,13 +24,22 @@ import (
|
|||||||
// blocks again on Leave until all processes have left.
|
// blocks again on Leave until all processes have left.
|
||||||
type DoubleBarrier struct {
|
type DoubleBarrier struct {
|
||||||
client *clientv3.Client
|
client *clientv3.Client
|
||||||
|
kv clientv3.KV
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
key string // key for the collective barrier
|
key string // key for the collective barrier
|
||||||
count int
|
count int
|
||||||
myKey *EphemeralKV // current key for this process on the barrier
|
myKey *EphemeralKV // current key for this process on the barrier
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
|
func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
|
||||||
return &DoubleBarrier{client, key, count, nil}
|
return &DoubleBarrier{
|
||||||
|
client: client,
|
||||||
|
kv: clientv3.NewKV(client),
|
||||||
|
ctx: context.TODO(),
|
||||||
|
key: key,
|
||||||
|
count: count,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enter waits for "count" processes to enter the barrier then returns
|
// Enter waits for "count" processes to enter the barrier then returns
|
||||||
@ -42,7 +50,7 @@ func (b *DoubleBarrier) Enter() error {
|
|||||||
}
|
}
|
||||||
b.myKey = ek
|
b.myKey = ek
|
||||||
|
|
||||||
resp, err := NewRange(b.client, b.key+"/waiters").Prefix()
|
resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -53,7 +61,7 @@ func (b *DoubleBarrier) Enter() error {
|
|||||||
|
|
||||||
if len(resp.Kvs) == b.count {
|
if len(resp.Kvs) == b.count {
|
||||||
// unblock waiters
|
// unblock waiters
|
||||||
_, err = putEmptyKey(b.client.KV, b.key+"/ready")
|
_, err = b.kv.Put(b.ctx, b.key+"/ready", "")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,7 +75,7 @@ func (b *DoubleBarrier) Enter() error {
|
|||||||
|
|
||||||
// Leave waits for "count" processes to leave the barrier then returns
|
// Leave waits for "count" processes to leave the barrier then returns
|
||||||
func (b *DoubleBarrier) Leave() error {
|
func (b *DoubleBarrier) Leave() error {
|
||||||
resp, err := NewRange(b.client, b.key+"/waiters").Prefix()
|
resp, err := b.kv.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
||||||
if len(resp.Kvs) == 0 {
|
if len(resp.Kvs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -85,8 +93,7 @@ func (b *DoubleBarrier) Leave() error {
|
|||||||
|
|
||||||
if len(resp.Kvs) == 1 {
|
if len(resp.Kvs) == 1 {
|
||||||
// this is the only node in the barrier; finish up
|
// this is the only node in the barrier; finish up
|
||||||
req := &pb.DeleteRangeRequest{Key: []byte(b.key + "/ready")}
|
if _, err = b.kv.Delete(b.ctx, b.key+"/ready"); err != nil {
|
||||||
if _, err = b.client.KV.DeleteRange(context.TODO(), req); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return b.myKey.Delete()
|
return b.myKey.Delete()
|
||||||
|
@ -14,20 +14,24 @@
|
|||||||
package recipe
|
package recipe
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Election struct {
|
type Election struct {
|
||||||
client *clientv3.Client
|
client *v3.Client
|
||||||
|
kv v3.KV
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
keyPrefix string
|
keyPrefix string
|
||||||
leaderKey *EphemeralKV
|
leaderKey *EphemeralKV
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewElection returns a new election on a given key prefix.
|
// NewElection returns a new election on a given key prefix.
|
||||||
func NewElection(client *clientv3.Client, keyPrefix string) *Election {
|
func NewElection(client *v3.Client, keyPrefix string) *Election {
|
||||||
return &Election{client, keyPrefix, nil}
|
return &Election{client, v3.NewKV(client), context.TODO(), keyPrefix, nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Volunteer puts a value as eligible for the election. It blocks until
|
// Volunteer puts a value as eligible for the election. It blocks until
|
||||||
@ -58,7 +62,7 @@ func (e *Election) Resign() (err error) {
|
|||||||
|
|
||||||
// Leader returns the leader value for the current election.
|
// Leader returns the leader value for the current election.
|
||||||
func (e *Election) Leader() (string, error) {
|
func (e *Election) Leader() (string, error) {
|
||||||
resp, err := NewRange(e.client, e.keyPrefix).FirstCreate()
|
resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if len(resp.Kvs) == 0 {
|
} else if len(resp.Kvs) == 0 {
|
||||||
@ -70,7 +74,7 @@ func (e *Election) Leader() (string, error) {
|
|||||||
|
|
||||||
// Wait waits for a leader to be elected, returning the leader value.
|
// Wait waits for a leader to be elected, returning the leader value.
|
||||||
func (e *Election) Wait() (string, error) {
|
func (e *Election) Wait() (string, error) {
|
||||||
resp, err := NewRange(e.client, e.keyPrefix).FirstCreate()
|
resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if len(resp.Kvs) != 0 {
|
} else if len(resp.Kvs) != 0 {
|
||||||
@ -89,10 +93,8 @@ func (e *Election) Wait() (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *Election) waitLeadership(tryKey *EphemeralKV) error {
|
func (e *Election) waitLeadership(tryKey *EphemeralKV) error {
|
||||||
resp, err := NewRangeRev(
|
opts := append(withLastCreate(), v3.WithRev(tryKey.Revision()-1))
|
||||||
e.client,
|
resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...)
|
||||||
e.keyPrefix,
|
|
||||||
tryKey.Revision()-1).LastCreate()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if len(resp.Kvs) == 0 {
|
} else if len(resp.Kvs) == 0 {
|
||||||
|
@ -20,36 +20,32 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/clientv3"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
"github.com/coreos/etcd/lease"
|
"github.com/coreos/etcd/lease"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Key is a key/revision pair created by the client and stored on etcd
|
// Key is a key/revision pair created by the client and stored on etcd
|
||||||
type RemoteKV struct {
|
type RemoteKV struct {
|
||||||
client *clientv3.Client
|
kv v3.KV
|
||||||
key string
|
key string
|
||||||
rev int64
|
rev int64
|
||||||
val string
|
val string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKey(client *clientv3.Client, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
func NewKey(kv v3.KV, key string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
||||||
return NewKV(client, key, "", leaseID)
|
return NewKV(kv, key, "", leaseID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKV(client *clientv3.Client, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
func NewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
||||||
rev, err := putNewKV(client, key, val, leaseID)
|
rev, err := putNewKV(kv, key, val, leaseID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &RemoteKV{client, key, rev, val}, nil
|
return &RemoteKV{kv, key, rev, val}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) {
|
func GetRemoteKV(kv v3.KV, key string) (*RemoteKV, error) {
|
||||||
resp, err := client.KV.Range(
|
resp, err := kv.Get(context.TODO(), key)
|
||||||
context.TODO(),
|
|
||||||
&pb.RangeRequest{Key: []byte(key)},
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -59,23 +55,19 @@ func GetRemoteKV(client *clientv3.Client, key string) (*RemoteKV, error) {
|
|||||||
rev = resp.Kvs[0].ModRevision
|
rev = resp.Kvs[0].ModRevision
|
||||||
val = string(resp.Kvs[0].Value)
|
val = string(resp.Kvs[0].Value)
|
||||||
}
|
}
|
||||||
return &RemoteKV{
|
return &RemoteKV{kv: kv, key: key, rev: rev, val: val}, nil
|
||||||
client: client,
|
|
||||||
key: key,
|
|
||||||
rev: rev,
|
|
||||||
val: val}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUniqueKey(client *clientv3.Client, prefix string) (*RemoteKV, error) {
|
func NewUniqueKey(kv v3.KV, prefix string) (*RemoteKV, error) {
|
||||||
return NewUniqueKV(client, prefix, "", 0)
|
return NewUniqueKV(kv, prefix, "", 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
func NewUniqueKV(kv v3.KV, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
||||||
for {
|
for {
|
||||||
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
|
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
|
||||||
rev, err := putNewKV(client, newKey, val, 0)
|
rev, err := putNewKV(kv, newKey, val, 0)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return &RemoteKV{client, newKey, rev, val}, nil
|
return &RemoteKV{kv, newKey, rev, val}, nil
|
||||||
}
|
}
|
||||||
if err != ErrKeyExists {
|
if err != ErrKeyExists {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -85,22 +77,10 @@ func NewUniqueKV(client *clientv3.Client, prefix string, val string, leaseID lea
|
|||||||
|
|
||||||
// putNewKV attempts to create the given key, only succeeding if the key did
|
// putNewKV attempts to create the given key, only succeeding if the key did
|
||||||
// not yet exist.
|
// not yet exist.
|
||||||
func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int64, error) {
|
func putNewKV(kv v3.KV, key, val string, leaseID lease.LeaseID) (int64, error) {
|
||||||
cmp := &pb.Compare{
|
cmp := v3.Compare(v3.Version(key), "=", 0)
|
||||||
Result: pb.Compare_EQUAL,
|
req := v3.OpPut(key, val, v3.WithLease(leaseID))
|
||||||
Target: pb.Compare_VERSION,
|
txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
|
||||||
Key: []byte(key),
|
|
||||||
TargetUnion: &pb.Compare_Version{Version: 0}}
|
|
||||||
|
|
||||||
req := &pb.RequestUnion{
|
|
||||||
Request: &pb.RequestUnion_RequestPut{
|
|
||||||
RequestPut: &pb.PutRequest{
|
|
||||||
Key: []byte(key),
|
|
||||||
Value: []byte(val),
|
|
||||||
Lease: int64(leaseID)}}}
|
|
||||||
txnresp, err := ec.KV.Txn(
|
|
||||||
context.TODO(),
|
|
||||||
&pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -111,14 +91,14 @@ func putNewKV(ec *clientv3.Client, key, val string, leaseID lease.LeaseID) (int6
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
|
// NewSequentialKV allocates a new sequential key-value pair at <prefix>/nnnnn
|
||||||
func NewSequentialKV(client *clientv3.Client, prefix, val string) (*RemoteKV, error) {
|
func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
|
||||||
return newSequentialKV(client, prefix, val, 0)
|
return newSequentialKV(kv, prefix, val, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
|
// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
|
||||||
// value and lease. Note: a bookkeeping node __<prefix> is also allocated.
|
// value and lease. Note: a bookkeeping node __<prefix> is also allocated.
|
||||||
func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
func newSequentialKV(kv v3.KV, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) {
|
||||||
resp, err := NewRange(client, prefix).LastKey()
|
resp, err := kv.Get(context.TODO(), prefix, withLastKey()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -127,9 +107,9 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.
|
|||||||
newSeqNum := 0
|
newSeqNum := 0
|
||||||
if len(resp.Kvs) != 0 {
|
if len(resp.Kvs) != 0 {
|
||||||
fields := strings.Split(string(resp.Kvs[0].Key), "/")
|
fields := strings.Split(string(resp.Kvs[0].Key), "/")
|
||||||
_, err := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
|
_, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
|
||||||
if err != nil {
|
if serr != nil {
|
||||||
return nil, err
|
return nil, serr
|
||||||
}
|
}
|
||||||
newSeqNum++
|
newSeqNum++
|
||||||
}
|
}
|
||||||
@ -140,42 +120,22 @@ func newSequentialKV(client *clientv3.Client, prefix, val string, leaseID lease.
|
|||||||
// N1: LastKey() == 1, start txn.
|
// N1: LastKey() == 1, start txn.
|
||||||
// N2: New Key 2, New Key 3, Delete Key 2
|
// N2: New Key 2, New Key 3, Delete Key 2
|
||||||
// N1: txn succeeds allocating key 2 when it shouldn't
|
// N1: txn succeeds allocating key 2 when it shouldn't
|
||||||
baseKey := []byte("__" + prefix)
|
baseKey := "__" + prefix
|
||||||
cmp := &pb.Compare{
|
|
||||||
Result: pb.Compare_LESS,
|
|
||||||
Target: pb.Compare_MOD,
|
|
||||||
Key: []byte(baseKey),
|
|
||||||
// current revision might contain modification so +1
|
// current revision might contain modification so +1
|
||||||
TargetUnion: &pb.Compare_ModRevision{ModRevision: resp.Header.Revision + 1},
|
cmp := v3.Compare(v3.ModifiedRevision(baseKey), "<", resp.Header.Revision+1)
|
||||||
}
|
reqPrefix := v3.OpPut(baseKey, "", v3.WithLease(leaseID))
|
||||||
|
reqNewKey := v3.OpPut(newKey, val, v3.WithLease(leaseID))
|
||||||
|
|
||||||
reqPrefix := &pb.RequestUnion{
|
txn := kv.Txn(context.TODO())
|
||||||
Request: &pb.RequestUnion_RequestPut{
|
txnresp, err := txn.If(cmp).Then(reqPrefix, reqNewKey).Commit()
|
||||||
RequestPut: &pb.PutRequest{
|
|
||||||
Key: baseKey,
|
|
||||||
Lease: int64(leaseID),
|
|
||||||
}}}
|
|
||||||
|
|
||||||
reqNewKey := &pb.RequestUnion{
|
|
||||||
Request: &pb.RequestUnion_RequestPut{
|
|
||||||
RequestPut: &pb.PutRequest{
|
|
||||||
Key: []byte(newKey),
|
|
||||||
Value: []byte(val),
|
|
||||||
Lease: int64(leaseID),
|
|
||||||
}}}
|
|
||||||
|
|
||||||
txnresp, err := client.KV.Txn(
|
|
||||||
context.TODO(),
|
|
||||||
&pb.TxnRequest{
|
|
||||||
[]*pb.Compare{cmp},
|
|
||||||
[]*pb.RequestUnion{reqPrefix, reqNewKey}, nil})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if txnresp.Succeeded == false {
|
if txnresp.Succeeded == false {
|
||||||
return newSequentialKV(client, prefix, val, leaseID)
|
return newSequentialKV(kv, prefix, val, leaseID)
|
||||||
}
|
}
|
||||||
return &RemoteKV{client, newKey, txnresp.Header.Revision, val}, nil
|
return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rk *RemoteKV) Key() string { return rk.key }
|
func (rk *RemoteKV) Key() string { return rk.key }
|
||||||
@ -183,18 +143,16 @@ func (rk *RemoteKV) Revision() int64 { return rk.rev }
|
|||||||
func (rk *RemoteKV) Value() string { return rk.val }
|
func (rk *RemoteKV) Value() string { return rk.val }
|
||||||
|
|
||||||
func (rk *RemoteKV) Delete() error {
|
func (rk *RemoteKV) Delete() error {
|
||||||
if rk.client == nil {
|
if rk.kv == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
req := &pb.DeleteRangeRequest{Key: []byte(rk.key)}
|
_, err := rk.kv.Delete(context.TODO(), rk.key)
|
||||||
_, err := rk.client.KV.DeleteRange(context.TODO(), req)
|
rk.kv = nil
|
||||||
rk.client = nil
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rk *RemoteKV) Put(val string) error {
|
func (rk *RemoteKV) Put(val string) error {
|
||||||
req := &pb.PutRequest{Key: []byte(rk.key), Value: []byte(val)}
|
_, err := rk.kv.Put(context.TODO(), rk.key, val)
|
||||||
_, err := rk.client.KV.Put(context.TODO(), req)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,12 +160,12 @@ func (rk *RemoteKV) Put(val string) error {
|
|||||||
type EphemeralKV struct{ RemoteKV }
|
type EphemeralKV struct{ RemoteKV }
|
||||||
|
|
||||||
// NewEphemeralKV creates a new key/value pair associated with a session lease
|
// NewEphemeralKV creates a new key/value pair associated with a session lease
|
||||||
func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, error) {
|
func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
|
||||||
leaseID, err := SessionLease(client)
|
leaseID, err := SessionLease(client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
k, err := NewKV(client, key, val, leaseID)
|
k, err := NewKV(v3.NewKV(client), key, val, leaseID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -215,12 +173,12 @@ func NewEphemeralKV(client *clientv3.Client, key, val string) (*EphemeralKV, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
|
// NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
|
||||||
func NewUniqueEphemeralKey(client *clientv3.Client, prefix string) (*EphemeralKV, error) {
|
func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) {
|
||||||
return NewUniqueEphemeralKV(client, prefix, "")
|
return NewUniqueEphemeralKV(client, prefix, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
|
// NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
|
||||||
func NewUniqueEphemeralKV(client *clientv3.Client, prefix, val string) (ek *EphemeralKV, err error) {
|
func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) {
|
||||||
for {
|
for {
|
||||||
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
|
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
|
||||||
ek, err = NewEphemeralKV(client, newKey, val)
|
ek, err = NewEphemeralKV(client, newKey, val)
|
||||||
|
@ -17,29 +17,33 @@ package recipe
|
|||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Mutex implements the sync Locker interface with etcd
|
// Mutex implements the sync Locker interface with etcd
|
||||||
type Mutex struct {
|
type Mutex struct {
|
||||||
client *clientv3.Client
|
client *v3.Client
|
||||||
|
kv v3.KV
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
key string
|
key string
|
||||||
myKey *RemoteKV
|
myKey *EphemeralKV
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMutex(client *clientv3.Client, key string) *Mutex {
|
func NewMutex(client *v3.Client, key string) *Mutex {
|
||||||
return &Mutex{client, key, nil}
|
return &Mutex{client, v3.NewKV(client), context.TODO(), key, nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mutex) Lock() (err error) {
|
func (m *Mutex) Lock() (err error) {
|
||||||
// put self in lock waiters via myKey; oldest waiter holds lock
|
// put self in lock waiters via myKey; oldest waiter holds lock
|
||||||
m.myKey, err = NewUniqueKey(m.client, m.key)
|
m.myKey, err = NewUniqueEphemeralKey(m.client, m.key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// find oldest element in waiters via revision of insertion
|
// find oldest element in waiters via revision of insertion
|
||||||
resp, err := NewRange(m.client, m.key).FirstRev()
|
resp, err := m.kv.Get(m.ctx, m.key, withFirstRev()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -48,7 +52,8 @@ func (m *Mutex) Lock() (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// otherwise myKey isn't lowest, so there must be a key prior to myKey
|
// otherwise myKey isn't lowest, so there must be a key prior to myKey
|
||||||
lastKey, err := NewRangeRev(m.client, m.key, m.myKey.Revision()-1).LastRev()
|
opts := append(withLastRev(), v3.WithRev(m.myKey.Revision()-1))
|
||||||
|
lastKey, err := m.kv.Get(m.ctx, m.key, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -81,6 +86,6 @@ func (lm *lockerMutex) Unlock() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocker(client *clientv3.Client, key string) sync.Locker {
|
func NewLocker(client *v3.Client, key string) sync.Locker {
|
||||||
return &lockerMutex{NewMutex(client, key)}
|
return &lockerMutex{NewMutex(client, key)}
|
||||||
}
|
}
|
||||||
|
@ -17,25 +17,28 @@ package recipe
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PriorityQueue implements a multi-reader, multi-writer distributed queue.
|
// PriorityQueue implements a multi-reader, multi-writer distributed queue.
|
||||||
type PriorityQueue struct {
|
type PriorityQueue struct {
|
||||||
client *clientv3.Client
|
client *v3.Client
|
||||||
|
kv v3.KV
|
||||||
|
ctx context.Context
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPriorityQueue creates an etcd priority queue.
|
// NewPriorityQueue creates an etcd priority queue.
|
||||||
func NewPriorityQueue(client *clientv3.Client, key string) *PriorityQueue {
|
func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
|
||||||
return &PriorityQueue{client, key + "/"}
|
return &PriorityQueue{client, v3.NewKV(client), context.TODO(), key + "/"}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue puts a value into a queue with a given priority.
|
// Enqueue puts a value into a queue with a given priority.
|
||||||
func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
||||||
prefix := fmt.Sprintf("%s%05d", q.key, pr)
|
prefix := fmt.Sprintf("%s%05d", q.key, pr)
|
||||||
_, err := NewSequentialKV(q.client, prefix, val)
|
_, err := NewSequentialKV(q.kv, prefix, val)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,12 +46,12 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
|||||||
// queue is empty, Dequeue blocks until items are available.
|
// queue is empty, Dequeue blocks until items are available.
|
||||||
func (q *PriorityQueue) Dequeue() (string, error) {
|
func (q *PriorityQueue) Dequeue() (string, error) {
|
||||||
// TODO: fewer round trips by fetching more than one key
|
// TODO: fewer round trips by fetching more than one key
|
||||||
resp, err := NewRange(q.client, q.key).FirstKey()
|
resp, err := q.kv.Get(q.ctx, q.key, withFirstKey()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
kv, err := claimFirstKey(q.client.KV, resp.Kvs)
|
kv, err := claimFirstKey(q.kv, resp.Kvs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if kv != nil {
|
} else if kv != nil {
|
||||||
@ -68,7 +71,7 @@ func (q *PriorityQueue) Dequeue() (string, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
|
ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
|
@ -15,22 +15,26 @@
|
|||||||
package recipe
|
package recipe
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Queue implements a multi-reader, multi-writer distributed queue.
|
// Queue implements a multi-reader, multi-writer distributed queue.
|
||||||
type Queue struct {
|
type Queue struct {
|
||||||
client *clientv3.Client
|
client *v3.Client
|
||||||
|
kv v3.KV
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
keyPrefix string
|
keyPrefix string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewQueue(client *clientv3.Client, keyPrefix string) *Queue {
|
func NewQueue(client *v3.Client, keyPrefix string) *Queue {
|
||||||
return &Queue{client, keyPrefix}
|
return &Queue{client, v3.NewKV(client), context.TODO(), keyPrefix}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queue) Enqueue(val string) error {
|
func (q *Queue) Enqueue(val string) error {
|
||||||
_, err := NewUniqueKV(q.client, q.keyPrefix, val, 0)
|
_, err := NewUniqueKV(q.kv, q.keyPrefix, val, 0)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,12 +42,12 @@ func (q *Queue) Enqueue(val string) error {
|
|||||||
// queue is empty, Dequeue blocks until elements are available.
|
// queue is empty, Dequeue blocks until elements are available.
|
||||||
func (q *Queue) Dequeue() (string, error) {
|
func (q *Queue) Dequeue() (string, error) {
|
||||||
// TODO: fewer round trips by fetching more than one key
|
// TODO: fewer round trips by fetching more than one key
|
||||||
resp, err := NewRange(q.client, q.keyPrefix).FirstRev()
|
resp, err := q.kv.Get(q.ctx, q.keyPrefix, withFirstRev()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
kv, err := claimFirstKey(q.client.KV, resp.Kvs)
|
kv, err := claimFirstKey(q.kv, resp.Kvs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if kv != nil {
|
} else if kv != nil {
|
||||||
@ -63,7 +67,7 @@ func (q *Queue) Dequeue() (string, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, err := deleteRevKey(q.client.KV, string(ev.Kv.Key), ev.Kv.ModRevision)
|
ok, err := deleteRevKey(q.kv, string(ev.Kv.Key), ev.Kv.ModRevision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
|
@ -15,94 +15,20 @@
|
|||||||
package recipe
|
package recipe
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/clientv3"
|
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Range struct {
|
func withFirstCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortAscend) }
|
||||||
kv pb.KVClient
|
func withLastCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortDescend) }
|
||||||
key []byte
|
func withFirstKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortAscend) }
|
||||||
rev int64
|
func withLastKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortDescend) }
|
||||||
keyEnd []byte
|
func withFirstRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortAscend) }
|
||||||
}
|
func withLastRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortDescend) }
|
||||||
|
|
||||||
func NewRange(client *clientv3.Client, key string) *Range {
|
// withTop gets the first key over the get's prefix given a sort order
|
||||||
return NewRangeRev(client, key, 0)
|
func withTop(target v3.SortTarget, order v3.SortOrder) []v3.OpOption {
|
||||||
}
|
return []v3.OpOption{
|
||||||
|
v3.WithPrefix(),
|
||||||
func NewRangeRev(client *clientv3.Client, key string, rev int64) *Range {
|
v3.WithSort(target, order),
|
||||||
return &Range{client.KV, []byte(key), rev, prefixEnd(key)}
|
v3.WithLimit(1)}
|
||||||
}
|
|
||||||
|
|
||||||
// Prefix performs a RangeRequest to get keys matching <key>*
|
|
||||||
func (r *Range) Prefix() (*pb.RangeResponse, error) {
|
|
||||||
return r.kv.Range(
|
|
||||||
context.TODO(),
|
|
||||||
&pb.RangeRequest{
|
|
||||||
Key: prefixNext(string(r.key)),
|
|
||||||
RangeEnd: r.keyEnd,
|
|
||||||
Revision: r.rev})
|
|
||||||
}
|
|
||||||
|
|
||||||
// OpenInterval gets the keys in the set <key>* - <key>
|
|
||||||
func (r *Range) OpenInterval() (*pb.RangeResponse, error) {
|
|
||||||
return r.kv.Range(
|
|
||||||
context.TODO(),
|
|
||||||
&pb.RangeRequest{Key: r.key, RangeEnd: r.keyEnd, Revision: r.rev})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Range) FirstKey() (*pb.RangeResponse, error) {
|
|
||||||
return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_KEY)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Range) LastKey() (*pb.RangeResponse, error) {
|
|
||||||
return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_KEY)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Range) FirstRev() (*pb.RangeResponse, error) {
|
|
||||||
return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Range) LastRev() (*pb.RangeResponse, error) {
|
|
||||||
return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Range) FirstCreate() (*pb.RangeResponse, error) {
|
|
||||||
return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Range) LastCreate() (*pb.RangeResponse, error) {
|
|
||||||
return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD)
|
|
||||||
}
|
|
||||||
|
|
||||||
// topTarget gets the first key for a given sort order and target
|
|
||||||
func (r *Range) topTarget(order pb.RangeRequest_SortOrder, target pb.RangeRequest_SortTarget) (*pb.RangeResponse, error) {
|
|
||||||
return r.kv.Range(
|
|
||||||
context.TODO(),
|
|
||||||
&pb.RangeRequest{
|
|
||||||
Key: r.key,
|
|
||||||
RangeEnd: r.keyEnd,
|
|
||||||
Limit: 1,
|
|
||||||
Revision: r.rev,
|
|
||||||
SortOrder: order,
|
|
||||||
SortTarget: target})
|
|
||||||
}
|
|
||||||
|
|
||||||
// prefixNext returns the first key possibly matched by <prefix>* - <prefix>
|
|
||||||
func prefixNext(prefix string) []byte {
|
|
||||||
return append([]byte(prefix), 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// prefixEnd returns the last key possibly matched by <prefix>*
|
|
||||||
func prefixEnd(prefix string) []byte {
|
|
||||||
keyEnd := []byte(prefix)
|
|
||||||
for i := len(keyEnd) - 1; i >= 0; i-- {
|
|
||||||
if keyEnd[i] < 0xff {
|
|
||||||
keyEnd[i] = keyEnd[i] + 1
|
|
||||||
keyEnd = keyEnd[:i+1]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return keyEnd
|
|
||||||
}
|
}
|
||||||
|
@ -15,23 +15,26 @@
|
|||||||
package recipe
|
package recipe
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RWMutex struct {
|
type RWMutex struct {
|
||||||
client *clientv3.Client
|
client *v3.Client
|
||||||
|
kv v3.KV
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
key string
|
key string
|
||||||
myKey *RemoteKV
|
myKey *EphemeralKV
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRWMutex(client *clientv3.Client, key string) *RWMutex {
|
func NewRWMutex(client *v3.Client, key string) *RWMutex {
|
||||||
return &RWMutex{client, key, nil}
|
return &RWMutex{client, v3.NewKV(client), context.TODO(), key, nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rwm *RWMutex) RLock() error {
|
func (rwm *RWMutex) RLock() error {
|
||||||
// XXX: make reads ephemeral locks?
|
rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/read")
|
||||||
rk, err := NewUniqueKey(rwm.client, rwm.key+"/read")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -39,7 +42,7 @@ func (rwm *RWMutex) RLock() error {
|
|||||||
|
|
||||||
// if there are nodes with "write-" and a lower
|
// if there are nodes with "write-" and a lower
|
||||||
// revision number than us we must wait
|
// revision number than us we must wait
|
||||||
resp, err := NewRange(rwm.client, rwm.key+"/write").FirstRev()
|
resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", withFirstRev()...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -51,21 +54,22 @@ func (rwm *RWMutex) RLock() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rwm *RWMutex) Lock() error {
|
func (rwm *RWMutex) Lock() error {
|
||||||
rk, err := NewUniqueKey(rwm.client, rwm.key+"/write")
|
rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/write")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rwm.myKey = rk
|
rwm.myKey = rk
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// any key of lower rev number blocks the write lock
|
// find any key of lower rev number blocks the write lock
|
||||||
resp, err := NewRangeRev(rwm.client, rwm.key, rk.Revision()-1).LastRev()
|
opts := append(withLastRev(), v3.WithRev(rk.Revision()-1))
|
||||||
|
resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(resp.Kvs) == 0 {
|
if len(resp.Kvs) == 0 {
|
||||||
// no matching for revision before myKey; acquired
|
// no matching for revision before myKey; acquired
|
||||||
return nil
|
break
|
||||||
}
|
}
|
||||||
if err := rwm.waitOnLowest(); err != nil {
|
if err := rwm.waitOnLowest(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -78,7 +82,8 @@ func (rwm *RWMutex) Lock() error {
|
|||||||
|
|
||||||
func (rwm *RWMutex) waitOnLowest() error {
|
func (rwm *RWMutex) waitOnLowest() error {
|
||||||
// must block; get key before ek for waiting
|
// must block; get key before ek for waiting
|
||||||
lastKey, err := NewRangeRev(rwm.client, rwm.key, rwm.myKey.Revision()-1).LastRev()
|
opts := append(withLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
|
||||||
|
lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -16,13 +16,13 @@ package recipe
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/clientv3"
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// STM implements software transactional memory over etcd
|
// STM implements software transactional memory over etcd
|
||||||
type STM struct {
|
type STM struct {
|
||||||
client *clientv3.Client
|
client *v3.Client
|
||||||
|
kv v3.KV
|
||||||
// rset holds the read key's value and revision of read
|
// rset holds the read key's value and revision of read
|
||||||
rset map[string]*RemoteKV
|
rset map[string]*RemoteKV
|
||||||
// wset holds the write key and its value
|
// wset holds the write key and its value
|
||||||
@ -33,8 +33,8 @@ type STM struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewSTM creates new transaction loop for a given apply function.
|
// NewSTM creates new transaction loop for a given apply function.
|
||||||
func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error {
|
func NewSTM(client *v3.Client, apply func(*STM) error) <-chan error {
|
||||||
s := &STM{client: client, apply: apply}
|
s := &STM{client: client, kv: v3.NewKV(client), apply: apply}
|
||||||
errc := make(chan error, 1)
|
errc := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
var err error
|
var err error
|
||||||
@ -43,7 +43,8 @@ func NewSTM(client *clientv3.Client, apply func(*STM) error) <-chan error {
|
|||||||
if err = apply(s); err != nil || s.aborted {
|
if err = apply(s); err != nil || s.aborted {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if ok, err := s.commit(); ok || err != nil {
|
if ok, cerr := s.commit(); ok || cerr != nil {
|
||||||
|
err = cerr
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -63,7 +64,7 @@ func (s *STM) Get(key string) (string, error) {
|
|||||||
if rk, ok := s.rset[key]; ok {
|
if rk, ok := s.rset[key]; ok {
|
||||||
return rk.Value(), nil
|
return rk.Value(), nil
|
||||||
}
|
}
|
||||||
rk, err := GetRemoteKV(s.client, key)
|
rk, err := GetRemoteKV(s.kv, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -76,30 +77,21 @@ func (s *STM) Get(key string) (string, error) {
|
|||||||
func (s *STM) Put(key string, val string) { s.wset[key] = val }
|
func (s *STM) Put(key string, val string) { s.wset[key] = val }
|
||||||
|
|
||||||
// commit attempts to apply the txn's changes to the server.
|
// commit attempts to apply the txn's changes to the server.
|
||||||
func (s *STM) commit() (ok bool, err error) {
|
func (s *STM) commit() (ok bool, rr error) {
|
||||||
// read set must not change
|
// read set must not change
|
||||||
cmps := []*pb.Compare{}
|
cmps := make([]v3.Cmp, 0, len(s.rset))
|
||||||
for k, rk := range s.rset {
|
for k, rk := range s.rset {
|
||||||
// use < to support updating keys that don't exist yet
|
// use < to support updating keys that don't exist yet
|
||||||
cmp := &pb.Compare{
|
cmp := v3.Compare(v3.ModifiedRevision(k), "<", rk.Revision()+1)
|
||||||
Result: pb.Compare_LESS,
|
|
||||||
Target: pb.Compare_MOD,
|
|
||||||
Key: []byte(k),
|
|
||||||
TargetUnion: &pb.Compare_ModRevision{ModRevision: rk.Revision() + 1},
|
|
||||||
}
|
|
||||||
cmps = append(cmps, cmp)
|
cmps = append(cmps, cmp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// apply all writes
|
// apply all writes
|
||||||
puts := []*pb.RequestUnion{}
|
puts := make([]v3.Op, 0, len(s.wset))
|
||||||
for k, v := range s.wset {
|
for k, v := range s.wset {
|
||||||
puts = append(puts, &pb.RequestUnion{
|
puts = append(puts, v3.OpPut(k, v))
|
||||||
Request: &pb.RequestUnion_RequestPut{
|
|
||||||
RequestPut: &pb.PutRequest{
|
|
||||||
Key: []byte(k),
|
|
||||||
Value: []byte(v),
|
|
||||||
}}})
|
|
||||||
}
|
}
|
||||||
txnresp, err := s.client.KV.Txn(context.TODO(), &pb.TxnRequest{cmps, puts, nil})
|
txnresp, err := s.kv.Txn(context.TODO()).If(cmps...).Then(puts...).Commit()
|
||||||
return txnresp.Succeeded, err
|
return txnresp.Succeeded, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
v3 "github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/contrib/recipes"
|
"github.com/coreos/etcd/contrib/recipes"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -30,7 +31,7 @@ func TestSTMConflict(t *testing.T) {
|
|||||||
etcdc := clus.RandClient()
|
etcdc := clus.RandClient()
|
||||||
keys := make([]*recipe.RemoteKV, 5)
|
keys := make([]*recipe.RemoteKV, 5)
|
||||||
for i := 0; i < len(keys); i++ {
|
for i := 0; i < len(keys); i++ {
|
||||||
rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0)
|
rk, err := recipe.NewKV(v3.NewKV(etcdc), fmt.Sprintf("foo-%d", i), "100", 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not make key (%v)", err)
|
t.Fatalf("could not make key (%v)", err)
|
||||||
}
|
}
|
||||||
@ -75,7 +76,7 @@ func TestSTMConflict(t *testing.T) {
|
|||||||
// ensure sum matches initial sum
|
// ensure sum matches initial sum
|
||||||
sum := 0
|
sum := 0
|
||||||
for _, oldRK := range keys {
|
for _, oldRK := range keys {
|
||||||
rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key())
|
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), oldRK.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err)
|
t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err)
|
||||||
}
|
}
|
||||||
@ -102,7 +103,7 @@ func TestSTMPutNewKey(t *testing.T) {
|
|||||||
t.Fatalf("error on stm txn (%v)", err)
|
t.Fatalf("error on stm txn (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rk, err := recipe.GetRemoteKV(etcdc, "foo")
|
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error fetching key (%v)", err)
|
t.Fatalf("error fetching key (%v)", err)
|
||||||
}
|
}
|
||||||
@ -128,7 +129,7 @@ func TestSTMAbort(t *testing.T) {
|
|||||||
t.Fatalf("error on stm txn (%v)", err)
|
t.Fatalf("error on stm txn (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rk, err := recipe.GetRemoteKV(etcdc, "foo")
|
rk, err := recipe.GetRemoteKV(v3.NewKV(etcdc), "foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error fetching key (%v)", err)
|
t.Fatalf("error fetching key (%v)", err)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user