Compare commits
11 Commits
| Author | SHA1 | Date |
|---|---|---|
| | fc00305a2e | |
| | f322fe7f0d | |
| | 049fcd30ea | |
| | 1b702e79db | |
| | b87190d9dc | |
| | 83b493f945 | |
| | 9b69cbd989 | |
| | 8a37349097 | |
| | 9a0e4dfe4f | |
| | f60469af16 | |
| | 932370d8ca | |
@@ -782,3 +782,22 @@ func TestWatchCancelAndCloseClient(t *testing.T) {
 	<-donec
 	clus.TakeClient(0)
 }
+
+// TestWatchCancelDisconnected ensures canceling a watcher works when
+// its grpc stream is disconnected / reconnecting.
+func TestWatchCancelDisconnected(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+	cli := clus.Client(0)
+	ctx, cancel := context.WithCancel(context.Background())
+	// add more watches than can be resumed before the cancel
+	wch := cli.Watch(ctx, "abc")
+	clus.Members[0].Stop(t)
+	cancel()
+	select {
+	case <-wch:
+	case <-time.After(time.Second):
+		t.Fatal("took too long to cancel disconnected watcher")
+	}
+}
@@ -215,14 +215,15 @@ func WithPrefix() OpOption {
 	}
 }
 
-// WithRange specifies the range of 'Get' or 'Delete' requests.
+// WithRange specifies the range of 'Get', 'Delete', 'Watch' requests.
 // For example, 'Get' requests with 'WithRange(end)' returns
 // the keys in the range [key, end).
+// endKey must be lexicographically greater than start key.
 func WithRange(endKey string) OpOption {
 	return func(op *Op) { op.end = []byte(endKey) }
 }
 
-// WithFromKey specifies the range of 'Get' or 'Delete' requests
+// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
 // to be equal or greater than the key in the argument.
 func WithFromKey() OpOption { return WithRange("\x00") }
 
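The doc-comment changes above extend WithRange and WithFromKey from 'Get'/'Delete' to 'Watch'. Below is a minimal clientv3 sketch of the resulting range semantics; it assumes a reachable etcd endpoint at 127.0.0.1:2379 (a placeholder), uses the era-appropriate golang.org/x/net/context import, and is only an illustration, not part of this change.

```go
package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Watch the half-open interval ["a", "c"); endKey must sort after the start key.
	rangeCh := cli.Watch(ctx, "a", clientv3.WithRange("c"))

	// Watch every key >= "a"; WithFromKey is shorthand for WithRange("\x00").
	fromCh := cli.Watch(ctx, "a", clientv3.WithFromKey())

	// Block until either watch delivers an event, then print it.
	select {
	case resp := <-rangeCh:
		fmt.Printf("range watch: %+v\n", resp.Events)
	case resp := <-fromCh:
		fmt.Printf("from-key watch: %+v\n", resp.Events)
	}
}
```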
@@ -125,8 +125,6 @@ type watchGrpcStream struct {
 	reqc chan *watchRequest
 	// respc receives data from the watch client
 	respc chan *pb.WatchResponse
-	// stopc is sent to the main goroutine to stop all processing
-	stopc chan struct{}
 	// donec closes to broadcast shutdown
 	donec chan struct{}
 	// errc transmits errors from grpc Recv to the watch stream reconn logic
@@ -204,7 +202,6 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 
 		respc:    make(chan *pb.WatchResponse),
 		reqc:     make(chan *watchRequest),
-		stopc:    make(chan struct{}),
 		donec:    make(chan struct{}),
 		errc:     make(chan error, 1),
 		closingc: make(chan *watcherStream),
@@ -300,7 +297,7 @@ func (w *watcher) Close() (err error) {
 }
 
 func (w *watchGrpcStream) Close() (err error) {
-	close(w.stopc)
+	w.cancel()
 	<-w.donec
 	select {
 	case err = <-w.errc:
@@ -347,7 +344,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
 	// close subscriber's channel
 	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
 		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
-	} else {
+	} else if ws.outc != nil {
 		close(ws.outc)
 	}
 	if ws.id != -1 {
@@ -472,7 +469,7 @@ func (w *watchGrpcStream) run() {
 			wc.Send(ws.initReq.toPB())
 		}
 		cancelSet = make(map[int64]struct{})
-	case <-w.stopc:
+	case <-w.ctx.Done():
 		return
 	case ws := <-w.closingc:
 		w.closeSubstream(ws)
@@ -597,6 +594,8 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
 			}
 			ws.initReq.rev = nextRev
+		case <-w.ctx.Done():
+			return
 		case <-ws.initReq.ctx.Done():
 			return
 		case <-resumec:
@@ -608,13 +607,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 }
 
 func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
-	// connect to grpc stream
-	wc, err := w.openWatchClient()
-	if err != nil {
-		return nil, v3rpc.Error(err)
-	}
 	// mark all substreams as resuming
-	if len(w.substreams)+len(w.resuming) > 0 {
 	close(w.resumec)
 	w.resumec = make(chan struct{})
 	w.joinSubstreams()
@@ -622,20 +615,70 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
 		ws.id = -1
 		w.resuming = append(w.resuming, ws)
 	}
+	// strip out nils, if any
+	var resuming []*watcherStream
 	for _, ws := range w.resuming {
-		if ws == nil || ws.closing {
+		if ws != nil {
+			resuming = append(resuming, ws)
+		}
+	}
+	w.resuming = resuming
+	w.substreams = make(map[int64]*watcherStream)
+
+	// connect to grpc stream while accepting watcher cancelation
+	stopc := make(chan struct{})
+	donec := w.waitCancelSubstreams(stopc)
+	wc, err := w.openWatchClient()
+	close(stopc)
+	<-donec
+
+	// serve all non-closing streams, even if there's a client error
+	// so that the teardown path can shutdown the streams as expected.
+	for _, ws := range w.resuming {
+		if ws.closing {
 			continue
 		}
 		ws.donec = make(chan struct{})
 		go w.serveSubstream(ws, w.resumec)
 	}
+
+	if err != nil {
+		return nil, v3rpc.Error(err)
 	}
-	w.substreams = make(map[int64]*watcherStream)
 	// receive data from new grpc stream
 	go w.serveWatchClient(wc)
 	return wc, nil
 }
 
+func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
+	var wg sync.WaitGroup
+	wg.Add(len(w.resuming))
+	donec := make(chan struct{})
+	for i := range w.resuming {
+		go func(ws *watcherStream) {
+			defer wg.Done()
+			if ws.closing {
+				return
+			}
+			select {
+			case <-ws.initReq.ctx.Done():
+				// closed ws will be removed from resuming
+				ws.closing = true
+				close(ws.outc)
+				ws.outc = nil
+				go func() { w.closingc <- ws }()
+			case <-stopc:
+			}
+		}(w.resuming[i])
+	}
+	go func() {
+		defer close(donec)
+		wg.Wait()
+	}()
+	return donec
+}
+
 // joinSubstream waits for all substream goroutines to complete
 func (w *watchGrpcStream) joinSubstreams() {
 	for _, ws := range w.substreams {
@@ -652,9 +695,9 @@ func (w *watchGrpcStream) joinSubstreams() {
 func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
 		select {
-		case <-w.stopc:
+		case <-w.ctx.Done():
 			if err == nil {
-				return nil, context.Canceled
+				return nil, w.ctx.Err()
 			}
 			return nil, err
 		default:
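The watch.go hunks above drop the dedicated `stopc` channel in favor of the stream's context (`w.ctx.Done()`), and the new `waitCancelSubstreams` fans out one goroutine per resuming stream, then closes a single done channel once every goroutine has either observed its watcher's cancellation or been released by `stopc`. The following is a standalone sketch of that fan-out/fan-in shape with generic names standing in for the etcd-specific types; nothing here is part of the etcd API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitCancel starts one goroutine per context and closes the returned channel
// once every goroutine has either seen its context canceled or seen stopc close.
func waitCancel(stopc <-chan struct{}, ctxs []context.Context) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(ctxs))
	donec := make(chan struct{})
	for i := range ctxs {
		go func(ctx context.Context) {
			defer wg.Done()
			select {
			case <-ctx.Done(): // the caller canceled this stream
			case <-stopc: // the connection attempt finished; stop waiting
			}
		}(ctxs[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	stopc := make(chan struct{})
	donec := waitCancel(stopc, []context.Context{ctx, context.Background()})

	cancel()                          // one waiter exits via ctx.Done()
	time.Sleep(10 * time.Millisecond) // give it a moment, purely for the demo
	close(stopc)                      // release the remaining waiter
	<-donec
	fmt.Println("all waiters drained")
}
```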
@@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) {
 	if len(resp.Kvs) != 1 {
 		t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
 	}
-	if resp.Kvs[0].CreateRevision != 4 {
-		t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision)
+	if resp.Kvs[0].CreateRevision != 7 {
+		t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
 	}
 }
 
@@ -221,7 +221,7 @@ OK
 
 ### WATCH [options] [key or prefix] [range_end]
 
-Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user.
+Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user. If range_end is given, it must be lexicographically greater than key or "\x00".
 
 #### Options
 
@@ -27,11 +27,14 @@ import (
 	"github.com/coreos/etcd/client"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
@@ -42,6 +45,7 @@ import (
 )
 
 var (
+	migrateExcludeTTLKey bool
 	migrateDatadir       string
 	migrateWALdir        string
 	migrateTransformer   string
@@ -55,6 +59,7 @@ func NewMigrateCommand() *cobra.Command {
 		Run: migrateCommandFunc,
 	}
 
+	mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
 	mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
 	mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
 	mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
@@ -74,18 +79,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
 		writer, reader, errc = defaultTransformer()
 	}
 
-	st := rebuildStoreV2()
+	st, index := rebuildStoreV2()
 	be := prepareBackend()
 	defer be.Close()
 
-	maxIndexc := make(chan uint64, 1)
 	go func() {
-		maxIndexc <- writeStore(writer, st)
+		writeStore(writer, st)
 		writer.Close()
 	}()
 
 	readKeys(reader, be)
-	mvcc.UpdateConsistentIndex(be, <-maxIndexc)
+	mvcc.UpdateConsistentIndex(be, index)
 	err := <-errc
 	if err != nil {
 		fmt.Println("failed to transform keys")
@@ -106,7 +110,10 @@ func prepareBackend() backend.Backend {
 	return be
 }
 
-func rebuildStoreV2() store.Store {
+func rebuildStoreV2() (store.Store, uint64) {
+	var index uint64
+	cl := membership.NewCluster("")
+
 	waldir := migrateWALdir
 	if len(waldir) == 0 {
 		waldir = path.Join(migrateDatadir, "member", "wal")
@@ -122,6 +129,7 @@ func rebuildStoreV2() store.Store {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {
 		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+		index = snapshot.Metadata.Index
 	}
 
 	w, err := wal.OpenForRead(waldir, walsnap)
@@ -143,9 +151,15 @@ func rebuildStoreV2() store.Store {
 		}
 	}
 
-	applier := etcdserver.NewApplierV2(st, nil)
+	cl.SetStore(st)
+	cl.Recover(api.UpdateCapability)
+
+	applier := etcdserver.NewApplierV2(st, cl)
 	for _, ent := range ents {
-		if ent.Type != raftpb.EntryNormal {
+		if ent.Type == raftpb.EntryConfChange {
+			var cc raftpb.ConfChange
+			pbutil.MustUnmarshal(&cc, ent.Data)
+			applyConf(cc, cl)
 			continue
 		}
 
@@ -160,9 +174,34 @@ func rebuildStoreV2() store.Store {
 				applyRequest(req, applier)
 			}
 		}
+		if ent.Index > index {
+			index = ent.Index
+		}
 	}
 
-	return st
+	return st, index
+}
+
+func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
+	if err := cl.ValidateConfigurationChange(cc); err != nil {
+		return
+	}
+	switch cc.Type {
+	case raftpb.ConfChangeAddNode:
+		m := new(membership.Member)
+		if err := json.Unmarshal(cc.Context, m); err != nil {
+			panic(err)
+		}
+		cl.AddMember(m)
+	case raftpb.ConfChangeRemoveNode:
+		cl.RemoveMember(types.ID(cc.NodeID))
+	case raftpb.ConfChangeUpdateNode:
+		m := new(membership.Member)
+		if err := json.Unmarshal(cc.Context, m); err != nil {
+			panic(err)
+		}
+		cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
+	}
 }
 
 func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
|
|||||||
if n.Dir {
|
if n.Dir {
|
||||||
n.Nodes = nil
|
n.Nodes = nil
|
||||||
}
|
}
|
||||||
|
if !migrateExcludeTTLKey || n.TTL == 0 {
|
||||||
b, err := json.Marshal(n)
|
b, err := json.Marshal(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, err)
|
ExitWithError(ExitError, err)
|
||||||
}
|
}
|
||||||
fmt.Fprint(w, string(b))
|
fmt.Fprint(w, string(b))
|
||||||
|
}
|
||||||
for _, nn := range nodes {
|
for _, nn := range nodes {
|
||||||
max := writeKeys(w, nn)
|
max := writeKeys(w, nn)
|
||||||
if max > maxIndex {
|
if max > maxIndex {
|
||||||
|
@@ -348,6 +348,51 @@ func TestV3WatchFutureRevision(t *testing.T) {
 	}
 }
 
+// TestV3WatchWrongRange tests wrong range does not create watchers.
+func TestV3WatchWrongRange(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	wAPI := toGRPC(clus.RandClient()).Watch
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, err := wAPI.Watch(ctx)
+	if err != nil {
+		t.Fatalf("wAPI.Watch error: %v", err)
+	}
+
+	tests := []struct {
+		key      []byte
+		end      []byte
+		canceled bool
+	}{
+		{[]byte("a"), []byte("a"), true},  // wrong range end
+		{[]byte("b"), []byte("a"), true},  // wrong range end
+		{[]byte("foo"), []byte{0}, false}, // watch request with 'WithFromKey'
+	}
+	for i, tt := range tests {
+		if err := wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+			CreateRequest: &pb.WatchCreateRequest{Key: tt.key, RangeEnd: tt.end, StartRevision: 1}}}); err != nil {
+			t.Fatalf("#%d: wStream.Send error: %v", i, err)
+		}
+		cresp, err := wStream.Recv()
+		if err != nil {
+			t.Fatalf("#%d: wStream.Recv error: %v", i, err)
+		}
+		if !cresp.Created {
+			t.Fatalf("#%d: create %v, want %v", i, cresp.Created, true)
+		}
+		if cresp.Canceled != tt.canceled {
+			t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
+		}
+		if tt.canceled && cresp.WatchId != -1 {
+			t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
+		}
+	}
+}
+
 // TestV3WatchCancelSynced tests Watch APIs cancellation from synced map.
 func TestV3WatchCancelSynced(t *testing.T) {
 	defer testutil.AfterTest(t)
@@ -15,6 +15,7 @@
 package mvcc
 
 import (
+	"bytes"
 	"errors"
 	"sync"
 
@@ -96,6 +97,12 @@ type watchStream struct {
 // Watch creates a new watcher in the stream and returns its WatchID.
 // TODO: return error if ws is closed?
 func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID {
+	// prevent wrong range where key >= end lexicographically
+	// watch request with 'WithFromKey' has empty-byte range end
+	if len(end) != 0 && bytes.Compare(key, end) != -1 {
+		return -1
+	}
+
 	ws.mu.Lock()
 	defer ws.mu.Unlock()
 	if ws.closed {
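The guard added to `watchStream.Watch` above rejects ranges where the key does not sort strictly before the end, while an empty end is always accepted (per the comment in the change, a 'WithFromKey' watch reaches this layer with an empty-byte range end). A small standalone sketch of the same predicate; the `validRange` helper is illustrative and not part of the mvcc package.

```go
package main

import (
	"bytes"
	"fmt"
)

// validRange mirrors the check above: an empty end is always accepted,
// otherwise key must compare strictly less than end.
func validRange(key, end []byte) bool {
	return len(end) == 0 || bytes.Compare(key, end) == -1
}

func main() {
	fmt.Println(validRange([]byte("foa"), []byte("foa"))) // false: key == end
	fmt.Println(validRange([]byte("fob"), []byte("foa"))) // false: key > end
	fmt.Println(validRange([]byte("foo"), []byte{}))      // true: empty end
	fmt.Println(validRange([]byte("a"), []byte("c")))     // true: key < end
}
```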
@@ -153,6 +153,28 @@ func TestWatcherWatchPrefix(t *testing.T) {
 	}
 }
 
+// TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range
+// does not create watcher, which panics when canceling in range tree.
+func TestWatcherWatchWrongRange(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	defer cleanup(s, b, tmpPath)
+
+	w := s.NewWatchStream()
+	defer w.Close()
+
+	if id := w.Watch([]byte("foa"), []byte("foa"), 1); id != -1 {
+		t.Fatalf("key == end range given; id expected -1, got %d", id)
+	}
+	if id := w.Watch([]byte("fob"), []byte("foa"), 1); id != -1 {
+		t.Fatalf("key > end range given; id expected -1, got %d", id)
+	}
+	// watch request with 'WithFromKey' has empty-byte range end
+	if id := w.Watch([]byte("foo"), []byte{}, 1); id != 0 {
+		t.Fatalf("\x00 is range given; id expected 0, got %d", id)
+	}
+}
+
 func TestWatchDeleteRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
@@ -29,7 +29,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "2.3.0"
-	Version           = "3.0.13"
+	Version           = "3.0.15"
 
 	// Git SHA Value will be set during build
 	GitSHA = "Not provided (use ./build instead of go build)"