Compare commits
7 Commits
| SHA1 |
|---|
| fc00305a2e |
| f322fe7f0d |
| 049fcd30ea |
| 1b702e79db |
| b87190d9dc |
| 83b493f945 |
| 9b69cbd989 |
```diff
@@ -782,3 +782,22 @@ func TestWatchCancelAndCloseClient(t *testing.T) {
 	<-donec
 	clus.TakeClient(0)
 }
+
+// TestWatchCancelDisconnected ensures canceling a watcher works when
+// its grpc stream is disconnected / reconnecting.
+func TestWatchCancelDisconnected(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+	cli := clus.Client(0)
+	ctx, cancel := context.WithCancel(context.Background())
+	// add more watches than can be resumed before the cancel
+	wch := cli.Watch(ctx, "abc")
+	clus.Members[0].Stop(t)
+	cancel()
+	select {
+	case <-wch:
+	case <-time.After(time.Second):
+		t.Fatal("took too long to cancel disconnected watcher")
+	}
+}
```
```diff
@@ -215,14 +215,15 @@ func WithPrefix() OpOption {
 	}
 }
 
-// WithRange specifies the range of 'Get' or 'Delete' requests.
+// WithRange specifies the range of 'Get', 'Delete', 'Watch' requests.
 // For example, 'Get' requests with 'WithRange(end)' returns
 // the keys in the range [key, end).
+// endKey must be lexicographically greater than start key.
 func WithRange(endKey string) OpOption {
 	return func(op *Op) { op.end = []byte(endKey) }
 }
 
-// WithFromKey specifies the range of 'Get' or 'Delete' requests
+// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
 // to be equal or greater than the key in the argument.
 func WithFromKey() OpOption { return WithRange("\x00") }
 
```
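The widened doc comments mean the range options now apply to watches as well as point queries. Below is a minimal sketch of both options in use, assuming a reachable endpoint on localhost:2379; the key names are placeholders rather than anything from this changeset:

```go
package main

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Get returns keys in the half-open interval [key, end); per the new
	// doc comment, endKey must sort strictly after the start key.
	resp, err := cli.Get(context.Background(), "a", clientv3.WithRange("c"))
	if err != nil {
		panic(err)
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("%s -> %s\n", kv.Key, kv.Value)
	}

	// Watch now accepts the same options; WithFromKey is shorthand for
	// WithRange("\x00") and matches every key at or above "a".
	for wr := range cli.Watch(context.Background(), "a", clientv3.WithFromKey()) {
		for _, ev := range wr.Events {
			fmt.Printf("%s %s\n", ev.Type, ev.Kv.Key)
		}
	}
}
```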
```diff
@@ -125,8 +125,6 @@ type watchGrpcStream struct {
 	reqc chan *watchRequest
 	// respc receives data from the watch client
 	respc chan *pb.WatchResponse
-	// stopc is sent to the main goroutine to stop all processing
-	stopc chan struct{}
 	// donec closes to broadcast shutdown
 	donec chan struct{}
 	// errc transmits errors from grpc Recv to the watch stream reconn logic
@@ -204,7 +202,6 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 
 		respc:    make(chan *pb.WatchResponse),
 		reqc:     make(chan *watchRequest),
-		stopc:    make(chan struct{}),
 		donec:    make(chan struct{}),
 		errc:     make(chan error, 1),
 		closingc: make(chan *watcherStream),
@@ -300,7 +297,7 @@ func (w *watcher) Close() (err error) {
 }
 
 func (w *watchGrpcStream) Close() (err error) {
-	close(w.stopc)
+	w.cancel()
 	<-w.donec
 	select {
 	case err = <-w.errc:
@@ -347,7 +344,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
 	// close subscriber's channel
 	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
 		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
-	} else {
+	} else if ws.outc != nil {
 		close(ws.outc)
 	}
 	if ws.id != -1 {
@@ -472,7 +469,7 @@ func (w *watchGrpcStream) run() {
 			wc.Send(ws.initReq.toPB())
 		}
 		cancelSet = make(map[int64]struct{})
-	case <-w.stopc:
+	case <-w.ctx.Done():
 		return
 	case ws := <-w.closingc:
 		w.closeSubstream(ws)
@@ -597,6 +594,8 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
 			}
 			ws.initReq.rev = nextRev
 		case <-w.ctx.Done():
 			return
+		case <-ws.initReq.ctx.Done():
+			return
 		case <-resumec:
@@ -608,34 +607,78 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 }
 
 func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
-	// connect to grpc stream
-	wc, err := w.openWatchClient()
-	if err != nil {
-		return nil, v3rpc.Error(err)
-	}
 	// mark all substreams as resuming
-	if len(w.substreams)+len(w.resuming) > 0 {
-		close(w.resumec)
-		w.resumec = make(chan struct{})
-		w.joinSubstreams()
-		for _, ws := range w.substreams {
-			ws.id = -1
-			w.resuming = append(w.resuming, ws)
-		}
-		for _, ws := range w.resuming {
-			if ws == nil || ws.closing {
-				continue
-			}
-			ws.donec = make(chan struct{})
-			go w.serveSubstream(ws, w.resumec)
-		}
+	close(w.resumec)
+	w.resumec = make(chan struct{})
+	w.joinSubstreams()
+	for _, ws := range w.substreams {
+		ws.id = -1
+		w.resuming = append(w.resuming, ws)
+	}
+	// strip out nils, if any
+	var resuming []*watcherStream
+	for _, ws := range w.resuming {
+		if ws != nil {
+			resuming = append(resuming, ws)
+		}
 	}
+	w.resuming = resuming
 	w.substreams = make(map[int64]*watcherStream)
+
+	// connect to grpc stream while accepting watcher cancelation
+	stopc := make(chan struct{})
+	donec := w.waitCancelSubstreams(stopc)
+	wc, err := w.openWatchClient()
+	close(stopc)
+	<-donec
+
+	// serve all non-closing streams, even if there's a client error
+	// so that the teardown path can shutdown the streams as expected.
+	for _, ws := range w.resuming {
+		if ws.closing {
+			continue
+		}
+		ws.donec = make(chan struct{})
+		go w.serveSubstream(ws, w.resumec)
+	}
+
+	if err != nil {
+		return nil, v3rpc.Error(err)
+	}
+
 	// receive data from new grpc stream
 	go w.serveWatchClient(wc)
 	return wc, nil
 }
 
+func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
+	var wg sync.WaitGroup
+	wg.Add(len(w.resuming))
+	donec := make(chan struct{})
+	for i := range w.resuming {
+		go func(ws *watcherStream) {
+			defer wg.Done()
+			if ws.closing {
+				return
+			}
+			select {
+			case <-ws.initReq.ctx.Done():
+				// closed ws will be removed from resuming
+				ws.closing = true
+				close(ws.outc)
+				ws.outc = nil
+				go func() { w.closingc <- ws }()
+			case <-stopc:
+			}
+		}(w.resuming[i])
+	}
+	go func() {
+		defer close(donec)
+		wg.Wait()
+	}()
+	return donec
+}
+
 // joinSubstream waits for all substream goroutines to complete
 func (w *watchGrpcStream) joinSubstreams() {
 	for _, ws := range w.substreams {
@@ -652,9 +695,9 @@ func (w *watchGrpcStream) joinSubstreams() {
 func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
 		select {
-		case <-w.stopc:
+		case <-w.ctx.Done():
 			if err == nil {
-				return nil, context.Canceled
+				return nil, w.ctx.Err()
 			}
 			return nil, err
 		default:
```
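The heart of the rewrite is the cancelation window: while openWatchClient blocks on a reconnect, waitCancelSubstreams parks one goroutine per resuming substream so that each can observe its own context ending, then joins them through a WaitGroup-backed done channel. A standalone sketch of that idiom follows, with all names illustrative rather than taken from the etcd source:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitCancelable mirrors the shape of waitCancelSubstreams above: one
// goroutine per pending item, each exiting on its own cancelation or on
// the shared stop signal; donec closes once every goroutine returns.
func waitCancelable(ctxs []context.Context, stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(ctxs))
	donec := make(chan struct{})
	for _, ctx := range ctxs {
		go func(ctx context.Context) {
			defer wg.Done()
			select {
			case <-ctx.Done():
				fmt.Println("item canceled while the client was reconnecting")
			case <-stopc:
			}
		}(ctx)
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	stopc := make(chan struct{})
	donec := waitCancelable([]context.Context{ctx}, stopc)

	cancel()                          // a watcher is canceled mid-reconnect
	time.Sleep(10 * time.Millisecond) // give its goroutine time to observe it
	close(stopc)                      // the blocking reconnect returned
	<-donec                           // all pending items accounted for
}
```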
```diff
@@ -221,7 +221,7 @@ OK
 
 ### WATCH [options] [key or prefix] [range_end]
 
-Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user.
+Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user. If range_end is given, it must be lexicographically greater than key or "\x00".
 
 #### Options
 
```
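The three request shapes the amended sentence describes look like this through the Go client (at the CLI, the first corresponds to `watch foo fop` per the synopsis above). This is a sketch assuming an already-connected client; the expected server behavior noted in the comments follows the TestV3WatchWrongRange table later in this changeset:

```go
package watchexample

import (
	"context"

	"github.com/coreos/etcd/clientv3"
)

// watchRanges sketches the range_end cases the WATCH doc now spells out;
// cli is assumed to be a connected *clientv3.Client.
func watchRanges(cli *clientv3.Client) {
	// [foo, fop): range_end sorts after key, so the server accepts it.
	_ = cli.Watch(context.Background(), "foo", clientv3.WithRange("fop"))

	// Keys >= "a": "\x00" is the documented open-ended range_end.
	_ = cli.Watch(context.Background(), "a", clientv3.WithRange("\x00"))

	// range_end equal to key does not sort after it; the server cancels
	// the watcher at creation (watch ID -1) instead of registering it.
	_ = cli.Watch(context.Background(), "a", clientv3.WithRange("a"))
}
```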
```diff
@@ -348,6 +348,51 @@ func TestV3WatchFutureRevision(t *testing.T) {
 	}
 }
 
+// TestV3WatchWrongRange tests wrong range does not create watchers.
+func TestV3WatchWrongRange(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	wAPI := toGRPC(clus.RandClient()).Watch
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, err := wAPI.Watch(ctx)
+	if err != nil {
+		t.Fatalf("wAPI.Watch error: %v", err)
+	}
+
+	tests := []struct {
+		key      []byte
+		end      []byte
+		canceled bool
+	}{
+		{[]byte("a"), []byte("a"), true},  // wrong range end
+		{[]byte("b"), []byte("a"), true},  // wrong range end
+		{[]byte("foo"), []byte{0}, false}, // watch request with 'WithFromKey'
+	}
+	for i, tt := range tests {
+		if err := wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+			CreateRequest: &pb.WatchCreateRequest{Key: tt.key, RangeEnd: tt.end, StartRevision: 1}}}); err != nil {
+			t.Fatalf("#%d: wStream.Send error: %v", i, err)
+		}
+		cresp, err := wStream.Recv()
+		if err != nil {
+			t.Fatalf("#%d: wStream.Recv error: %v", i, err)
+		}
+		if !cresp.Created {
+			t.Fatalf("#%d: create %v, want %v", i, cresp.Created, true)
+		}
+		if cresp.Canceled != tt.canceled {
+			t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
+		}
+		if tt.canceled && cresp.WatchId != -1 {
+			t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
+		}
+	}
+}
+
 // TestV3WatchCancelSynced tests Watch APIs cancellation from synced map.
 func TestV3WatchCancelSynced(t *testing.T) {
 	defer testutil.AfterTest(t)
```
```diff
@@ -15,6 +15,7 @@
 package mvcc
 
 import (
+	"bytes"
 	"errors"
 	"sync"
 
@@ -96,6 +97,12 @@ type watchStream struct {
 // Watch creates a new watcher in the stream and returns its WatchID.
 // TODO: return error if ws is closed?
 func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID {
+	// prevent wrong range where key >= end lexicographically
+	// watch request with 'WithFromKey' has empty-byte range end
+	if len(end) != 0 && bytes.Compare(key, end) != -1 {
+		return -1
+	}
+
 	ws.mu.Lock()
 	defer ws.mu.Unlock()
 	if ws.closed {
```
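The guard is a single bytes.Compare. Here is a runnable restatement of the check in isolation; the helper name validRange is mine, not the tree's:

```go
package main

import (
	"bytes"
	"fmt"
)

// validRange inverts the rejection test added to watchStream.Watch: a
// non-empty end must sort strictly after key, while an empty end means no
// range (the 'WithFromKey' case reaches mvcc as an empty end, per the
// comments in the hunk above).
func validRange(key, end []byte) bool {
	return len(end) == 0 || bytes.Compare(key, end) == -1
}

func main() {
	fmt.Println(validRange([]byte("foa"), []byte("foa"))) // false: key == end
	fmt.Println(validRange([]byte("fob"), []byte("foa"))) // false: key > end
	fmt.Println(validRange([]byte("foo"), []byte{}))      // true: empty end
	fmt.Println(validRange([]byte("a"), []byte("c")))     // true: key < end
}
```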
```diff
@@ -153,6 +153,28 @@ func TestWatcherWatchPrefix(t *testing.T) {
 	}
 }
 
+// TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range
+// does not create watcher, which panics when canceling in range tree.
+func TestWatcherWatchWrongRange(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	defer cleanup(s, b, tmpPath)
+
+	w := s.NewWatchStream()
+	defer w.Close()
+
+	if id := w.Watch([]byte("foa"), []byte("foa"), 1); id != -1 {
+		t.Fatalf("key == end range given; id expected -1, got %d", id)
+	}
+	if id := w.Watch([]byte("fob"), []byte("foa"), 1); id != -1 {
+		t.Fatalf("key > end range given; id expected -1, got %d", id)
+	}
+	// watch request with 'WithFromKey' has empty-byte range end
+	if id := w.Watch([]byte("foo"), []byte{}, 1); id != 0 {
+		t.Fatalf("\x00 is range given; id expected 0, got %d", id)
+	}
+}
+
 func TestWatchDeleteRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
```
```diff
@@ -29,7 +29,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "2.3.0"
-	Version           = "3.0.14"
+	Version           = "3.0.15"
 
 	// Git SHA Value will be set during build
 	GitSHA = "Not provided (use ./build instead of go build)"
```