Add filter validation to ensure watch only includes events within selector
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
@ -73,6 +73,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -92,6 +95,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -115,6 +121,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -135,6 +144,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -159,6 +171,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "b",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -168,6 +183,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -186,6 +204,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -206,6 +227,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -230,6 +254,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -239,6 +266,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -257,6 +287,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -276,6 +309,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -285,6 +321,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -304,6 +343,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -325,6 +367,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -345,6 +390,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -370,7 +418,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -403,7 +450,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -430,7 +476,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Revision: 2,
|
Revision: 2,
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -458,7 +503,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -515,7 +559,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -550,7 +593,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -588,7 +630,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -687,7 +728,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -717,7 +757,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -801,7 +840,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -831,7 +869,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -859,7 +896,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
Revision: 3,
|
Revision: 3,
|
||||||
},
|
},
|
||||||
@ -944,7 +980,6 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
Key: "",
|
|
||||||
WithPrefix: true,
|
WithPrefix: true,
|
||||||
Revision: 3,
|
Revision: 3,
|
||||||
},
|
},
|
||||||
@ -1029,6 +1064,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -1056,6 +1094,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -1084,6 +1125,9 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
{
|
{
|
||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
{
|
{
|
||||||
Events: []model.WatchEvent{
|
Events: []model.WatchEvent{
|
||||||
@ -1113,6 +1157,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
WithPrevKV: true,
|
WithPrevKV: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -1143,6 +1188,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
WithPrevKV: true,
|
WithPrevKV: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -1173,6 +1219,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
WithPrevKV: true,
|
WithPrevKV: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -1204,6 +1251,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
WithPrevKV: true,
|
WithPrevKV: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -1235,6 +1283,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
|
Key: "a",
|
||||||
WithPrevKV: true,
|
WithPrevKV: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -1266,6 +1315,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
Watch: []model.WatchOperation{
|
Watch: []model.WatchOperation{
|
||||||
{
|
{
|
||||||
Request: model.WatchRequest{
|
Request: model.WatchRequest{
|
||||||
|
WithPrefix: true,
|
||||||
WithPrevKV: true,
|
WithPrevKV: true,
|
||||||
},
|
},
|
||||||
Responses: []model.WatchResponse{
|
Responses: []model.WatchResponse{
|
||||||
@ -1317,8 +1367,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
putPersistedEvent("b", "2", 3, true),
|
putPersistedEvent("b", "2", 3, true),
|
||||||
putPersistedEvent("a", "3", 4, false),
|
putPersistedEvent("a", "3", 4, false),
|
||||||
},
|
},
|
||||||
// TODO: Test should fail as there is an event not matching selector
|
expectError: errBrokeFilter.Error(),
|
||||||
expectError: "",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Filter - event not matching the watch with prefix - fail",
|
name: "Filter - event not matching the watch with prefix - fail",
|
||||||
@ -1348,8 +1397,7 @@ func TestValidateWatch(t *testing.T) {
|
|||||||
putPersistedEvent("bb", "2", 3, true),
|
putPersistedEvent("bb", "2", 3, true),
|
||||||
putPersistedEvent("ac", "3", 4, true),
|
putPersistedEvent("ac", "3", 4, true),
|
||||||
},
|
},
|
||||||
// TODO: Test should fail as there is an event not matching selector
|
expectError: errBrokeFilter.Error(),
|
||||||
expectError: "",
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range tcs {
|
for _, tc := range tcs {
|
||||||
|
@ -32,13 +32,18 @@ var (
|
|||||||
errBrokeResumable = errors.New("broke Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window")
|
errBrokeResumable = errors.New("broke Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window")
|
||||||
errBrokePrevKV = errors.New("incorrect event prevValue")
|
errBrokePrevKV = errors.New("incorrect event prevValue")
|
||||||
errBrokeIsCreate = errors.New("incorrect event IsCreate")
|
errBrokeIsCreate = errors.New("incorrect event IsCreate")
|
||||||
|
errBrokeFilter = errors.New("event not matching watch filter")
|
||||||
)
|
)
|
||||||
|
|
||||||
func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, eventHistory []model.PersistedEvent) error {
|
func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, eventHistory []model.PersistedEvent) error {
|
||||||
lg.Info("Validating watch")
|
lg.Info("Validating watch")
|
||||||
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
|
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
|
||||||
for _, r := range reports {
|
for _, r := range reports {
|
||||||
err := validateOrdered(lg, r)
|
err := validateFilter(lg, r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = validateOrdered(lg, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -76,6 +81,20 @@ func validateWatch(lg *zap.Logger, cfg Config, reports []report.ClientReport, ev
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateFilter(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||||
|
for _, watch := range report.Watch {
|
||||||
|
for _, resp := range watch.Responses {
|
||||||
|
for _, event := range resp.Events {
|
||||||
|
if !event.Match(watch.Request) {
|
||||||
|
lg.Error("event not matching event filter", zap.Int("client", report.ClientID), zap.Any("request", watch.Request), zap.Any("event", event))
|
||||||
|
err = errBrokeFilter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func validateBookmarkable(lg *zap.Logger, report report.ClientReport) (err error) {
|
func validateBookmarkable(lg *zap.Logger, report report.ClientReport) (err error) {
|
||||||
for _, op := range report.Watch {
|
for _, op := range report.Watch {
|
||||||
var lastProgressNotifyRevision int64
|
var lastProgressNotifyRevision int64
|
||||||
@ -163,7 +182,7 @@ func validateReliable(lg *zap.Logger, events []model.PersistedEvent, report repo
|
|||||||
}
|
}
|
||||||
for _, resp := range op.Responses {
|
for _, resp := range op.Responses {
|
||||||
for _, event := range resp.Events {
|
for _, event := range resp.Events {
|
||||||
if events[index].Match(op.Request) && events[index] != event.PersistedEvent {
|
if events[index].Match(op.Request) && (events[index].Event != event.PersistedEvent.Event || events[index].Revision != event.PersistedEvent.Revision) {
|
||||||
lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.Any("missing-event", events[index]))
|
lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.Any("missing-event", events[index]))
|
||||||
err = errBrokeReliable
|
err = errBrokeReliable
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user