diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index b018a663f..9e3327258 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -42,12 +42,16 @@ func (w *watcher) send(wr clientv3.WatchResponse) { events := make([]*mvccpb.Event, 0, len(wr.Events)) + var lastRev int64 for i := range wr.Events { ev := (*mvccpb.Event)(wr.Events[i]) if ev.Kv.ModRevision <= w.rev { continue } else { - w.rev = ev.Kv.ModRevision + // We cannot update w.rev here. + // txn can have multiple events with the same rev. + // If we update w.rev here, we would skip some events in the same txn. + lastRev = ev.Kv.ModRevision } filtered := false @@ -65,6 +69,10 @@ func (w *watcher) send(wr clientv3.WatchResponse) { } } + if lastRev > w.rev { + w.rev = lastRev + } + // all events are filtered out? if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 { return diff --git a/proxy/grpcproxy/watcher_group.go b/proxy/grpcproxy/watcher_group.go index 00f1b9d8b..c7b7d9f60 100644 --- a/proxy/grpcproxy/watcher_group.go +++ b/proxy/grpcproxy/watcher_group.go @@ -67,10 +67,13 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) { } } -func (wg *watcherGroup) add(rid receiverID, w watcher) { +// add adds the watcher into the group with given ID. +// The current revision of the watcherGroup is returned. +func (wg *watcherGroup) add(rid receiverID, w watcher) int64 { wg.mu.Lock() defer wg.mu.Unlock() wg.receivers[rid] = w + return wg.rev } func (wg *watcherGroup) delete(rid receiverID) { diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index 2e040aded..9a03b7427 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -18,6 +18,8 @@ import ( "sync" "github.com/coreos/etcd/clientv3" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" ) @@ -38,8 +40,21 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { groups := wgs.groups if wg, ok := groups[w.wr]; ok { - wg.add(rid, w) + rev := wg.add(rid, w) wgs.idToGroup[rid] = wg + + resp := &pb.WatchResponse{ + Header: &pb.ResponseHeader{ + // todo: fill in ClusterId + // todo: fill in MemberId: + Revision: rev, + // todo: fill in RaftTerm: + }, + WatchId: rid.watcherID, + Created: true, + } + w.ch <- resp + return }