Compare commits

..

1 Commits

Author SHA1 Message Date
abdf51e361 build(deps): bump go.opentelemetry.io/otel/sdk from 1.20.0 to 1.21.0
Bumps [go.opentelemetry.io/otel/sdk](https://github.com/open-telemetry/opentelemetry-go) from 1.20.0 to 1.21.0.
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.20.0...v1.21.0)

---
updated-dependencies:
- dependency-name: go.opentelemetry.io/otel/sdk
  dependency-type: indirect
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-20 17:37:29 +00:00
34 changed files with 1376 additions and 543 deletions

View File

@ -40,7 +40,7 @@ jobs:
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/init@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5
with:
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
@ -50,6 +50,6 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/autobuild@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/analyze@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5

View File

@ -0,0 +1,21 @@
---
name: E2E Arm64 Nightly
permissions: read-all
on:
# schedules always run against the main branch, hence we have to create separate jobs
# with individual checkout actions for each of the active release branches
schedule:
- cron: '30 1 * * *' # runs daily at 1:30 am.
# workflow_dispatch enables manual testing of this job by maintainers
workflow_dispatch:
jobs:
main-arm64:
uses: ./.github/workflows/e2e-arm64-template.yaml
with:
etcdBranch: main
release-35-arm64:
uses: ./.github/workflows/e2e-arm64-template.yaml
with:
etcdBranch: release-3.5

View File

@ -1,6 +1,11 @@
---
name: E2E-Arm64
on: [push, pull_request]
name: Reusable Arm64 E2E Workflow
on:
workflow_call:
inputs:
etcdBranch:
required: true
type: string
permissions: read-all
jobs:
test:
@ -14,6 +19,8 @@ jobs:
- linux-arm64-e2e
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
with:
ref: "${{ inputs.etcdBranch }}"
- id: goversion
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
- uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v4.1.0

View File

@ -18,7 +18,7 @@ jobs:
actions: write
steps:
- name: Update PR
uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
continue-on-error: true
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -50,6 +50,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
uses: github/codeql-action/upload-sarif@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5
with:
sarif_file: results.sarif

View File

@ -2,27 +2,16 @@
Previous change logs can be found at [CHANGELOG-3.3](https://github.com/etcd-io/etcd/blob/main/CHANGELOG/CHANGELOG-3.3.md).
<hr>
## v3.4.28 (2023-11-23)
## v3.4.28 (tbd)
### etcd server
- Improve [Skip getting authInfo from incoming context when auth is disabled](https://github.com/etcd-io/etcd/pull/16240)
- Use [the default write scheduler](https://github.com/etcd-io/etcd/pull/16782) since golang.org/x/net@v0.11.0 started using round-robin scheduler.
- Add [cluster ID check during data corruption detection to prevent false alarm](https://github.com/etcd-io/etcd/issues/15548).
- Add [Learner support Snapshot RPC](https://github.com/etcd-io/etcd/pull/16990/).
### Package `clientv3`
- Fix [Reset auth token when failing to authenticate due to auth being disabled](https://github.com/etcd-io/etcd/pull/16240).
- [Simplify grpc dialer usage](https://github.com/etcd-io/etcd/issues/11519).
- [Replace balancer with upstream grpc solution](https://github.com/etcd-io/etcd/pull/16844).
- Fix [race condition when accessing cfg.Endpoints in dial()](https://github.com/etcd-io/etcd/pull/16857).
- Fix [invalid authority header issue in single endpoint scenario](https://github.com/etcd-io/etcd/pull/16988).
- Fix [Reset auth token when failing to authenticate due to auth being disabled](https://github.com/etcd-io/etcd/pull/16240)
### Dependencies
- Compile binaries using [go 1.20.11](https://github.com/etcd-io/etcd/pull/16916).
- Upgrade [bbolt to 1.3.8](https://github.com/etcd-io/etcd/pull/16834).
- Upgrade gRPC to 1.58.3 in https://github.com/etcd-io/etcd/pull/16997 and https://github.com/etcd-io/etcd/pull/16999. Note that gRPC server will reject requests with connection header (refer to https://github.com/grpc/grpc-go/pull/4803).
<hr>

View File

@ -5,10 +5,6 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/
<hr>
## v3.5.11 (tbd)
### etcd server
- Fix distributed tracing by ensuring `--experimental-distributed-tracing-sampling-rate` configuration option is available to [set tracing sample rate](https://github.com/etcd-io/etcd/pull/16951).
### Dependencies
- Compile binaries using [go 1.20.11](https://github.com/etcd-io/etcd/pull/16915)
- Fix [CVE-2023-47108](https://github.com/advisories/GHSA-8pgv-569h-w5rw) by [bumping go.opentelemetry.io/otel to 1.20.0 and go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc to 0.46.0](https://github.com/etcd-io/etcd/pull/16946).

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !windows && !solaris && !wasm && !js
//go:build !windows && !solaris
package transport

View File

@ -1,30 +0,0 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build wasm || js
package transport
import (
"errors"
"syscall"
)
func setReusePort(network, address string, c syscall.RawConn) error {
return errors.New("port reuse is not supported on WASM")
}
func setReuseAddress(network, addr string, conn syscall.RawConn) error {
return errors.New("address reuse is not supported on WASM")
}

View File

@ -91,7 +91,7 @@ func New(cfg Config) (*Client, error) {
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context, opts ...Option) *Client {
cctx, cancel := context.WithCancel(ctx)
c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), epMu: new(sync.RWMutex)}
c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex)}
for _, opt := range opts {
opt(c)
}

View File

@ -29,7 +29,7 @@ func NewOrderViolationSwitchEndpointClosure(c *clientv3.Client) OrderViolationFu
violationCount := int32(0)
return func(_ clientv3.Op, _ clientv3.OpResponse, _ int64) error {
// Each request is assigned by round-robin load-balancer's picker to a different
// endpoint. If we cycled them 5 times (even with some level of concurrency),
// endpoints. If we cycled them 5 times (even with some level of concurrency),
// with high probability no endpoint points on a member with fresh data.
// TODO: Ideally we should track members (resp.opp.Header) that returned
// stale result and explicitly temporarily disable them in 'picker'.

View File

@ -57,10 +57,10 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/etcd/client/v2 v2.306.0-alpha.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect
go.opentelemetry.io/otel v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.20.0 // indirect
go.opentelemetry.io/otel/trace v1.20.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/net v0.18.0 // indirect

View File

@ -84,14 +84,14 @@ go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=

6
go.mod
View File

@ -77,10 +77,10 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/gofail v0.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect

12
go.sum
View File

@ -149,14 +149,14 @@ go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 h1:DeFD0VgTZ+Cj6hxravYYZE2W4GlneVH81iAOPjZkzk8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0/go.mod h1:GijYcYmNpX1KazD5JmWGsi4P7dDTTTnfv1UbGn84MnU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 h1:gvmNvqrPYovvyRmCSygkUDyL8lC5Tl845MLEwqpxhEU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0/go.mod h1:vNUq47TGFioo+ffTSnKNdob241vePmtNZnAODKapKd0=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=

View File

@ -308,7 +308,6 @@ function go_test {
if [ "${VERBOSE:-}" == "1" ]; then
goTestFlags="-v "
goTestFlags+="-json "
fi
# Expanding patterns (like ./...) into list of packages

View File

@ -239,19 +239,14 @@ type CheckRegistry struct {
func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", readCheck(server, true /* serializable */))
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}
func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
// serializable_read checks if local read is ok.
// linearizable_read checks if there is consensus in the cluster.
// Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure.
reg.Register("serializable_read", readCheck(server, true))
// linearizable_read check would be replaced by read_index check in 3.6
reg.Register("linearizable_read", readCheck(server, false))
reg.Register("serializable_read", serializableReadCheck(server))
reg.InstallHttpEndpoints(lg, mux)
}
@ -415,10 +410,13 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
}
}
func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
return func(ctx context.Context) error {
ctx = srv.AuthStore().WithRoot(ctx)
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
return err
_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
if err != nil {
return fmt.Errorf("range error: %w", err)
}
return nil
}
}

View File

@ -39,17 +39,13 @@ import (
type fakeHealthServer struct {
fakeServer
serializableReadError error
linearizableReadError error
missingLeader bool
authStore auth.AuthStore
apiError error
missingLeader bool
authStore auth.AuthStore
}
func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) {
if req.Serializable {
return nil, s.serializableReadError
}
return nil, s.linearizableReadError
func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
return nil, s.apiError
}
func (s *fakeHealthServer) Config() config.ServerConfig {
@ -152,11 +148,10 @@ func TestHealthHandler(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{
fakeServer: fakeServer{alarms: tt.alarms},
serializableReadError: tt.apiError,
linearizableReadError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
fakeServer: fakeServer{alarms: tt.alarms},
apiError: tt.apiError,
missingLeader: tt.missingLeader,
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
})
ts := httptest.NewServer(mux)
defer ts.Close()
@ -192,8 +187,8 @@ func TestHttpSubPath(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
@ -276,14 +271,14 @@ func TestSerializableReadCheck(t *testing.T) {
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: Unexpected error"},
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[-]serializable_read failed: Unexpected error"},
inResult: []string{"[-]serializable_read failed: range error: Unexpected error"},
},
}
for _, tt := range tests {
@ -291,8 +286,8 @@ func TestSerializableReadCheck(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
serializableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
apiError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
@ -303,47 +298,6 @@ func TestSerializableReadCheck(t *testing.T) {
}
}
func TestLinearizableReadCheck(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
{
name: "Alive normal",
healthCheckURL: "/livez?verbose",
expectStatusCode: http.StatusOK,
inResult: []string{"[+]serializable_read ok"},
},
{
name: "Alive if lineariable range api is not available",
healthCheckURL: "/livez",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusOK,
},
{
name: "Not ready if range api is not available",
healthCheckURL: "/readyz",
apiError: fmt.Errorf("Unexpected error"),
expectStatusCode: http.StatusServiceUnavailable,
inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := http.NewServeMux()
logger := zaptest.NewLogger(t)
s := &fakeHealthServer{
linearizableReadError: tt.apiError,
authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
}
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
})
}
}
func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})

View File

@ -303,16 +303,9 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange, shouldApplyV3 ShouldApplyV3) error {
var membersMap map[types.ID]*Member
var removedMap map[types.ID]bool
if shouldApplyV3 {
membersMap, removedMap = c.be.MustReadMembersFromBackend()
} else {
membersMap, removedMap = membersFromStore(c.lg, c.v2store)
}
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// TODO: this must be switched to backend as well.
membersMap, removedMap := membersFromStore(c.lg, c.v2store)
id := types.ID(cc.NodeID)
if removedMap[id] {
return ErrIDRemoved
@ -830,6 +823,23 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
return ids
}
// PushMembershipToStorage is overriding storage information about cluster's
// members, such that they fully reflect internal RaftCluster's storage.
func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil {
c.be.TrimMembershipFromBackend()
for _, m := range c.members {
c.be.MustSaveMemberToBackend(m)
}
}
if c.v2store != nil {
TrimMembershipFromV2Store(c.lg, c.v2store)
for _, m := range c.members {
mustSaveMemberToStore(c.lg, c.v2store, m)
}
}
}
// buildMembershipMetric sets the knownPeers metric based on the current
// members of the cluster.
func (c *RaftCluster) buildMembershipMetric() {

View File

@ -276,17 +276,8 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
}
}
func TestClusterValidateConfigurationChangeV3(t *testing.T) {
testClusterValidateConfigurationChange(t, true)
}
func TestClusterValidateConfigurationChangeV2(t *testing.T) {
testClusterValidateConfigurationChange(t, false)
}
func testClusterValidateConfigurationChange(t *testing.T, shouldApplyV3 ShouldApplyV3) {
func TestClusterValidateConfigurationChange(t *testing.T) {
cl := NewCluster(zaptest.NewLogger(t), WithMaxLearners(1))
be := newMembershipBackend()
cl.SetBackend(be)
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
var isLearner bool
@ -464,7 +455,7 @@ func testClusterValidateConfigurationChange(t *testing.T, shouldApplyV3 ShouldAp
},
}
for i, tt := range tests {
err := cl.ValidateConfigurationChange(tt.cc, shouldApplyV3)
err := cl.ValidateConfigurationChange(tt.cc)
if err != tt.werr {
t.Errorf("#%d: validateConfigurationChange error = %v, want %v", i, err, tt.werr)
}

View File

@ -56,6 +56,27 @@ func IsMetaStoreOnly(store v2store.Store) (bool, error) {
return true, nil
}
// TrimMembershipFromV2Store removes all information about members &
// removed_members from the v2 store.
func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
members, removed := membersFromStore(lg, s)
for mID := range members {
_, err := s.Delete(MemberStoreKey(mID), true, true)
if err != nil {
return err
}
}
for mID := range removed {
_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
if err != nil {
return err
}
}
return nil
}
func verifyNoMembersInStore(lg *zap.Logger, s v2store.Store) {
members, removed := membersFromStore(lg, s)
if len(members) != 0 || len(removed) != 0 {

View File

@ -16,34 +16,151 @@ package etcdserver
import (
"encoding/json"
"fmt"
"path"
"time"
"unicode/utf8"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.uber.org/zap"
)
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) {
if r.Method != "PUT" || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) {
s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
const v2Version = "v2"
// ApplierV2 is the interface for processing V2 raft messages
type ApplierV2 interface {
Delete(r *RequestV2) Response
Post(r *RequestV2) Response
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
QGet(r *RequestV2) Response
Sync(r *RequestV2) Response
}
func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
if lg == nil {
lg = zap.NewNop()
}
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if s.cluster != nil {
s.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
}
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if s.cluster != nil {
// persist to backend given v2store can be very stale
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return &applierV2store{lg: lg, store: s, cluster: c}
}
type applierV2store struct {
lg *zap.Logger
store v2store.Store
cluster *membership.RaftCluster
}
func (a *applierV2store) Delete(r *RequestV2) Response {
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
default:
return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
}
}
func (a *applierV2store) Post(r *RequestV2) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
}
func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
ttlOptions := r.TTLOptions()
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
}
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
}
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
// return an empty response since there is no consumer.
return Response{}
}
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return Response{}
}
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
}
}
func (a *applierV2store) QGet(r *RequestV2) Response {
return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
}
func (a *applierV2store) Sync(r *RequestV2) Response {
a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
return Response{}
}
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
stringer := panicAlternativeStringer{
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
}
defer func(start time.Time) {
if !utf8.ValidString(r.Method) {
s.lg.Info("method is not valid utf-8")
return
}
success := resp.Err == nil
txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start))
txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
}(time.Now())
switch r.Method {
case "POST":
return s.applyV2.Post(r)
case "PUT":
return s.applyV2.Put(r, shouldApplyV3)
case "DELETE":
return s.applyV2.Delete(r)
case "QGET":
return s.applyV2.QGet(r)
case "SYNC":
return s.applyV2.Sync(r)
default:
// This should never be reached, but just in case:
return Response{Err: errors.ErrUnknownMethod}
}
}
func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := v2store.TTLOptionSet{Refresh: refresh}
if r.Expiration != 0 {
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
}
return ttlOptions
}
func toResponse(ev *v2store.Event, err error) Response {
return Response{Event: ev, Err: err}
}

View File

@ -240,9 +240,6 @@ func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper, time
cc := &http.Client{
Transport: rt,
Timeout: timeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
var (
err error

View File

@ -140,6 +140,8 @@ type ServerV2 interface {
Server
Leader() types.ID
// Do takes a V2 request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
ClientCertAuthEnabled() bool
}
@ -245,6 +247,8 @@ type EtcdServer struct {
v2store v2store.Store
snapshotter *snap.Snapshotter
applyV2 ApplierV2
uberApply apply.UberApplier
applyWait wait.WaitTime
@ -334,6 +338,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
srv.be = b.storage.backend.be
srv.beHooks = b.storage.backend.beHooks
@ -741,6 +746,21 @@ func (s *EtcdServer) run() {
// asynchronously accept toApply packets, dispatch progress in-order
sched := schedule.NewFIFOScheduler(lg)
var (
smu sync.RWMutex
syncC <-chan time.Time
)
setSyncC := func(ch <-chan time.Time) {
smu.Lock()
syncC = ch
smu.Unlock()
}
getSyncC := func() (ch <-chan time.Time) {
smu.RLock()
ch = syncC
smu.RUnlock()
return
}
rh := &raftReadyHandler{
getLead: func() (lead uint64) { return s.getLead() },
updateLead: func(lead uint64) { s.setLead(lead) },
@ -752,6 +772,7 @@ func (s *EtcdServer) run() {
if s.compactor != nil {
s.compactor.Pause()
}
setSyncC(nil)
} else {
if newLeader {
t := time.Now()
@ -759,6 +780,7 @@ func (s *EtcdServer) run() {
s.leadElectedTime = t
s.leadTimeMu.Unlock()
}
setSyncC(s.SyncTicker.C)
if s.compactor != nil {
s.compactor.Resume()
}
@ -825,6 +847,10 @@ func (s *EtcdServer) run() {
lg.Warn("server error", zap.Error(err))
lg.Warn("data-dir used by this member must be removed")
return
case <-getSyncC():
if s.v2store.HasTTLKeys() {
s.sync(s.Cfg.ReqTimeout())
}
case <-s.stop:
return
}
@ -1665,6 +1691,25 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
}
}
// sync proposes a SYNC request and is non-blocking.
// This makes no guarantee that the request will be proposed or performed.
// The request will be canceled after the given timeout.
func (s *EtcdServer) sync(timeout time.Duration) {
req := pb.Request{
Method: "SYNC",
ID: s.reqIDGen.Next(),
Time: time.Now().UnixNano(),
}
data := pbutil.MustMarshal(&req)
// There is no promise that node has leader when do SYNC request,
// so it uses goroutine to propose.
ctx, cancel := context.WithTimeout(s.ctx, timeout)
s.GoAttach(func() {
s.r.Propose(ctx, data)
cancel()
})
}
// publishV3 registers server information into the cluster using v3 request. The
// information is the JSON representation of this server's member struct, updated
// with the static clientURLs of the server.
@ -1771,30 +1816,30 @@ func (s *EtcdServer) apply(
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
for i := range es {
e := es[i]
index := s.consistIndex.ConsistentIndex()
s.lg.Debug("Applying entry",
zap.Uint64("consistent-index", index),
zap.Uint64("entry-index", e.Index),
zap.Uint64("entry-term", e.Term),
zap.Stringer("entry-type", e.Type))
// We need to toApply all WAL entries on top of v2store
// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
shouldApplyV3 := membership.ApplyV2storeOnly
if e.Index > index {
shouldApplyV3 = membership.ApplyBoth
// set the consistent index of current executing entry
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
}
zap.Uint64("index", e.Index),
zap.Uint64("term", e.Term),
zap.Stringer("type", e.Type))
switch e.Type {
case raftpb.EntryNormal:
// gofail: var beforeApplyOneEntryNormal struct{}
s.applyEntryNormal(&e, shouldApplyV3)
s.applyEntryNormal(&e)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
case raftpb.EntryConfChange:
// gofail: var beforeApplyOneConfChange struct{}
// We need to toApply all WAL entries on top of v2store
// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
shouldApplyV3 := membership.ApplyV2storeOnly
// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
@ -1816,9 +1861,14 @@ func (s *EtcdServer) apply(
}
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.ShouldApplyV3) {
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
var ar *apply.Result
if shouldApplyV3 {
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
defer func() {
// The txPostLockInsideApplyHook will not get called in some cases,
// in which we should move the consistent index forward directly.
@ -1828,6 +1878,10 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
}
}()
}
s.lg.Debug("toApply entry normal",
zap.Uint64("consistent-index", index),
zap.Uint64("entry-index", e.Index),
zap.Bool("should-applyV3", bool(shouldApplyV3)))
// raft state machine may generate noop entry when leader confirmation.
// skip it in advance to avoid some potential bug in the future
@ -1848,16 +1902,14 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
s.applyV2Request((*RequestV2)(rp), shouldApplyV3)
s.w.Trigger(r.ID, Response{})
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
return
}
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
if raftReq.V2 != nil {
req := (*RequestV2)(raftReq.V2)
s.applyV2Request(req, shouldApplyV3)
s.w.Trigger(req.ID, Response{})
s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
return
}
@ -1935,7 +1987,7 @@ func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
if err := s.cluster.ValidateConfigurationChange(cc, shouldApplyV3); err != nil {
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)
@ -2215,6 +2267,46 @@ func (s *EtcdServer) monitorCompactHash() {
}
}
func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger()
if s.cluster.Version() == nil {
lg.Info(
"setting up initial cluster version using v2 API",
zap.String("cluster-version", version.Cluster(ver)),
)
} else {
lg.Info(
"updating cluster version using v2 API",
zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)),
)
}
req := pb.Request{
Method: "PUT",
Path: membership.StoreClusterVersionKey(),
Val: ver,
}
ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
_, err := s.Do(ctx, req)
cancel()
switch err {
case nil:
lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
return
case errors.ErrStopped:
lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
return
default:
lg.Warn("failed to update cluster version", zap.Error(err))
}
}
func (s *EtcdServer) updateClusterVersionV3(ver string) {
lg := s.Logger()

File diff suppressed because it is too large Load Diff

View File

@ -15,11 +15,152 @@
package etcdserver
import (
"context"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
)
type RequestV2 pb.Request
type RequestV2Handler interface {
Post(ctx context.Context, r *RequestV2) (Response, error)
Put(ctx context.Context, r *RequestV2) (Response, error)
Delete(ctx context.Context, r *RequestV2) (Response, error)
QGet(ctx context.Context, r *RequestV2) (Response, error)
Get(ctx context.Context, r *RequestV2) (Response, error)
Head(ctx context.Context, r *RequestV2) (Response, error)
}
type reqV2HandlerEtcdServer struct {
reqV2HandlerStore
s *EtcdServer
}
type reqV2HandlerStore struct {
store v2store.Store
applier ApplierV2
}
func NewStoreRequestV2Handler(s v2store.Store, applier ApplierV2) RequestV2Handler {
return &reqV2HandlerStore{s, applier}
}
func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Post(r), nil
}
func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Put(r, membership.ApplyBoth), nil
}
func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Delete(r), nil
}
func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.QGet(r), nil
}
func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) {
if r.Wait {
wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
return Response{Watcher: wc}, err
}
ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
return Response{Event: ev}, err
}
func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) {
ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
return Response{Event: ev}, err
}
func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) {
data, err := ((*pb.Request)(r)).Marshal()
if err != nil {
return Response{}, err
}
ch := a.s.w.Register(r.ID)
start := time.Now()
a.s.r.Propose(ctx, data)
proposalsPending.Inc()
defer proposalsPending.Dec()
select {
case x := <-ch:
resp := x.(Response)
return resp, resp.Err
case <-ctx.Done():
proposalsFailed.Inc()
a.s.w.Trigger(r.ID, nil) // GC wait
return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
case <-a.s.stopping:
}
return Response{}, errors.ErrStopped
}
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
r.ID = s.reqIDGen.Next()
h := &reqV2HandlerEtcdServer{
reqV2HandlerStore: reqV2HandlerStore{
store: s.v2store,
applier: s.applyV2,
},
s: s,
}
rp := &r
resp, err := ((*RequestV2)(rp)).Handle(ctx, h)
resp.Term, resp.Index = s.Term(), s.CommittedIndex()
return resp, err
}
// Handle interprets r and performs an operation on s.store according to r.Method
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
// Quorum == true, r will be sent through consensus before performing its
// respective operation. Do will block until an action is performed or there is
// an error.
func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) {
if r.Method == "GET" && r.Quorum {
r.Method = "QGET"
}
switch r.Method {
case "POST":
return v2api.Post(ctx, r)
case "PUT":
return v2api.Put(ctx, r)
case "DELETE":
return v2api.Delete(ctx, r)
case "QGET":
return v2api.QGet(ctx, r)
case "GET":
return v2api.Get(ctx, r)
case "HEAD":
return v2api.Head(ctx, r)
}
return Response{}, errors.ErrUnknownMethod
}
func (r *RequestV2) String() string {
rpb := pb.Request(*r)
return rpb.String()

View File

@ -29,10 +29,10 @@ require (
go.etcd.io/etcd/client/v3 v3.6.0-alpha.0
go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0
go.opentelemetry.io/otel v1.20.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0
go.opentelemetry.io/otel/sdk v1.20.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.15.0
@ -60,9 +60,9 @@ require (
github.com/prometheus/procfs v0.11.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.20.0 // indirect
go.opentelemetry.io/otel/trace v1.20.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect

View File

@ -127,20 +127,20 @@ go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 h1:DeFD0VgTZ+Cj6hxravYYZE2W4GlneVH81iAOPjZkzk8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0/go.mod h1:GijYcYmNpX1KazD5JmWGsi4P7dDTTTnfv1UbGn84MnU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 h1:gvmNvqrPYovvyRmCSygkUDyL8lC5Tl845MLEwqpxhEU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0/go.mod h1:vNUq47TGFioo+ffTSnKNdob241vePmtNZnAODKapKd0=
go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
go.opentelemetry.io/otel/sdk v1.20.0 h1:5Jf6imeFZlZtKv9Qbo6qt2ZkmWtdWx/wzcCbNUlAWGM=
go.opentelemetry.io/otel/sdk v1.20.0/go.mod h1:rmkSx1cZCm/tn16iWDn1GQbLtsW/LvsdEEFzCSRM6V0=
go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=

View File

@ -291,21 +291,3 @@ func ctlV3MemberUpdate(cx ctlCtx, memberID, peerURL string) error {
cmdArgs := append(cx.PrefixArgs(), "member", "update", memberID, fmt.Sprintf("--peer-urls=%s", peerURL))
return e2e.SpawnWithExpectWithEnv(cmdArgs, cx.envMap, expect.ExpectedResponse{Value: " updated in cluster "})
}
func TestRemoveNonExistingMember(t *testing.T) {
e2e.BeforeTest(t)
ctx := context.Background()
cfg := e2e.ConfigStandalone(*e2e.NewConfig())
epc, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(cfg))
assert.NoError(t, err)
defer epc.Close()
c := epc.Etcdctl()
_, err = c.MemberRemove(ctx, 1)
assert.Error(t, err)
// Ensure that membership is properly bootstrapped.
err = epc.Restart(ctx)
assert.NoError(t, err)
}

View File

@ -226,54 +226,22 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
name: "overloaded server slow apply",
injectFailure: triggerSlowApply,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
// TODO expected behavior of readyz check should be 200 after ReadIndex check is implemented to replace linearizable read.
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
},
},
healthChecks: defaultHealthCheckConfigs,
},
{
name: "network partitioned",
injectFailure: blackhole,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true)},
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
expectedRespSubStrings: []string{
`[-]linearizable_read failed: etcdserver: leader changed`,
},
},
},
// TODO expected behavior of readyz check should be 503 or timeout after ReadIndex check is implemented.
healthChecks: defaultHealthCheckConfigs,
},
{
name: "raft loop deadlock",
injectFailure: triggerRaftLoopDeadLock,
clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
// TODO expected behavior of livez check should be 503 or timeout after RaftLoopDeadLock check is implemented.
healthChecks: []healthCheckConfig{
{
url: "/livez",
expectedStatusCode: http.StatusOK,
},
{
url: "/readyz",
expectedTimeoutError: true,
expectedStatusCode: http.StatusServiceUnavailable,
},
},
// TODO expected behavior of readyz check should be 503 or timeout after ReadIndex check is implemented.
healthChecks: defaultHealthCheckConfigs,
},
// verify that auth enabled serializable read must go through mvcc
{

View File

@ -38,82 +38,73 @@ import (
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
func writeCustomV2Data(t testing.TB, epc *e2e.EtcdProcessCluster, count int) {
for i := 0; i < count; i++ {
func createV2store(t testing.TB, dataDirPath string) string {
t.Log("Creating not-yet v2-deprecated etcd")
cfg := e2e.ConfigStandalone(*e2e.NewConfig(
e2e.WithVersion(e2e.LastVersion),
e2e.WithEnableV2(true),
e2e.WithDataDirPath(dataDirPath),
e2e.WithSnapshotCount(5),
))
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
assert.NoError(t, err)
memberDataDir := epc.Procs[0].Config().DataDirPath
defer func() {
assert.NoError(t, epc.Stop())
}()
// We need to exceed 'SnapshotCount' such that v2 snapshot is dumped.
for i := 0; i < 10; i++ {
if err := e2e.CURLPut(epc, e2e.CURLReq{
Endpoint: "/v2/keys/foo", Value: "bar" + fmt.Sprint(i),
Expected: expect.ExpectedResponse{Value: `{"action":"set","node":{"key":"/foo","value":"bar` + fmt.Sprint(i)}}); err != nil {
t.Fatalf("failed put with curl (%v)", err)
}
}
return memberDataDir
}
func TestV2DeprecationNotYet(t *testing.T) {
e2e.BeforeTest(t)
func assertVerifyCannotStartV2deprecationWriteOnly(t testing.TB, dataDirPath string) {
t.Log("Verify its infeasible to start etcd with --v2-deprecation=write-only mode")
proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=write-only", "--data-dir=" + dataDirPath}, nil)
assert.NoError(t, err)
_, err = proc.Expect("detected disallowed custom content in v2store for stage --v2-deprecation=write-only")
assert.NoError(t, err)
}
func assertVerifyCannotStartV2deprecationNotYet(t testing.TB, dataDirPath string) {
t.Log("Verify its infeasible to start etcd with --v2-deprecation=not-yet mode")
proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=not-yet"}, nil)
proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=not-yet", "--data-dir=" + dataDirPath}, nil)
assert.NoError(t, err)
_, err = proc.Expect(`invalid value "not-yet" for flag -v2-deprecation: invalid value "not-yet"`)
assert.NoError(t, err)
}
func TestV2DeprecationWriteOnlyWAL(t *testing.T) {
func TestV2DeprecationFlags(t *testing.T) {
e2e.BeforeTest(t)
dataDirPath := t.TempDir()
if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
}
cfg := e2e.ConfigStandalone(*e2e.NewConfig(
e2e.WithVersion(e2e.LastVersion),
e2e.WithEnableV2(true),
e2e.WithDataDirPath(dataDirPath),
))
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
assert.NoError(t, err)
memberDataDir := epc.Procs[0].Config().DataDirPath
writeCustomV2Data(t, epc, 1)
var memberDataDir string
t.Run("create-storev2-data", func(t *testing.T) {
memberDataDir = createV2store(t, dataDirPath)
})
assert.NoError(t, epc.Stop())
t.Run("--v2-deprecation=not-yet fails", func(t *testing.T) {
assertVerifyCannotStartV2deprecationNotYet(t, memberDataDir)
})
t.Log("Verify its infeasible to start etcd with --v2-deprecation=write-only mode")
proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=write-only", "--data-dir=" + memberDataDir}, nil)
assert.NoError(t, err)
t.Run("--v2-deprecation=write-only fails", func(t *testing.T) {
assertVerifyCannotStartV2deprecationWriteOnly(t, memberDataDir)
})
_, err = proc.Expect("detected disallowed v2 WAL for stage --v2-deprecation=write-only")
assert.NoError(t, err)
}
func TestV2DeprecationWriteOnlySnapshot(t *testing.T) {
e2e.BeforeTest(t)
dataDirPath := t.TempDir()
if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
}
cfg := e2e.ConfigStandalone(*e2e.NewConfig(
e2e.WithVersion(e2e.LastVersion),
e2e.WithEnableV2(true),
e2e.WithDataDirPath(dataDirPath),
e2e.WithSnapshotCount(10),
))
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
assert.NoError(t, err)
memberDataDir := epc.Procs[0].Config().DataDirPath
// We need to exceed 'SnapshotCount' such that v2 snapshot is dumped.
writeCustomV2Data(t, epc, 10)
assert.NoError(t, epc.Stop())
t.Log("Verify its infeasible to start etcd with --v2-deprecation=write-only mode")
proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=write-only", "--data-dir=" + memberDataDir}, nil)
assert.NoError(t, err)
_, err = proc.Expect("detected disallowed custom content in v2store for stage --v2-deprecation=write-only")
assert.NoError(t, err)
}
func TestV2DeprecationSnapshotMatches(t *testing.T) {

View File

@ -35,10 +35,10 @@ require (
go.etcd.io/etcd/server/v3 v3.6.0-alpha.0
go.etcd.io/gofail v0.1.0
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0
go.opentelemetry.io/otel v1.20.0
go.opentelemetry.io/otel/sdk v1.20.0
go.opentelemetry.io/otel/trace v1.20.0
go.opentelemetry.io/proto/otlp v1.0.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.15.0
@ -84,9 +84,9 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.8 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.20.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect

View File

@ -153,20 +153,20 @@ go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 h1:DeFD0VgTZ+Cj6hxravYYZE2W4GlneVH81iAOPjZkzk8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0/go.mod h1:GijYcYmNpX1KazD5JmWGsi4P7dDTTTnfv1UbGn84MnU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 h1:gvmNvqrPYovvyRmCSygkUDyL8lC5Tl845MLEwqpxhEU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0/go.mod h1:vNUq47TGFioo+ffTSnKNdob241vePmtNZnAODKapKd0=
go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
go.opentelemetry.io/otel/sdk v1.20.0 h1:5Jf6imeFZlZtKv9Qbo6qt2ZkmWtdWx/wzcCbNUlAWGM=
go.opentelemetry.io/otel/sdk v1.20.0/go.mod h1:rmkSx1cZCm/tn16iWDn1GQbLtsW/LvsdEEFzCSRM6V0=
go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=

View File

@ -24,9 +24,6 @@ import (
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
)
// TestEndpointSwitchResolvesViolation ensures
// - ErrNoGreaterRev error is returned from partitioned member when it has stale revision
// - no more error after partition recovers
func TestEndpointSwitchResolvesViolation(t *testing.T) {
integration2.BeforeTest(t)
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
@ -81,16 +78,8 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
if err != ordering.ErrNoGreaterRev {
t.Fatal("While speaking to partitioned leader, we should get ErrNoGreaterRev error")
}
clus.Members[2].RecoverPartition(t, clus.Members[:2]...)
time.Sleep(1 * time.Second) // give enough time for the operation
_, err = orderingKv.Get(ctx, "foo")
if err != nil {
t.Fatal("After partition recovered, third member should recover and return no error")
}
}
// TestUnresolvableOrderViolation ensures ErrNoGreaterRev error is returned when available members only have stale revisions
func TestUnresolvableOrderViolation(t *testing.T) {
integration2.BeforeTest(t)
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 5, UseBridge: true})