Compare commits

...

33 Commits

Author SHA1 Message Date
2d1e2e8e64 version: bump to v3.0.12 2016-10-07 15:14:25 -07:00
6412758177 v3rpc: remove redundant locks 2016-10-07 15:13:56 -07:00
836c8159f6 v3rpc: lock progress and prevKV map correctly 2016-10-07 15:13:12 -07:00
e406e6e8f4 etcdctl/ctlv3: add 'prev-kv' flag to watch command 2016-10-07 14:23:09 -07:00
2fa2c6284e clientv3: add 'prevKV' field to watch request 2016-10-07 14:22:58 -07:00
2862c4fa12 v3rpc: implement 'prev-kv' watch 2016-10-07 14:22:19 -07:00
6f89fbf8b5 etcdserver: use mvcc.WatchableKV for prev-kv watch 2016-10-07 14:22:00 -07:00
6ae7ec9a3f *: regenerate proto 2016-10-07 14:21:19 -07:00
4a35b1b20a etcdserverpb: add 'prev_kv' to WatchCreateRequest 2016-10-07 14:20:46 -07:00
c859c97ee2 mvccpb: add 'prev_kv' field 2016-10-07 14:19:59 -07:00
a091c629e1 version: bump to v3.0.11+git 2016-10-07 13:25:21 -07:00
96de94a584 version: bump to v3.0.11 2016-10-07 11:27:48 -07:00
e9cd8410d7 integration: add 'prevKV' to TestV3DeleteRange 2016-10-07 11:03:19 -07:00
e37ede1d2e etcdserver: handle 'PrevKV' 2016-10-07 11:00:48 -07:00
4420a29ac4 etcdctl/ctlv3: add 'prev-kv' flag 2016-10-07 10:56:06 -07:00
0544d4bfd0 clientv3: add WithPrevKV OpOption 2016-10-07 10:54:45 -07:00
fe7379f102 clientv3: add Op.prevKV 2016-10-07 10:51:01 -07:00
c76df5052b *: update proto to add 'prev_kv' 2016-10-07 10:47:47 -07:00
3299cad1c3 *: add put prevkv 2016-10-07 10:39:08 -07:00
d9ab018c49 integration: test a canceled watch won't return a closing error 2016-10-05 14:19:36 -07:00
e853451cd2 clientv3: only return closing error to watcher if context is not canceled
Fixes #6503
2016-10-05 14:19:32 -07:00
1becf9d2f5 clientv3: fix race on watch initial revision
The initial revision was being updated in the substream goroutine defer;
this was racing with the resume path fetching the initial revision when
the substream closes during resume. Instead, update the initial revision
whenever the substream processes a new watch response. Since the substream
cannot receive a watch response while it is resuming, the write to the
initial revision is ordered to always happen after the resume read.

Fixes #6586
2016-10-05 10:56:36 -07:00
1a712cf187 clientv3: make IsProgressNotify() false on compact event and closed channel
Fixes #6549
2016-10-04 15:13:02 -07:00
023f335f67 wal: set PageWriter offset in file encoder 2016-10-04 15:12:47 -07:00
bf0da78b63 pkg/ioutil: configure pageOffset in NewPageWriter 2016-10-04 15:12:46 -07:00
e8473850a2 integration: test canceling watchers when disconnected 2016-10-04 15:12:37 -07:00
b836d187fd clientv3: simplify watch synchronization
Was more complicated than it needed to be and didn't really work in the
first place. Restructured watcher registration to use a queue.
2016-10-04 15:12:18 -07:00
9b09229c4d version: bump to v3.0.10+git 2016-09-23 11:13:45 -07:00
546c0f7ed6 version: bump to v3.0.10 2016-09-23 10:49:03 -07:00
adbad1c9b5 ctlv3: close snapshot file before rename (Windows) 2016-09-23 09:11:02 -07:00
273b986751 clientv3: process closed watcherStreams in watchGrpcStream run loop
Was racing with Watch() when closing the grpc stream on no watchers.

Fixes #6476
2016-09-21 15:52:20 -07:00
5b205729b9 rafthttp: add v3.0.0 to supported streams 2016-09-16 21:54:55 +09:00
fe900b09dd version: bump to v3.0.9+git 2016-09-15 15:10:23 -07:00
36 changed files with 1249 additions and 681 deletions

View File

@ -427,6 +427,7 @@ Empty field.
| ----- | ----------- | ---- |
| key | key is the first key to delete in the range. | bytes |
| range_end | range_end is the key following the last key to delete for the range [key, range_end). If range_end is not given, the range is defined to contain only the key argument. If range_end is '\0', the range is all keys greater than or equal to the key argument. | bytes |
| prev_kv | If prev_kv is set, etcd gets the previous key-value pairs before deleting them. The previous key-value pairs will be returned in the delete response. | bool |
@ -436,6 +437,7 @@ Empty field.
| ----- | ----------- | ---- |
| header | | ResponseHeader |
| deleted | deleted is the number of keys deleted by the delete range request. | int64 |
| prev_kvs | if prev_kv is set in the request, the previous key-value pairs will be returned. | (slice of) mvccpb.KeyValue |
@ -591,6 +593,7 @@ Empty field.
| key | key is the key, in bytes, to put into the key-value store. | bytes |
| value | value is the value, in bytes, to associate with the key in the key-value store. | bytes |
| lease | lease is the lease ID to associate with the key in the key-value store. A lease value of 0 indicates no lease. | int64 |
| prev_kv | If prev_kv is set, etcd gets the previous key-value pair before changing it. The previous key-value pair will be returned in the put response. | bool |
@ -599,6 +602,7 @@ Empty field.
| Field | Description | Type |
| ----- | ----------- | ---- |
| header | | ResponseHeader |
| prev_kv | if prev_kv is set in the request, the previous key-value pair will be returned. | mvccpb.KeyValue |
@ -735,6 +739,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| range_end | range_end is the end of the range [key, range_end) to watch. If range_end is not given, only the key argument is watched. If range_end is equal to '\0', all keys greater than or equal to the key argument are watched. | bytes |
| start_revision | start_revision is an optional revision to watch from (inclusive). No start_revision is "now". | int64 |
| progress_notify | progress_notify is set so that the etcd server will periodically send a WatchResponse with no events to the new watcher if there are no recent events. It is useful when clients wish to recover a disconnected watcher starting from a recent known revision. The etcd server may decide how often it will send notifications based on current load. | bool |
| prev_kv | If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned. | bool |
@ -767,6 +772,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| ----- | ----------- | ---- |
| type | type is the kind of event. If type is a PUT, it indicates new data has been stored to the key. If type is a DELETE, it indicates the key was deleted. | EventType |
| kv | kv holds the KeyValue for the event. A PUT event contains current kv pair. A PUT event with kv.Version=1 indicates the creation of a key. A DELETE/EXPIRE event contains the deleted key with its modification revision set to the revision of deletion. | KeyValue |
| prev_kv | prev_kv holds the key-value pair before the event happens. | KeyValue |

View File

@ -1474,6 +1474,11 @@
"format": "byte",
"description": "key is the first key to delete in the range."
},
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, etcd gets the previous key-value pairs before deleting it.\nThe previous key-value pairs will be returned in the delte response."
},
"range_end": {
"type": "string",
"format": "byte",
@ -1491,6 +1496,13 @@
},
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"prev_kvs": {
"type": "array",
"items": {
"$ref": "#/definitions/mvccpbKeyValue"
},
"description": "if prev_kv is set in the request, the previous key-value pairs will be returned."
}
}
},
@ -1724,6 +1736,11 @@
"format": "int64",
"description": "lease is the lease ID to associate with the key in the key-value store. A lease\nvalue of 0 indicates no lease."
},
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, etcd gets the previous key-value pair before changing it.\nThe previous key-value pair will be returned in the put response."
},
"value": {
"type": "string",
"format": "byte",
@ -1736,6 +1753,10 @@
"properties": {
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"prev_kv": {
"$ref": "#/definitions/mvccpbKeyValue",
"description": "if prev_kv is set in the request, the previous key-value pair will be returned."
}
}
},
@ -1988,6 +2009,11 @@
"format": "byte",
"description": "key is the key to register for watching."
},
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, created watcher gets the previous KV before the event happens.\nIf the previous KV is already compacted, nothing will be returned."
},
"progress_notify": {
"type": "boolean",
"format": "boolean",
@ -2057,6 +2083,10 @@
"$ref": "#/definitions/mvccpbKeyValue",
"description": "kv holds the KeyValue for the event.\nA PUT event contains current kv pair.\nA PUT event with kv.Version=1 indicates the creation of a key.\nA DELETE/EXPIRE event contains the deleted key with\nits modification revision set to the revision of deletion."
},
"prev_kv": {
"$ref": "#/definitions/mvccpbKeyValue",
"description": "prev_kv holds the key-value pair before the event happens."
},
"type": {
"$ref": "#/definitions/EventEventType",
"description": "type is the kind of event. If type is a PUT, it indicates\nnew data has been stored to the key. If type is a DELETE,\nit indicates the key was deleted."

View File

@ -21,9 +21,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -673,3 +673,112 @@ func TestWatchWithRequireLeader(t *testing.T) {
t.Fatalf("expected response, got closed channel")
}
}
// TestWatchOverlapContextCancel stresses the watcher stream teardown path by
// creating/canceling watchers to ensure that new watchers are not taken down
// by a torn down watch stream. The sort of race that's being detected:
// 1. create w1 using a cancelable ctx with %v as "ctx"
// 2. cancel ctx
// 3. watcher client begins tearing down watcher grpc stream since no more watchers
// 4. start creating watcher w2 using a new "ctx" (not canceled), attaches to old grpc stream
// 5. watcher client finishes tearing down stream on "ctx"
// 6. w2 comes back canceled
func TestWatchOverlapContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {}
testWatchOverlapContextCancel(t, f)
}
func TestWatchOverlapDropConnContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {
clus.Members[0].DropConnections()
}
testWatchOverlapContextCancel(t, f)
}
func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
// each unique context "%v" has a unique grpc stream
n := 100
ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
for i := range ctxs {
// make "%v" unique
ctxs[i] = context.WithValue(context.TODO(), "key", i)
// limits the maximum number of outstanding watchers per stream
ctxc[i] = make(chan struct{}, 2)
}
// issue concurrent watches on "abc" with cancel
cli := clus.RandClient()
if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
t.Fatal(err)
}
ch := make(chan struct{}, n)
for i := 0; i < n; i++ {
go func() {
defer func() { ch <- struct{}{} }()
idx := rand.Intn(len(ctxs))
ctx, cancel := context.WithCancel(ctxs[idx])
ctxc[idx] <- struct{}{}
wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
f(clus)
select {
case _, ok := <-wch:
if !ok {
t.Fatalf("unexpected closed channel %p", wch)
}
// may take a second or two to reestablish a watcher because of
// grpc backoff policies for disconnects
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for watch on %p", wch)
}
// randomize how cancel overlaps with watch creation
if rand.Intn(2) == 0 {
<-ctxc[idx]
cancel()
} else {
cancel()
<-ctxc[idx]
}
}()
}
// join on watches
for i := 0; i < n; i++ {
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for completed watch")
}
}
}
// TestWatchCancelAndCloseClient ensures that canceling a watcher then immediately
// closing the client does not return a client closing error.
func TestWatchCancelAndCloseClient(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
ctx, cancel := context.WithCancel(context.Background())
wch := cli.Watch(ctx, "abc")
donec := make(chan struct{})
go func() {
defer close(donec)
select {
case wr, ok := <-wch:
if ok {
t.Fatalf("expected closed watch after cancel(), got resp=%+v err=%v", wr, wr.Err())
}
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for closed channel")
}
}()
cancel()
if err := cli.Close(); err != nil {
t.Fatal(err)
}
<-donec
clus.TakeClient(0)
}

View File

@ -157,14 +157,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
resp, err = kv.remote.Put(ctx, r)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil

View File

@ -47,6 +47,9 @@ type Op struct {
// for range, watch
rev int64
// for watch, put, delete
prevKV bool
// progressNotify is for progress updates.
progressNotify bool
@ -73,10 +76,10 @@ func (op Op) toRequestOp() *pb.RequestOp {
}
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}}
case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
default:
panic("Unknown Op")
@ -271,3 +274,11 @@ func WithProgressNotify() OpOption {
op.progressNotify = true
}
}
// WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
// nothing will be returned.
func WithPrevKV() OpOption {
return func(op *Op) {
op.prevKV = true
}
}

View File

@ -61,6 +61,9 @@ type WatchResponse struct {
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// created is used to indicate the creation of the watcher.
created bool
closeErr error
}
@ -89,7 +92,7 @@ func (wr *WatchResponse) Err() error {
// IsProgressNotify returns true if the WatchResponse is progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
return len(wr.Events) == 0 && !wr.Canceled
return len(wr.Events) == 0 && !wr.Canceled && !wr.created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}
// watcher implements the Watcher interface
@ -102,6 +105,7 @@ type watcher struct {
streams map[string]*watchGrpcStream
}
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
@ -112,10 +116,10 @@ type watchGrpcStream struct {
ctxKey string
cancel context.CancelFunc
// mu protects the streams map
mu sync.RWMutex
// streams holds all active watchers
streams map[int64]*watcherStream
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
@ -127,8 +131,12 @@ type watchGrpcStream struct {
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// the error that closed the watch stream
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error
}
@ -140,6 +148,8 @@ type watchRequest struct {
rev int64
// progressNotify is for progress updates.
progressNotify bool
// get the previous key-value pair before the event happens
prevKV bool
// retc receives a chan WatchResponse once the watcher is established
retc chan chan WatchResponse
}
@ -150,15 +160,18 @@ type watcherStream struct {
initReq watchRequest
// outc publishes watch responses to subscriber
outc chan<- WatchResponse
outc chan WatchResponse
// recvc buffers watch responses before publishing
recvc chan *WatchResponse
id int64
// donec closes when the watcherStream goroutine stops.
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing bool
// id is the registered watch id on the grpc stream
id int64
// lastRev is revision last successfully sent over outc
lastRev int64
// resumec indicates the stream must recover at a given revision
resumec chan int64
// buf holds all events received from etcd but not yet consumed by the client
buf []*WatchResponse
}
func NewWatcher(c *Client) Watcher {
@ -182,18 +195,20 @@ func (vc *valCtx) Err() error { return nil }
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel,
streams: make(map[int64]*watcherStream),
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
}
go wgs.run()
return wgs
@ -203,14 +218,14 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{
ctx: ctx,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
retc: retc,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
@ -242,7 +257,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
@ -255,7 +269,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
// receive channel
if ok {
select {
case ret := <-retc:
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
@ -286,12 +300,7 @@ func (w *watcher) Close() (err error) {
}
func (w *watchGrpcStream) Close() (err error) {
w.mu.Lock()
if w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
close(w.stopc)
<-w.donec
select {
case err = <-w.errc:
@ -300,67 +309,57 @@ func (w *watchGrpcStream) Close() (err error) {
return toErr(w.ctx, err)
}
func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil {
// no pending request; ignore
return
}
if resp.Canceled || resp.CompactRevision != 0 {
// a cancel at id creation time means the start revision has
// been compacted out of the store
ret := make(chan WatchResponse, 1)
ret <- WatchResponse{
Header: *resp.Header,
CompactRevision: resp.CompactRevision,
Canceled: true}
close(ret)
pendingReq.retc <- ret
return
}
ret := make(chan WatchResponse)
if resp.WatchId == -1 {
// failed; no channel
close(ret)
pendingReq.retc <- ret
return
}
ws := &watcherStream{
initReq: *pendingReq,
id: resp.WatchId,
outc: ret,
// buffered so unlikely to block on sending while holding mu
recvc: make(chan *WatchResponse, 4),
resumec: make(chan int64),
}
if pendingReq.rev == 0 {
// note the header revision so that a put following a current watcher
// disconnect will arrive on the watcher channel after reconnect
ws.initReq.rev = resp.Header.Revision
}
func (w *watcher) closeStream(wgs *watchGrpcStream) {
w.mu.Lock()
w.streams[ws.id] = ws
close(wgs.donec)
wgs.cancel()
if w.streams != nil {
delete(w.streams, wgs.ctxKey)
}
w.mu.Unlock()
// pass back the subscriber channel for the watcher
pendingReq.retc <- ret
// send messages to subscriber
go w.serveStream(ws)
}
// closeStream closes the watcher resources and removes it
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
if resp.WatchId == -1 {
// failed; no channel
close(ws.recvc)
return
}
ws.id = resp.WatchId
w.substreams[ws.id] = ws
}
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
select {
case ws.outc <- *resp:
case <-ws.initReq.ctx.Done():
case <-time.After(closeSendErrTimeout):
}
close(ws.outc)
delete(w.streams, ws.id)
w.mu.Unlock()
}
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
// send channel response in case stream was never established
select {
case ws.initReq.retc <- ws.outc:
default:
}
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
} else {
close(ws.outc)
}
if ws.id != -1 {
delete(w.substreams, ws.id)
return
}
for i := range w.resuming {
if w.resuming[i] == ws {
w.resuming[i] = nil
return
}
}
}
// run is the root of the goroutines for managing a watcher client
@ -368,66 +367,79 @@ func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient
var closeErr error
defer func() {
w.owner.mu.Lock()
w.closeErr = closeErr
if w.owner.streams != nil {
delete(w.owner.streams, w.ctxKey)
}
close(w.donec)
w.owner.mu.Unlock()
w.cancel()
}()
// substreams marked to close but goroutine still running; needed for
// avoiding double-closing recvc on grpc stream teardown
closing := make(map[*watcherStream]struct{})
// already stopped?
w.mu.RLock()
stopc := w.stopc
w.mu.RUnlock()
if stopc == nil {
return
}
defer func() {
w.closeErr = closeErr
// shutdown substreams and resuming substreams
for _, ws := range w.substreams {
if _, ok := closing[ws]; !ok {
close(ws.recvc)
}
}
for _, ws := range w.resuming {
if _, ok := closing[ws]; ws != nil && !ok {
close(ws.recvc)
}
}
w.joinSubstreams()
for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- {
w.closeSubstream(<-w.closingc)
}
w.owner.closeStream(w)
}()
// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
var pendingReq, failedReq *watchRequest
curReqC := w.reqc
cancelSet := make(map[int64]struct{})
for {
select {
// Watch() requested
case pendingReq = <-curReqC:
// no more watch requests until there's a response
curReqC = nil
if err := wc.Send(pendingReq.toPB()); err == nil {
// pendingReq now waits on w.respc
break
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
failedReq = pendingReq
// New events from the watch client
case pbresp := <-w.respc:
switch {
case pbresp.Created:
// response to pending req, try to add
w.addStream(pbresp, pendingReq)
pendingReq = nil
curReqC = w.reqc
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
case pbresp.Canceled:
delete(cancelSet, pbresp.WatchId)
// shutdown serveStream, if any
w.mu.Lock()
if ws, ok := w.streams[pbresp.WatchId]; ok {
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
close(ws.recvc)
delete(w.streams, ws.id)
}
numStreams := len(w.streams)
w.mu.Unlock()
if numStreams == 0 {
// don't leak watcher streams
return
closing[ws] = struct{}{}
}
default:
// dispatch to appropriate watch stream
@ -448,7 +460,6 @@ func (w *watchGrpcStream) run() {
wc.Send(req)
}
// watch client failed to recv; spawn another if possible
// TODO report watch client errors from errc?
case err := <-w.errc:
if toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
@ -457,48 +468,58 @@ func (w *watchGrpcStream) run() {
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
curReqC = w.reqc
if pendingReq != nil {
failedReq = pendingReq
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
case <-stopc:
case <-w.stopc:
return
}
// send failed; queue for retry
if failedReq != nil {
go func(wr *watchRequest) {
select {
case w.reqc <- wr:
case <-wr.ctx.Done():
case <-w.donec:
}
}(pendingReq)
failedReq = nil
pendingReq = nil
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown
return
}
}
}
}
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
for len(w.resuming) != 0 {
if w.resuming[0] != nil {
return w.resuming[0]
}
w.resuming = w.resuming[1:len(w.resuming)]
}
return nil
}
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
w.mu.RLock()
defer w.mu.RUnlock()
ws, ok := w.streams[pbresp.WatchId]
ws, ok := w.substreams[pbresp.WatchId]
if !ok {
return false
}
events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
if ok {
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
Canceled: pbresp.Canceled}
ws.recvc <- wr
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
created: pbresp.Created,
Canceled: pbresp.Canceled,
}
return ok
select {
case ws.recvc <- wr:
case <-ws.donec:
return false
}
return true
}
// serveWatchClient forwards messages from the grpc stream to run()
@ -520,134 +541,123 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
}
}
// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
if ws.closing {
panic("created substream goroutine but substream is closing")
}
// nextRev is the minimum expected next revision
nextRev := ws.initReq.rev
resuming := false
closing := false
for !closing {
defer func() {
if !resuming {
ws.closing = true
}
close(ws.donec)
if !resuming {
w.closingc <- ws
}
}()
emptyWr := &WatchResponse{}
for {
curWr := emptyWr
outc := ws.outc
if len(wrs) > 0 {
curWr = wrs[0]
if len(ws.buf) > 0 && ws.buf[0].created {
select {
case ws.initReq.retc <- ws.outc:
default:
}
ws.buf = ws.buf[1:]
}
if len(ws.buf) > 0 {
curWr = ws.buf[0]
} else {
outc = nil
}
select {
case outc <- *curWr:
if wrs[0].Err() != nil {
closing = true
break
}
var newRev int64
if len(wrs[0].Events) > 0 {
newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
} else {
newRev = wrs[0].Header.Revision
}
if newRev != ws.lastRev {
ws.lastRev = newRev
}
wrs[0] = nil
wrs = wrs[1:]
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeStream
if ws.buf[0].Err() != nil {
return
}
// resume up to last seen event if disconnected
if resuming && wr.Err() == nil {
resuming = false
// trim events already seen
for i := 0; i < len(wr.Events); i++ {
if wr.Events[i].Kv.ModRevision > ws.lastRev {
wr.Events = wr.Events[i:]
break
}
}
// only forward new events
if wr.Events[0].Kv.ModRevision == ws.lastRev {
break
}
ws.buf[0] = nil
ws.buf = ws.buf[1:]
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeSubstream
return
}
resuming = false
// TODO don't keep buffering if subscriber stops reading
wrs = append(wrs, wr)
case resumeRev := <-ws.resumec:
wrs = nil
resuming = true
if resumeRev == -1 {
// pause serving stream while resume gets set up
break
// TODO pause channel if buffer gets too large
ws.buf = append(ws.buf, wr)
nextRev = wr.Header.Revision
if len(wr.Events) > 0 {
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
if resumeRev != ws.lastRev {
panic("unexpected resume revision")
}
case <-w.donec:
closing = true
closeErr = w.closeErr
ws.initReq.rev = nextRev
case <-ws.initReq.ctx.Done():
closing = true
return
case <-resumec:
resuming = true
return
}
}
// try to send off close error
if closeErr != nil {
select {
case ws.outc <- WatchResponse{closeErr: w.closeErr}:
case <-w.donec:
case <-time.After(closeSendErrTimeout):
}
}
w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}
func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
return nil, rerr
// connect to grpc stream
wc, err := w.openWatchClient()
if err != nil {
return nil, v3rpc.Error(err)
}
go w.serveWatchClient(ws)
return ws, nil
}
// resume creates a new WatchClient with all current watchers reestablished
func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
for {
if ws, err = w.openWatchClient(); err != nil {
break
} else if err = w.resumeWatchers(ws); err == nil {
break
// mark all substreams as resuming
if len(w.substreams)+len(w.resuming) > 0 {
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
for _, ws := range w.resuming {
if ws == nil || ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
}
w.substreams = make(map[int64]*watcherStream)
// receive data from new grpc stream
go w.serveWatchClient(wc)
return wc, nil
}
// joinSubstreams waits for all substream goroutines to complete
func (w *watchGrpcStream) joinSubstreams() {
for _, ws := range w.substreams {
<-ws.donec
}
for _, ws := range w.resuming {
if ws != nil {
<-ws.donec
}
}
return ws, v3rpc.Error(err)
}
// openWatchClient retries opening a watchclient until retryConnection fails
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for {
w.mu.Lock()
stopc := w.stopc
w.mu.Unlock()
if stopc == nil {
select {
case <-w.stopc:
if err == nil {
err = context.Canceled
return nil, context.Canceled
}
return nil, err
default:
}
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
break
@ -659,63 +669,6 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
return ws, nil
}
// resumeWatchers rebuilds every registered watcher on a new client
func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RLock()
streams := make([]*watcherStream, 0, len(w.streams))
for _, ws := range w.streams {
streams = append(streams, ws)
}
w.mu.RUnlock()
for _, ws := range streams {
// drain recvc so no old WatchResponses (e.g., Created messages)
// are processed while resuming
ws.drain()
// pause serveStream
ws.resumec <- -1
// reconstruct watcher from initial request
if ws.lastRev != 0 {
ws.initReq.rev = ws.lastRev
}
if err := wc.Send(ws.initReq.toPB()); err != nil {
return err
}
// wait for request ack
resp, err := wc.Recv()
if err != nil {
return err
} else if len(resp.Events) != 0 || !resp.Created {
return fmt.Errorf("watcher: unexpected response (%+v)", resp)
}
// id may be different since new remote watcher; update map
w.mu.Lock()
delete(w.streams, ws.id)
ws.id = resp.WatchId
w.streams[ws.id] = ws
w.mu.Unlock()
// unpause serveStream
ws.resumec <- ws.lastRev
}
return nil
}
// drain removes all buffered WatchResponses from the stream's receive channel.
func (ws *watcherStream) drain() {
for {
select {
case <-ws.recvc:
default:
return
}
}
}
// toPB converts an internal watch request structure to its protobuf message
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{
@ -723,6 +676,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
PrevKv: wr.prevKV,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}

View File

@ -231,6 +231,8 @@ Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `
- prefix -- watch on a prefix if prefix is set.
- prev-kv -- get the previous key-value pair before the event happens.
- rev -- the revision to start watching. Specifying a revision is useful for observing past events.
#### Input Format
@ -245,7 +247,7 @@ watch [options] <key or prefix>\n
##### Simple reply
- \<event\>\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- \<event\>[\n\<old_key\>\n\<old_value\>]\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- Additional error string if WATCH failed. Exit code is non-zero.

View File

@ -23,6 +23,7 @@ import (
var (
delPrefix bool
delPrevKV bool
)
// NewDelCommand returns the cobra command for "del".
@ -34,6 +35,7 @@ func NewDelCommand() *cobra.Command {
}
cmd.Flags().BoolVar(&delPrefix, "prefix", false, "delete keys with matching prefix")
cmd.Flags().BoolVar(&delPrevKV, "prev-kv", false, "return deleted key-value pairs")
return cmd
}
@ -65,6 +67,9 @@ func getDelOp(cmd *cobra.Command, args []string) (string, []clientv3.OpOption) {
if delPrefix {
opts = append(opts, clientv3.WithPrefix())
}
if delPrevKV {
opts = append(opts, clientv3.WithPrevKV())
}
return key, opts
}

View File

@ -108,6 +108,9 @@ type simplePrinter struct {
func (s *simplePrinter) Del(resp v3.DeleteResponse) {
fmt.Println(resp.Deleted)
for _, kv := range resp.PrevKvs {
printKV(s.isHex, kv)
}
}
func (s *simplePrinter) Get(resp v3.GetResponse) {
@ -116,7 +119,12 @@ func (s *simplePrinter) Get(resp v3.GetResponse) {
}
}
func (s *simplePrinter) Put(r v3.PutResponse) { fmt.Println("OK") }
func (s *simplePrinter) Put(r v3.PutResponse) {
fmt.Println("OK")
if r.PrevKv != nil {
printKV(s.isHex, r.PrevKv)
}
}
func (s *simplePrinter) Txn(resp v3.TxnResponse) {
if resp.Succeeded {
@ -143,6 +151,9 @@ func (s *simplePrinter) Txn(resp v3.TxnResponse) {
func (s *simplePrinter) Watch(resp v3.WatchResponse) {
for _, e := range resp.Events {
fmt.Println(e.Type)
if e.PrevKv != nil {
printKV(s.isHex, e.PrevKv)
}
printKV(s.isHex, e.Kv)
}
}

View File

@ -24,7 +24,8 @@ import (
)
var (
leaseStr string
leaseStr string
putPrevKV bool
)
// NewPutCommand returns the cobra command for "put".
@ -49,6 +50,7 @@ will store the content of the file to <key>.
Run: putCommandFunc,
}
cmd.Flags().StringVar(&leaseStr, "lease", "0", "lease ID (in hexadecimal) to attach to the key")
cmd.Flags().BoolVar(&putPrevKV, "prev-kv", false, "return changed key-value pairs")
return cmd
}
@ -85,6 +87,9 @@ func getPutOp(cmd *cobra.Command, args []string) (string, string, []clientv3.OpO
if id != 0 {
opts = append(opts, clientv3.WithLease(clientv3.LeaseID(id)))
}
if putPrevKV {
opts = append(opts, clientv3.WithPrevKV())
}
return key, value, opts
}

View File

@ -115,7 +115,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
partpath := path + ".part"
f, err := os.Create(partpath)
defer f.Close()
if err != nil {
exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
ExitWithError(ExitBadArgs, exiterr)
@ -134,6 +134,8 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
fileutil.Fsync(f)
f.Close()
if rerr := os.Rename(partpath, path); rerr != nil {
exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
ExitWithError(ExitIO, exiterr)

View File

@ -29,6 +29,7 @@ var (
watchRev int64
watchPrefix bool
watchInteractive bool
watchPrevKey bool
)
// NewWatchCommand returns the cobra command for "watch".
@ -42,6 +43,7 @@ func NewWatchCommand() *cobra.Command {
cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
return cmd
}
@ -119,6 +121,9 @@ func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error)
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
if watchPrevKey {
opts = append(opts, clientv3.WithPrevKV())
}
return c.Watch(context.TODO(), key, opts...), nil
}

View File

@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable
watchable mvcc.WatchableKV
}
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
@ -82,6 +82,8 @@ type serverWatchStream struct {
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.WatchableKV
gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse
@ -91,6 +93,7 @@ type serverWatchStream struct {
// progress tracks the watchID that stream might need to send
// progress to.
progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool
// closec indicates the stream is closed.
closec chan struct{}
@ -101,14 +104,18 @@ type serverWatchStream struct {
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
watchable: ws.watchable,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
@ -170,9 +177,14 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
if id != -1 && creq.ProgressNotify {
if id != -1 {
sws.mu.Lock()
sws.progress[id] = true
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
sws.mu.Unlock()
}
wr := &pb.WatchResponse{
@ -198,6 +210,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
@ -244,8 +257,19 @@ func (sws *serverWatchStream) sendLoop() {
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
sws.mu.Lock()
needPrevKV := sws.prevKV[wresp.WatchID]
sws.mu.Unlock()
for i := range evs {
events[i] = &evs[i]
if needPrevKV {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
wr := &pb.WatchResponse{

View File

@ -159,6 +159,22 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev int64
err error
)
var rr *mvcc.RangeResult
if p.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn {
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil {
@ -174,6 +190,9 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev = a.s.KV().Put(p.Key, p.Value, leaseID)
}
resp.Header.Revision = rev
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
return resp, nil
}
@ -191,6 +210,21 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
dr.RangeEnd = []byte{}
}
var rr *mvcc.RangeResult
if dr.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn {
n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
if err != nil {
@ -201,6 +235,11 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
}
resp.Deleted = n
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
}
}
resp.Header.Revision = rev
return resp, nil
}

View File

@ -56,6 +56,9 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
if !aa.as.IsPutPermitted(aa.user, r.Key) {
return nil, auth.ErrPermissionDenied
}
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, nil) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.Put(txnID, r)
}
@ -70,6 +73,9 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
if !aa.as.IsDeleteRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied
}
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.DeleteRange(txnID, r)
}
@ -99,7 +105,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) bool {
continue
}
if !aa.as.IsDeleteRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
if tv.RequestDeleteRange.PrevKv && !aa.as.IsRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
return false
}
}

View File

@ -102,9 +102,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -10,9 +10,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

File diff suppressed because it is too large

View File

@ -396,10 +396,16 @@ message PutRequest {
// lease is the lease ID to associate with the key in the key-value store. A lease
// value of 0 indicates no lease.
int64 lease = 3;
// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
}
message PutResponse {
ResponseHeader header = 1;
// if prev_kv is set in the request, the previous key-value pair will be returned.
mvccpb.KeyValue prev_kv = 2;
}
message DeleteRangeRequest {
@ -409,12 +415,17 @@ message DeleteRangeRequest {
// If range_end is not given, the range is defined to contain only the key argument.
// If range_end is '\0', the range is all keys greater than or equal to the key argument.
bytes range_end = 2;
// If prev_kv is set, etcd gets the previous key-value pairs before deleting them.
// The previous key-value pairs will be returned in the delete response.
bool prev_kv = 3;
}
message DeleteRangeResponse {
ResponseHeader header = 1;
// deleted is the number of keys deleted by the delete range request.
int64 deleted = 2;
// if prev_kv is set in the request, the previous key-value pairs will be returned.
repeated mvccpb.KeyValue prev_kvs = 3;
}
message RequestOp {
@ -563,6 +574,9 @@ message WatchCreateRequest {
// wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
}
message WatchCancelRequest {

View File

@ -551,4 +551,4 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
}
// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.Watchable { return s.KV() }
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

View File

@ -379,6 +379,7 @@ func TestV3DeleteRange(t *testing.T) {
keySet []string
begin string
end string
prevKV bool
wantSet [][]byte
deleted int64
@ -386,39 +387,45 @@ func TestV3DeleteRange(t *testing.T) {
// delete middle
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "fop",
"foo/", "fop", false,
[][]byte{[]byte("foo"), []byte("fop")}, 1,
},
// no delete
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "foo/",
"foo/", "foo/", false,
[][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
},
// delete first
{
[]string{"foo", "foo/abc", "fop"},
"fo", "fop",
"fo", "fop", false,
[][]byte{[]byte("fop")}, 2,
},
// delete tail
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "fos",
"foo/", "fos", false,
[][]byte{[]byte("foo")}, 2,
},
// delete exact
{
[]string{"foo", "foo/abc", "fop"},
"foo/abc", "",
"foo/abc", "", false,
[][]byte{[]byte("foo"), []byte("fop")}, 1,
},
// delete none, [x,x)
{
[]string{"foo"},
"foo", "foo",
"foo", "foo", false,
[][]byte{[]byte("foo")}, 0,
},
// delete middle with prevKV set
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "fop", true,
[][]byte{[]byte("foo"), []byte("fop")}, 1,
},
}
for i, tt := range tests {
@ -436,7 +443,9 @@ func TestV3DeleteRange(t *testing.T) {
dreq := &pb.DeleteRangeRequest{
Key: []byte(tt.begin),
RangeEnd: []byte(tt.end)}
RangeEnd: []byte(tt.end),
PrevKv: tt.prevKV,
}
dresp, err := kvc.DeleteRange(context.TODO(), dreq)
if err != nil {
t.Fatalf("couldn't delete range on test %d (%v)", i, err)
@ -444,6 +453,11 @@ func TestV3DeleteRange(t *testing.T) {
if tt.deleted != dresp.Deleted {
t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
}
if tt.prevKV {
if len(dresp.PrevKvs) != int(dresp.Deleted) {
t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
}
}
rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
rresp, err := kvc.Range(context.TODO(), rreq)

View File

@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@ -89,6 +89,8 @@ type Event struct {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// prev_kv holds the key-value pair before the event happens.
PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
func (m *Event) Reset() { *m = Event{} }
@ -181,6 +183,16 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
}
i += n1
}
if m.PrevKv != nil {
data[i] = 0x1a
i++
i = encodeVarintKv(data, i, uint64(m.PrevKv.Size()))
n2, err := m.PrevKv.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n2
}
return i, nil
}
@ -247,6 +259,10 @@ func (m *Event) Size() (n int) {
l = m.Kv.Size()
n += 1 + l + sovKv(uint64(l))
}
if m.PrevKv != nil {
l = m.PrevKv.Size()
n += 1 + l + sovKv(uint64(l))
}
return n
}
@ -532,6 +548,39 @@ func (m *Event) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PrevKv", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowKv
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthKv
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.PrevKv == nil {
m.PrevKv = &KeyValue{}
}
if err := m.PrevKv.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipKv(data[iNdEx:])
@ -659,23 +708,24 @@ var (
)
var fileDescriptorKv = []byte{
// 285 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x44, 0x90, 0x41, 0x4a, 0xc3, 0x40,
0x14, 0x86, 0x33, 0x4d, 0x9b, 0xd6, 0xd7, 0x52, 0xc3, 0x50, 0x70, 0x70, 0x31, 0xc4, 0x6e, 0x2c,
0x08, 0x11, 0xea, 0x0d, 0xc4, 0xac, 0x74, 0x21, 0x21, 0xba, 0x95, 0x34, 0x7d, 0x94, 0x92, 0xa6,
0x13, 0xd2, 0x38, 0x98, 0x9b, 0x78, 0x0a, 0xcf, 0xd1, 0x65, 0x8f, 0x60, 0xe3, 0x45, 0x24, 0x6f,
0x4c, 0xdd, 0x0c, 0xef, 0xff, 0xff, 0x6f, 0x98, 0xff, 0x0d, 0x0c, 0x52, 0xed, 0xe7, 0x85, 0x2a,
0x15, 0x77, 0x32, 0x9d, 0x24, 0xf9, 0xe2, 0x72, 0xb2, 0x52, 0x2b, 0x45, 0xd6, 0x6d, 0x33, 0x99,
0x74, 0xfa, 0xc5, 0x60, 0xf0, 0x88, 0xd5, 0x6b, 0xbc, 0x79, 0x47, 0xee, 0x82, 0x9d, 0x62, 0x25,
0x98, 0xc7, 0x66, 0xa3, 0xb0, 0x19, 0xf9, 0x35, 0x9c, 0x27, 0x05, 0xc6, 0x25, 0xbe, 0x15, 0xa8,
0xd7, 0xbb, 0xb5, 0xda, 0x8a, 0x8e, 0xc7, 0x66, 0x76, 0x38, 0x36, 0x76, 0xf8, 0xe7, 0xf2, 0x2b,
0x18, 0x65, 0x6a, 0xf9, 0x4f, 0xd9, 0x44, 0x0d, 0x33, 0xb5, 0x3c, 0x21, 0x02, 0xfa, 0x1a, 0x0b,
0x4a, 0xbb, 0x94, 0xb6, 0x92, 0x4f, 0xa0, 0xa7, 0x9b, 0x02, 0xa2, 0x47, 0x2f, 0x1b, 0xd1, 0xb8,
0x1b, 0x8c, 0x77, 0x28, 0x1c, 0xa2, 0x8d, 0x98, 0x7e, 0x40, 0x2f, 0xd0, 0xb8, 0x2d, 0xf9, 0x0d,
0x74, 0xcb, 0x2a, 0x47, 0x6a, 0x3b, 0x9e, 0x5f, 0xf8, 0x66, 0x4d, 0x9f, 0x42, 0x73, 0x46, 0x55,
0x8e, 0x21, 0x41, 0xdc, 0x83, 0x4e, 0xaa, 0xa9, 0xfa, 0x70, 0xee, 0xb6, 0x68, 0xbb, 0x77, 0xd8,
0x49, 0xf5, 0xd4, 0x83, 0xb3, 0xd3, 0x25, 0xde, 0x07, 0xfb, 0xf9, 0x25, 0x72, 0x2d, 0x0e, 0xe0,
0x3c, 0x04, 0x4f, 0x41, 0x14, 0xb8, 0xec, 0x5e, 0xec, 0x8f, 0xd2, 0x3a, 0x1c, 0xa5, 0xb5, 0xaf,
0x25, 0x3b, 0xd4, 0x92, 0x7d, 0xd7, 0x92, 0x7d, 0xfe, 0x48, 0x6b, 0xe1, 0xd0, 0x5f, 0xde, 0xfd,
0x06, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x21, 0x8f, 0x2c, 0x75, 0x01, 0x00, 0x00,
// 303 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
0x14, 0x86, 0x3b, 0x14, 0x0a, 0x3e, 0x08, 0x36, 0x13, 0x12, 0x27, 0x2e, 0x26, 0x95, 0x8d, 0x18,
0x13, 0x4c, 0xf0, 0x06, 0xc6, 0xae, 0x70, 0x61, 0x1a, 0x74, 0x4b, 0x4a, 0x79, 0x21, 0xa4, 0x94,
0x69, 0x4a, 0x9d, 0xa4, 0x37, 0x71, 0xef, 0xde, 0x73, 0xb0, 0xe4, 0x08, 0x52, 0x2f, 0x62, 0xfa,
0xc6, 0xe2, 0xc6, 0xcd, 0xe4, 0xfd, 0xff, 0xff, 0x65, 0xe6, 0x7f, 0x03, 0x9d, 0x58, 0x8f, 0xd3,
0x4c, 0xe5, 0x8a, 0x3b, 0x89, 0x8e, 0xa2, 0x74, 0x71, 0x39, 0x58, 0xa9, 0x95, 0x22, 0xeb, 0xae,
0x9a, 0x4c, 0x3a, 0xfc, 0x64, 0xd0, 0x99, 0x62, 0xf1, 0x1a, 0x6e, 0xde, 0x90, 0xbb, 0x60, 0xc7,
0x58, 0x08, 0xe6, 0xb1, 0x51, 0x2f, 0xa8, 0x46, 0x7e, 0x0d, 0xe7, 0x51, 0x86, 0x61, 0x8e, 0xf3,
0x0c, 0xf5, 0x7a, 0xb7, 0x56, 0x5b, 0xd1, 0xf0, 0xd8, 0xc8, 0x0e, 0xfa, 0xc6, 0x0e, 0x7e, 0x5d,
0x7e, 0x05, 0xbd, 0x44, 0x2d, 0xff, 0x28, 0x9b, 0xa8, 0x6e, 0xa2, 0x96, 0x27, 0x44, 0x40, 0x5b,
0x63, 0x46, 0x69, 0x93, 0xd2, 0x5a, 0xf2, 0x01, 0xb4, 0x74, 0x55, 0x40, 0xb4, 0xe8, 0x65, 0x23,
0x2a, 0x77, 0x83, 0xe1, 0x0e, 0x85, 0x43, 0xb4, 0x11, 0xc3, 0x0f, 0x06, 0x2d, 0x5f, 0xe3, 0x36,
0xe7, 0xb7, 0xd0, 0xcc, 0x8b, 0x14, 0xa9, 0x6e, 0x7f, 0x72, 0x31, 0x36, 0x7b, 0x8e, 0x29, 0x34,
0xe7, 0xac, 0x48, 0x31, 0x20, 0x88, 0x7b, 0xd0, 0x88, 0x35, 0x75, 0xef, 0x4e, 0xdc, 0x1a, 0xad,
0x17, 0x0f, 0x1a, 0xb1, 0xe6, 0x37, 0xd0, 0x4e, 0x33, 0xd4, 0xf3, 0x58, 0x53, 0xf9, 0xff, 0x30,
0xa7, 0x02, 0xa6, 0x7a, 0xe8, 0xc1, 0xd9, 0xe9, 0x7e, 0xde, 0x06, 0xfb, 0xf9, 0x65, 0xe6, 0x5a,
0x1c, 0xc0, 0x79, 0xf4, 0x9f, 0xfc, 0x99, 0xef, 0xb2, 0x07, 0xb1, 0x3f, 0x4a, 0xeb, 0x70, 0x94,
0xd6, 0xbe, 0x94, 0xec, 0x50, 0x4a, 0xf6, 0x55, 0x4a, 0xf6, 0xfe, 0x2d, 0xad, 0x85, 0x43, 0xff,
0x7e, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x45, 0x92, 0x5d, 0xa1, 0x01, 0x00, 0x00,
}

View File

@ -43,4 +43,6 @@ message Event {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}

View File

@ -38,9 +38,12 @@ type PageWriter struct {
bufWatermarkBytes int
}
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
// NewPageWriter creates a new PageWriter. pageBytes is the number of bytes
// to write per page. pageOffset is the starting offset of the io.Writer.
func NewPageWriter(w io.Writer, pageBytes, pageOffset int) *PageWriter {
return &PageWriter{
w: w,
pageOffset: pageOffset,
pageBytes: pageBytes,
buf: make([]byte, defaultBufferBytes+pageBytes),
bufWatermarkBytes: defaultBufferBytes,

View File

@ -25,7 +25,7 @@ func TestPageWriterRandom(t *testing.T) {
pageBytes := 128
buf := make([]byte, 4*defaultBufferBytes)
cw := &checkPageWriter{pageBytes: pageBytes, t: t}
w := NewPageWriter(cw, pageBytes)
w := NewPageWriter(cw, pageBytes, 0)
n := 0
for i := 0; i < 4096; i++ {
c, err := w.Write(buf[:rand.Intn(len(buf))])
@ -51,7 +51,7 @@ func TestPageWriterPartialSlack(t *testing.T) {
pageBytes := 128
buf := make([]byte, defaultBufferBytes)
cw := &checkPageWriter{pageBytes: 64, t: t}
w := NewPageWriter(cw, pageBytes)
w := NewPageWriter(cw, pageBytes, 0)
// put writer in non-zero page offset
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
@ -82,6 +82,35 @@ func TestPageWriterPartialSlack(t *testing.T) {
}
}
// TestPageWriterOffset tests if page writer correctly repositions when offset is given.
func TestPageWriterOffset(t *testing.T) {
defaultBufferBytes = 1024
pageBytes := 128
buf := make([]byte, defaultBufferBytes)
cw := &checkPageWriter{pageBytes: 64, t: t}
w := NewPageWriter(cw, pageBytes, 0)
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
}
if err := w.Flush(); err != nil {
t.Fatal(err)
}
if w.pageOffset != 64 {
t.Fatalf("w.pageOffset expected 64, got %d", w.pageOffset)
}
w = NewPageWriter(cw, pageBytes, w.pageOffset)
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
}
if err := w.Flush(); err != nil {
t.Fatal(err)
}
if w.pageOffset != 0 {
t.Fatalf("w.pageOffset expected 0, got %d", w.pageOffset)
}
}
// checkPageWriter implements an io.Writer that fails a test on unaligned writes.
type checkPageWriter struct {
pageBytes int

View File

@ -25,9 +25,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -49,6 +49,7 @@ var (
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
}
)

View File

@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.9"
Version = "3.0.12"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"

View File

@ -18,6 +18,7 @@ import (
"encoding/binary"
"hash"
"io"
"os"
"sync"
"github.com/coreos/etcd/pkg/crc"
@ -39,9 +40,9 @@ type encoder struct {
uint64buf []byte
}
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
return &encoder{
bw: ioutil.NewPageWriter(w, walPageBytes),
bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
crc: crc.New(prevCrc, crcTable),
// 1MB buffer
buf: make([]byte, 1024*1024),
@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
}
}
// newFileEncoder creates a new encoder with current file offset for the page writer.
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return nil, err
}
return newEncoder(f, prevCrc, int(offset)), nil
}
func (e *encoder) encode(rec *walpb.Record) error {
e.mu.Lock()
defer e.mu.Unlock()

View File

@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) {
typ := int64(0xABCD)
d := []byte("Hello world!")
buf := new(bytes.Buffer)
e := newEncoder(buf, 0)
e := newEncoder(buf, 0, 0)
e.encode(&walpb.Record{Type: typ, Data: d})
e.flush()
decoder := newDecoder(ioutil.NopCloser(buf))

View File

@ -120,7 +120,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
w := &WAL{
dir: dirpath,
metadata: metadata,
encoder: newEncoder(f, 0),
}
w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
}
w.locks = append(w.locks, f)
if err = w.saveCrc(0); err != nil {
@ -341,7 +344,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
if w.tail() != nil {
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
if err != nil {
return
}
}
w.decoder = nil
@ -375,7 +381,10 @@ func (w *WAL) cut() error {
// update writer and save the previous crc
w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
if err = w.saveCrc(prevCrc); err != nil {
return err
}
@ -414,7 +423,10 @@ func (w *WAL) cut() error {
w.locks[len(w.locks)-1] = newTail
prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath)
return nil

View File

@ -61,7 +61,7 @@ func TestNew(t *testing.T) {
}
var wb bytes.Buffer
e := newEncoder(&wb, 0)
e := newEncoder(&wb, 0, 0)
err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
if err != nil {
t.Fatalf("err = %v, want nil", err)
@ -465,7 +465,7 @@ func TestSaveEmpty(t *testing.T) {
var buf bytes.Buffer
var est raftpb.HardState
w := WAL{
encoder: newEncoder(&buf, 0),
encoder: newEncoder(&buf, 0, 0),
}
if err := w.saveState(&est); err != nil {
t.Errorf("err = %v, want nil", err)

View File

@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal