Merge pull request #5850 from xiang90/get_o_kv

*: support get-old-kv in watch
This commit is contained in:
Xiang Li
2016-07-05 16:37:24 -07:00
committed by GitHub
12 changed files with 383 additions and 231 deletions

View File

@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable
watchable mvcc.WatchableKV
}
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
@ -82,13 +82,18 @@ type serverWatchStream struct {
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.WatchableKV
gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse
// progress tracks the watchID that stream might need to send
// progress to.
// TOOD: combine progress and prevKV into a single struct?
progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool
// mu protects progress
mu sync.Mutex
@ -101,14 +106,18 @@ type serverWatchStream struct {
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
watchable: ws.watchable,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
@ -181,8 +190,13 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
if id != -1 && creq.ProgressNotify {
sws.progress[id] = true
if id != -1 {
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
@ -207,6 +221,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
@ -253,8 +268,17 @@ func (sws *serverWatchStream) sendLoop() {
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
needPrevKV := sws.prevKV[wresp.WatchID]
for i := range evs {
events[i] = &evs[i]
if needPrevKV {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
wr := &pb.WatchResponse{