Merge pull request #14563 from kafuu-chino/3.5-backport-14296

*: avoid closing a watch with ID 0 incorrectly
This commit is contained in:
Hitoshi Mitake 2022-10-09 23:59:36 +09:00 committed by GitHub
commit 07c7a98371
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 89 additions and 19 deletions

View File

@ -38,6 +38,13 @@ const (
EventTypePut = mvccpb.PUT
closeSendErrTimeout = 250 * time.Millisecond
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
AutoWatchID = 0
// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
InvalidWatchID = -1
)
type Event mvccpb.Event
@ -451,7 +458,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
// check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel
close(ws.recvc)
@ -482,7 +489,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
} else if ws.outc != nil {
close(ws.outc)
}
if ws.id != -1 {
if ws.id != InvalidWatchID {
delete(w.substreams, ws.id)
return
}
@ -544,7 +551,7 @@ func (w *watchGrpcStream) run() {
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
id: InvalidWatchID,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
@ -690,7 +697,7 @@ func (w *watchGrpcStream) run() {
if len(w.substreams)+len(w.resuming) == 0 {
return
}
if ws.id != -1 {
if ws.id != InvalidWatchID {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
cancelSet[ws.id] = struct{}{}
@ -742,9 +749,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
cancelReason: pbresp.CancelReason,
}
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
// indicate they should be broadcast.
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
return w.broadcastResponse(wr)
}
@ -899,7 +906,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
ws.id = InvalidWatchID
w.resuming = append(w.resuming, ws)
}
// strip out nils, if any

View File

@ -16,6 +16,7 @@ package v3rpc
import (
"context"
"fmt"
"io"
"math/rand"
"sync"
@ -24,6 +25,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/mvcc"
@ -285,7 +287,7 @@ func (sws *serverWatchStream) recvLoop() error {
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: creq.WatchId,
WatchId: clientv3.InvalidWatchID,
Canceled: true,
Created: true,
CancelReason: cancelReason,
@ -319,7 +321,10 @@ func (sws *serverWatchStream) recvLoop() error {
sws.fragment[id] = true
}
sws.mu.Unlock()
} else {
id = clientv3.InvalidWatchID
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
@ -356,7 +361,7 @@ func (sws *serverWatchStream) recvLoop() error {
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
}
}
default:
@ -479,7 +484,12 @@ func (sws *serverWatchStream) sendLoop() {
// track id creation
wid := mvcc.WatchID(c.WatchId)
if c.Canceled {
if !(!(c.Canceled && c.Created) || wid == clientv3.InvalidWatchID) {
panic(fmt.Sprintf("unexpected watchId: %d, wanted: %d, since both 'Canceled' and 'Created' are true", wid, clientv3.InvalidWatchID))
}
if c.Canceled && wid != clientv3.InvalidWatchID {
delete(ids, wid)
continue
}

View File

@ -20,12 +20,9 @@ import (
"sync"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
const AutoWatchID WatchID = 0
var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
@ -118,7 +115,7 @@ func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ..
return -1, ErrEmptyWatcherRange
}
if id == AutoWatchID {
if id == clientv3.AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}

View File

@ -238,7 +238,7 @@ func (wps *watchProxyStream) recvLoop() error {
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
wps.watchCh <- &pb.WatchResponse{
Header: &pb.ResponseHeader{},
WatchId: -1,
WatchId: clientv3.InvalidWatchID,
Created: true,
Canceled: true,
CancelReason: err.Error(),
@ -258,7 +258,7 @@ func (wps *watchProxyStream) recvLoop() error {
filters: v3rpc.FiltersFromRequest(cr),
}
if !w.wr.valid() {
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
w.post(&pb.WatchResponse{WatchId: clientv3.InvalidWatchID, Created: true, Canceled: true})
wps.mu.Unlock()
continue
}

View File

@ -531,3 +531,58 @@ func TestV3AuthWatchAndTokenExpire(t *testing.T) {
watchResponse = <-wChan
testutil.AssertNil(t, watchResponse.Err())
}
func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
users := []user{
{
name: "user1",
password: "user1-123",
role: "role1",
key: "k1",
end: "k2",
},
}
authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users)
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
if cerr != nil {
t.Fatal(cerr)
}
defer c.Close()
watchStartCh, watchEndCh := make(chan interface{}), make(chan interface{})
go func() {
wChan := c.Watch(ctx, "k1", clientv3.WithRev(1))
watchStartCh <- struct{}{}
watchResponse := <-wChan
t.Logf("watch response from k1: %v", watchResponse)
testutil.AssertTrue(t, len(watchResponse.Events) != 0)
watchEndCh <- struct{}{}
}()
// Chan for making sure that the above goroutine invokes Watch()
// So the above Watch() can get watch ID = 0
<-watchStartCh
wChan := c.Watch(ctx, "non-allowed-key", clientv3.WithRev(1))
watchResponse := <-wChan
testutil.AssertNotNil(t, watchResponse.Err()) // permission denied
_, err := c.Put(ctx, "k1", "val")
if err != nil {
t.Fatalf("Unexpected error from Put: %v", err)
}
<-watchEndCh
}

View File

@ -26,6 +26,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
)
@ -395,8 +396,8 @@ func TestV3WatchWrongRange(t *testing.T) {
if cresp.Canceled != tt.canceled {
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
}
if tt.canceled && cresp.WatchId != -1 {
t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
if tt.canceled && cresp.WatchId != clientv3.InvalidWatchID {
t.Fatalf("#%d: canceled watch ID %d, want %d", i, cresp.WatchId, clientv3.InvalidWatchID)
}
}
}