diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index c2c70105e..1e044d1c2 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -16,11 +16,13 @@ package etcdserver import ( "bytes" + "fmt" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" dstorage "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/storagepb" ) type RaftKV interface { @@ -103,16 +105,23 @@ func (s *EtcdServer) Watchable() dstorage.Watchable { return s.kv } +const ( + // noTxn is an invalid txn ID. + // To apply with independent Range, Put, Delete, you can pass noTxn + // to apply functions instead of a valid txn ID. + noTxn = -1 +) + func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { ar := &applyResult{} switch { case r.Range != nil: - ar.resp, ar.err = applyRange(s.kv, r.Range) + ar.resp, ar.err = applyRange(noTxn, s.kv, r.Range) case r.Put != nil: - ar.resp, ar.err = applyPut(s.kv, r.Put) + ar.resp, ar.err = applyPut(noTxn, s.kv, r.Put) case r.DeleteRange != nil: - ar.resp, ar.err = applyDeleteRange(s.kv, r.DeleteRange) + ar.resp, ar.err = applyDeleteRange(noTxn, s.kv, r.DeleteRange) case r.Txn != nil: ar.resp, ar.err = applyTxn(s.kv, r.Txn) case r.Compaction != nil: @@ -124,20 +133,45 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { return ar } -func applyPut(kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, error) { +func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, error) { resp := &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - rev := kv.Put(p.Key, p.Value) + var ( + rev int64 + err error + ) + if txnID != noTxn { + rev, err = kv.TxnPut(txnID, p.Key, p.Value) + if err != nil { + return nil, err + } + } else { + rev = kv.Put(p.Key, p.Value) + } resp.Header.Revision = rev return resp, nil } -func applyRange(kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) { resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} - kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0) - if err != nil { - return nil, err + + var ( + kvs []storagepb.KeyValue + rev int64 + err error + ) + + if txnID != noTxn { + kvs, rev, err = kv.TxnRange(txnID, r.Key, r.RangeEnd, r.Limit, 0) + if err != nil { + return nil, err + } + } else { + kvs, rev, err = kv.Range(r.Key, r.RangeEnd, r.Limit, 0) + if err != nil { + return nil, err + } } resp.Header.Revision = rev @@ -147,10 +181,24 @@ func applyRange(kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) { return resp, nil } -func applyDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func applyDeleteRange(txnID int64, kv dstorage.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { resp := &pb.DeleteRangeResponse{} resp.Header = &pb.ResponseHeader{} - _, rev := kv.DeleteRange(dr.Key, dr.RangeEnd) + + var ( + rev int64 + err error + ) + + if txnID != noTxn { + _, rev, err = kv.TxnDeleteRange(txnID, dr.Key, dr.RangeEnd) + if err != nil { + return nil, err + } + } else { + _, rev = kv.DeleteRange(dr.Key, dr.RangeEnd) + } + resp.Header.Revision = rev return resp, nil } @@ -158,9 +206,17 @@ func applyDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRang func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) (*pb.TxnResponse, error) { var revision int64 + txnID := kv.TxnBegin() + defer func() { + err := kv.TxnEnd(txnID) + if err != nil { + panic(fmt.Sprint("unexpected error when closing txn", txnID)) + } + }() + ok := true for _, c := range rt.Compare { - if revision, ok = applyCompare(kv, c); !ok { + if revision, ok = applyCompare(txnID, kv, c); !ok { break } } @@ -173,10 +229,12 @@ func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) (*pb.TxnResponse, error) { } else { reqs = rt.Failure } + resps := make([]*pb.ResponseUnion, len(reqs)) for i := range reqs { - resps[i] = applyUnion(kv, reqs[i]) + resps[i] = applyUnion(txnID, kv, reqs[i]) } + if len(resps) != 0 { revision += 1 } @@ -201,22 +259,22 @@ func applyCompaction(kv dstorage.KV, compaction *pb.CompactionRequest) (*pb.Comp return resp, err } -func applyUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { +func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { switch { case union.RequestRange != nil: - resp, err := applyRange(kv, union.RequestRange) + resp, err := applyRange(txnID, kv, union.RequestRange) if err != nil { panic("unexpected error during txn") } return &pb.ResponseUnion{ResponseRange: resp} case union.RequestPut != nil: - resp, err := applyPut(kv, union.RequestPut) + resp, err := applyPut(txnID, kv, union.RequestPut) if err != nil { panic("unexpected error during txn") } return &pb.ResponseUnion{ResponsePut: resp} case union.RequestDeleteRange != nil: - resp, err := applyDeleteRange(kv, union.RequestDeleteRange) + resp, err := applyDeleteRange(txnID, kv, union.RequestDeleteRange) if err != nil { panic("unexpected error during txn") } @@ -227,9 +285,20 @@ func applyUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { } } -func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) { - ckvs, rev, err := kv.Range(c.Key, nil, 1, 0) +// applyCompare applies the compare request. +// applyCompare should only be called within a txn request and an valid txn ID must +// be presented. Or applyCompare panics. +// It returns the revision at which the comparison happens. If the comparison +// succeeds, the it returns true. Otherwise it returns false. +func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) { + if txnID == noTxn { + panic("applyCompare called with noTxn") + } + ckvs, rev, err := kv.TxnRange(txnID, c.Key, nil, 1, 0) if err != nil { + if err == dstorage.ErrTxnIDMismatch { + panic("unexpected txn ID mismatch error") + } return rev, false }