From 26cd2bc01770b96ca307b302d948c7dfcc659a49 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sat, 24 Jun 2023 11:59:30 +0200 Subject: [PATCH] tests/robustness: Store whole watch operations Want to keep watch requests to properly validate reliability of watch stream. Signed-off-by: Marek Siarkowicz --- tests/robustness/linearizability_test.go | 2 +- tests/robustness/report.go | 12 +- tests/robustness/traffic/client.go | 130 +++++++++++------- tests/robustness/traffic/traffic.go | 2 +- tests/robustness/validate/patch_history.go | 10 +- .../robustness/validate/patch_history_test.go | 6 +- tests/robustness/validate/watch.go | 98 +++++++------ tests/robustness/watch.go | 18 +-- 8 files changed, 161 insertions(+), 117 deletions(-) diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index d71433da9..12174a7de 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -204,7 +204,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu func operationsMaxRevision(reports []traffic.ClientReport) int64 { var maxRevision int64 for _, r := range reports { - revision := r.OperationHistory.MaxRevision() + revision := r.KeyValue.MaxRevision() if revision > maxRevision { maxRevision = revision } diff --git a/tests/robustness/report.go b/tests/robustness/report.go index 8250cb4ba..904e7cd32 100644 --- a/tests/robustness/report.go +++ b/tests/robustness/report.go @@ -79,9 +79,9 @@ func (r *report) Report(t *testing.T, force bool) { t.Fatal(err) } if len(report.Watch) != 0 { - persistWatchResponses(t, r.lg, filepath.Join(clientDir, "watch.json"), report.Watch) + persistWatchOperations(t, r.lg, filepath.Join(clientDir, "watch.json"), report.Watch) } - operations := report.OperationHistory.Operations() + operations := report.KeyValue.Operations() if len(operations) != 0 { persistOperationHistory(t, r.lg, filepath.Join(clientDir, "operations.json"), operations) } @@ -104,11 +104,11 @@ func persistMemberDataDir(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess, } } -func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []traffic.WatchResponse) { - lg.Info("Saving watch responses", zap.String("path", path)) +func persistWatchOperations(t *testing.T, lg *zap.Logger, path string, responses []traffic.WatchOperation) { + lg.Info("Saving watch operations", zap.String("path", path)) file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) if err != nil { - t.Errorf("Failed to save watch history: %v", err) + t.Errorf("Failed to save watch operations: %v", err) return } defer file.Close() @@ -116,7 +116,7 @@ func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses for _, resp := range responses { err := encoder.Encode(resp) if err != nil { - t.Errorf("Failed to encode response: %v", err) + t.Errorf("Failed to encode operation: %v", err) } } } diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 7c9b3e9c4..15bdea83b 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -38,11 +38,23 @@ type RecordingClient struct { // see https://github.com/golang/go/blob/master/src/time/time.go#L17 baseTime time.Time - watchMux sync.Mutex - watchResponses []WatchResponse + watchMux sync.Mutex + watchOperations []WatchOperation // mux ensures order of request appending. - opMux sync.Mutex - operations *model.AppendableHistory + kvMux sync.Mutex + kvOperations *model.AppendableHistory +} + +type WatchOperation struct { + Request WatchRequest + Responses []WatchResponse +} + +type WatchRequest struct { + Key string + Revision int64 + WithPrefix bool + WithProgressNotify bool } type WatchResponse struct { @@ -68,10 +80,10 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (* return nil, err } return &RecordingClient{ - id: ids.NewClientId(), - client: *cc, - operations: model.NewAppendableHistory(ids), - baseTime: baseTime, + id: ids.NewClientId(), + client: *cc, + kvOperations: model.NewAppendableHistory(ids), + baseTime: baseTime, }, nil } @@ -81,22 +93,24 @@ func (c *RecordingClient) Close() error { func (c *RecordingClient) Report() ClientReport { return ClientReport{ - ClientId: c.id, - OperationHistory: c.operations.History, - Watch: c.watchResponses, + ClientId: c.id, + KeyValue: c.kvOperations.History, + Watch: c.watchOperations, } } type ClientReport struct { - ClientId int - OperationHistory model.History - Watch []WatchResponse + ClientId int + KeyValue model.History + Watch []WatchOperation } func (r ClientReport) WatchEventCount() int { count := 0 - for _, resp := range r.Watch { - count += len(resp.Events) + for _, op := range r.Watch { + for _, resp := range op.Responses { + count += len(resp.Events) + } } return count } @@ -123,35 +137,35 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision if limit != 0 { ops = append(ops, clientv3.WithLimit(limit)) } - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Get(ctx, start, ops...) if err != nil { return nil, err } returnTime := time.Since(c.baseTime) - c.operations.AppendRange(start, end, revision, limit, callTime, returnTime, resp) + c.kvOperations.AppendRange(start, end, revision, limit, callTime, returnTime, resp) return resp, nil } func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3.PutResponse, error) { - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value) returnTime := time.Since(c.baseTime) - c.operations.AppendPut(key, value, callTime, returnTime, resp, err) + c.kvOperations.AppendPut(key, value, callTime, returnTime, resp, err) return resp, err } func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.DeleteResponse, error) { - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Delete(ctx, key) returnTime := time.Since(c.baseTime) - c.operations.AppendDelete(key, callTime, returnTime, resp, err) + c.kvOperations.AppendDelete(key, callTime, returnTime, resp, err) return resp, err } @@ -163,74 +177,92 @@ func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, on ).Else( onFailure..., ) - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := txn.Commit() returnTime := time.Since(c.baseTime) - c.operations.AppendTxn(conditions, onSuccess, onFailure, callTime, returnTime, resp, err) + c.kvOperations.AppendTxn(conditions, onSuccess, onFailure, callTime, returnTime, resp, err) return resp, err } func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) { - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Grant(ctx, ttl) returnTime := time.Since(c.baseTime) - c.operations.AppendLeaseGrant(callTime, returnTime, resp, err) + c.kvOperations.AppendLeaseGrant(callTime, returnTime, resp, err) return resp, err } func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) (*clientv3.LeaseRevokeResponse, error) { - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId)) returnTime := time.Since(c.baseTime) - c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) + c.kvOperations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err) return resp, err } func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) (*clientv3.PutResponse, error) { opts := clientv3.WithLease(clientv3.LeaseID(leaseId)) - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Put(ctx, key, value, opts) returnTime := time.Since(c.baseTime) - c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) + c.kvOperations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err) return resp, err } func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) { - c.opMux.Lock() - defer c.opMux.Unlock() + c.kvMux.Lock() + defer c.kvMux.Unlock() callTime := time.Since(c.baseTime) resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0]) returnTime := time.Since(c.baseTime) - c.operations.AppendDefragment(callTime, returnTime, resp, err) + c.kvOperations.AppendDefragment(callTime, returnTime, resp, err) return resp, err } func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan { - ops := []clientv3.OpOption{clientv3.WithProgressNotify()} - if withPrefix { + request := WatchRequest{ + Key: key, + Revision: rev, + WithPrefix: withPrefix, + WithProgressNotify: withProgressNotify, + } + return c.watch(ctx, request) + +} + +func (c *RecordingClient) watch(ctx context.Context, request WatchRequest) clientv3.WatchChan { + ops := []clientv3.OpOption{} + if request.WithPrefix { ops = append(ops, clientv3.WithPrefix()) } - if rev != 0 { - ops = append(ops, clientv3.WithRev(rev)) + if request.Revision != 0 { + ops = append(ops, clientv3.WithRev(request.Revision)) } - if withProgressNotify { + if request.WithProgressNotify { ops = append(ops, clientv3.WithProgressNotify()) } respCh := make(chan clientv3.WatchResponse) + + c.watchMux.Lock() + c.watchOperations = append(c.watchOperations, WatchOperation{ + Request: request, + Responses: []WatchResponse{}, + }) + index := len(c.watchOperations) - 1 + c.watchMux.Unlock() + go func() { defer close(respCh) - for r := range c.client.Watch(ctx, key, ops...) { - c.watchMux.Lock() - c.watchResponses = append(c.watchResponses, ToWatchResponse(r, c.baseTime)) - c.watchMux.Unlock() + for r := range c.client.Watch(ctx, request.Key, ops...) { + c.watchOperations[index].Responses = append(c.watchOperations[index].Responses, ToWatchResponse(r, c.baseTime)) select { case respCh <- r: case <-ctx.Done(): diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index f2f6cbdeb..d760ed22c 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -78,7 +78,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 var operationCount int for _, r := range reports { - operationCount += r.OperationHistory.Len() + operationCount += r.KeyValue.Len() } lg.Info("Recorded operations", zap.Int("operationCount", operationCount)) diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 0d3557867..3d302e8af 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -30,7 +30,7 @@ func patchedOperationHistory(reports []traffic.ClientReport) []porcupine.Operati func operations(reports []traffic.ClientReport) []porcupine.Operation { var ops []porcupine.Operation for _, r := range reports { - ops = append(ops, r.OperationHistory.Operations()...) + ops = append(ops, r.KeyValue.Operations()...) } return ops } @@ -38,9 +38,11 @@ func operations(reports []traffic.ClientReport) []porcupine.Operation { func uniqueWatchEvents(reports []traffic.ClientReport) map[model.Event]traffic.TimedWatchEvent { persisted := map[model.Event]traffic.TimedWatchEvent{} for _, r := range reports { - for _, resp := range r.Watch { - for _, event := range resp.Events { - persisted[event.Event] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event} + for _, op := range r.Watch { + for _, resp := range op.Responses { + for _, event := range resp.Events { + persisted[event.Event] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event} + } } } } diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index 333235af0..b3c0fe338 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -333,9 +333,9 @@ func TestPatchHistory(t *testing.T) { } operations := patchedOperationHistory([]traffic.ClientReport{ { - ClientId: 0, - OperationHistory: history.History, - Watch: watch, + ClientId: 0, + KeyValue: history.History, + Watch: []traffic.WatchOperation{{Responses: watch}}, }, }) remains := len(operations) == history.Len() diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index 3e3e3c2cf..ab991e12c 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -41,26 +41,30 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []m func validateBookmarkable(t *testing.T, report traffic.ClientReport) { var lastProgressNotifyRevision int64 = 0 - for _, resp := range report.Watch { - for _, event := range resp.Events { - if event.Revision <= lastProgressNotifyRevision { - t.Errorf("Broke watch guarantee: Bookmarkable - Progress notification events guarantee that all events up to a revision have been already delivered, eventRevision: %d, progressNotifyRevision: %d", event.Revision, lastProgressNotifyRevision) + for _, op := range report.Watch { + for _, resp := range op.Responses { + for _, event := range resp.Events { + if event.Revision <= lastProgressNotifyRevision { + t.Errorf("Broke watch guarantee: Bookmarkable - Progress notification events guarantee that all events up to a revision have been already delivered, eventRevision: %d, progressNotifyRevision: %d", event.Revision, lastProgressNotifyRevision) + } + } + if resp.IsProgressNotify { + lastProgressNotifyRevision = resp.Revision } - } - if resp.IsProgressNotify { - lastProgressNotifyRevision = resp.Revision } } } func validateOrdered(t *testing.T, report traffic.ClientReport) { var lastEventRevision int64 = 1 - for _, resp := range report.Watch { - for _, event := range resp.Events { - if event.Revision < lastEventRevision { - t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, lastRevision: %d, currentRevision: %d, client: %d", lastEventRevision, event.Revision, report.ClientId) + for _, op := range report.Watch { + for _, resp := range op.Responses { + for _, event := range resp.Events { + if event.Revision < lastEventRevision { + t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, lastRevision: %d, currentRevision: %d, client: %d", lastEventRevision, event.Revision, report.ClientId) + } + lastEventRevision = event.Revision } - lastEventRevision = event.Revision } } } @@ -68,33 +72,37 @@ func validateOrdered(t *testing.T, report traffic.ClientReport) { func validateUnique(t *testing.T, expectUniqueRevision bool, report traffic.ClientReport) { uniqueOperations := map[interface{}]struct{}{} - for _, resp := range report.Watch { - for _, event := range resp.Events { - var key interface{} - if expectUniqueRevision { - key = event.Revision - } else { - key = struct { - revision int64 - key string - }{event.Revision, event.Key} + for _, op := range report.Watch { + for _, resp := range op.Responses { + for _, event := range resp.Events { + var key interface{} + if expectUniqueRevision { + key = event.Revision + } else { + key = struct { + revision int64 + key string + }{event.Revision, event.Key} + } + if _, found := uniqueOperations[key]; found { + t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Key, event.Revision, report.ClientId) + } + uniqueOperations[key] = struct{}{} } - if _, found := uniqueOperations[key]; found { - t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Key, event.Revision, report.ClientId) - } - uniqueOperations[key] = struct{}{} } } } func validateAtomic(t *testing.T, report traffic.ClientReport) { var lastEventRevision int64 = 1 - for _, resp := range report.Watch { - if len(resp.Events) > 0 { - if resp.Events[0].Revision == lastEventRevision { - t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, client: %d", lastEventRevision, resp.Events[0].Revision, report.ClientId) + for _, op := range report.Watch { + for _, resp := range op.Responses { + if len(resp.Events) > 0 { + if resp.Events[0].Revision == lastEventRevision { + t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, client: %d", lastEventRevision, resp.Events[0].Revision, report.ClientId) + } + lastEventRevision = resp.Events[len(resp.Events)-1].Revision } - lastEventRevision = resp.Events[len(resp.Events)-1].Revision } } } @@ -120,21 +128,23 @@ func mergeWatchEventHistory(t *testing.T, reports []traffic.ClientReport) []mode var lastRevision int64 = 0 events := []model.WatchEvent{} for _, r := range reports { - for _, resp := range r.Watch { - for _, event := range resp.Events { - if event.Revision == lastRevision && lastClientId == r.ClientId { - events = append(events, event) - } else { - if prev, found := revisionToEvents[lastRevision]; found { - if diff := cmp.Diff(prev.events, events); diff != "" { - t.Errorf("Events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff) - } + for _, op := range r.Watch { + for _, resp := range op.Responses { + for _, event := range resp.Events { + if event.Revision == lastRevision && lastClientId == r.ClientId { + events = append(events, event) } else { - revisionToEvents[lastRevision] = revisionEvents{clientId: lastClientId, events: events, revision: lastRevision} + if prev, found := revisionToEvents[lastRevision]; found { + if diff := cmp.Diff(prev.events, events); diff != "" { + t.Errorf("Events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff) + } + } else { + revisionToEvents[lastRevision] = revisionEvents{clientId: lastClientId, events: events, revision: lastRevision} + } + lastClientId = r.ClientId + lastRevision = event.Revision + events = []model.WatchEvent{event} } - lastClientId = r.ClientId - lastRevision = event.Revision - events = []model.WatchEvent{event} } } } diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index a08ee6a59..7873e2b18 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -111,17 +111,17 @@ func watchUntilRevision(ctx context.Context, t *testing.T, c *traffic.RecordingC func validateGotAtLeastOneProgressNotify(t *testing.T, reports []traffic.ClientReport, expectProgressNotify bool) { var gotProgressNotify = false +external: for _, r := range reports { - var lastHeadRevision int64 = 1 - for _, resp := range r.Watch { - if resp.IsProgressNotify && resp.Revision == lastHeadRevision { - gotProgressNotify = true - break + for _, op := range r.Watch { + var lastHeadRevision int64 = 1 + for _, resp := range op.Responses { + if resp.IsProgressNotify && resp.Revision == lastHeadRevision { + gotProgressNotify = true + break external + } + lastHeadRevision = resp.Revision } - lastHeadRevision = resp.Revision - } - if gotProgressNotify { - break } } if gotProgressNotify != expectProgressNotify {