Compare commits

57 commits (dependabot … dependabot)

SHA1:
b687d76abd
62b772c321
e0b18a5ade
fbdf65f101
e1d79097d5
054b67780b
c3c029d057
e192a05193
7fb6b756a5
ce0a737ab1
a97052acf4
e6595825ae
52a9b9d96c
4fe46f9203
a968c1f5b3
6db5e00103
c975f24202
7fdb33065d
82fe457b29
904c0769e9
b3d351185c
5426f6d264
093666f450
b17c1de30c
c72ff1e69c
f04478f41e
ed3375e076
ccd711ae13
f454342bfe
9d97cc203e
28d9564962
2f30760b37
21311698aa
29dd025b84
cac2882df8
fd0882b67e
12b640523a
16e441671a
f64689f4e2
5a99305004
ec6147cd04
a4426b4d74
bccd56ae48
7c0a09b81e
3a9a7483b7
3b37afec7b
08aabfea02
5449f0fb99
3836324e8c
c041272d26
dd7a4d28a8
b4fd31f254
8b3efbc597
51eb29af36
089165deec
c48738e33f
1cecf35b24
.github/workflows/codeql-analysis.yml (vendored, 6 lines changed)

@@ -40,7 +40,7 @@ jobs:
         uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
       # Initializes the CodeQL tools for scanning.
       - name: Initialize CodeQL
-        uses: github/codeql-action/init@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5
+        uses: github/codeql-action/init@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
         with:
           # If you wish to specify custom queries, you can do so here or in a config file.
           # By default, queries listed here will override any specified in a config file.
@@ -50,6 +50,6 @@ jobs:
       # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
       # If this step fails, then you should remove it and run the build manually (see below)
       - name: Autobuild
-        uses: github/codeql-action/autobuild@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5
+        uses: github/codeql-action/autobuild@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
       - name: Perform CodeQL Analysis
-        uses: github/codeql-action/analyze@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5
+        uses: github/codeql-action/analyze@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
.github/workflows/e2e-arm64-nightly.yaml (vendored, 21 lines deleted)

@@ -1,21 +0,0 @@
----
-name: E2E Arm64 Nightly
-permissions: read-all
-on:
-  # schedules always run against the main branch, hence we have to create separate jobs
-  # with individual checkout actions for each of the active release branches
-  schedule:
-    - cron: '30 1 * * *' # runs daily at 1:30 am.
-
-  # workflow_dispatch enables manual testing of this job by maintainers
-  workflow_dispatch:
-
-jobs:
-  main-arm64:
-    uses: ./.github/workflows/e2e-arm64-template.yaml
-    with:
-      etcdBranch: main
-  release-35-arm64:
-    uses: ./.github/workflows/e2e-arm64-template.yaml
-    with:
-      etcdBranch: release-3.5

@@ -1,11 +1,6 @@
 ---
-name: Reusable Arm64 E2E Workflow
-on:
-  workflow_call:
-    inputs:
-      etcdBranch:
-        required: true
-        type: string
+name: E2E-Arm64
+on: [push, pull_request]
 permissions: read-all
 jobs:
   test:
@@ -19,8 +14,6 @@ jobs:
       - linux-arm64-e2e
     steps:
       - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
-        with:
-          ref: "${{ inputs.etcdBranch }}"
       - id: goversion
        run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
       - uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v4.1.0
.github/workflows/gh-workflow-approve.yaml (vendored, 2 lines changed)

@@ -18,7 +18,7 @@ jobs:
       actions: write
     steps:
       - name: Update PR
-        uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
+        uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
        continue-on-error: true
        with:
          github-token: ${{ secrets.GITHUB_TOKEN }}
.github/workflows/scorecards.yml (vendored, 2 lines changed)

@@ -50,6 +50,6 @@ jobs:

       # Upload the results to GitHub's code scanning dashboard.
       - name: "Upload to code-scanning"
-        uses: github/codeql-action/upload-sarif@74483a38d39275f33fcff5f35b679b5ca4a26a99 # v2.22.5
+        uses: github/codeql-action/upload-sarif@407ffafae6a767df3e0230c3df91b6443ae8df75 # v2.22.8
        with:
          sarif_file: results.sarif
CHANGELOG/CHANGELOG-3.4.md

@@ -2,16 +2,27 @@

 Previous change logs can be found at [CHANGELOG-3.3](https://github.com/etcd-io/etcd/blob/main/CHANGELOG/CHANGELOG-3.3.md).

-## v3.4.28 (tbd)
+<hr>
+
+## v3.4.28 (2023-11-23)

 ### etcd server
 - Improve [Skip getting authInfo from incoming context when auth is disabled](https://github.com/etcd-io/etcd/pull/16240)
+- Use [the default write scheduler](https://github.com/etcd-io/etcd/pull/16782) since golang.org/x/net@v0.11.0 started using round-robin scheduler.
+- Add [cluster ID check during data corruption detection to prevent false alarm](https://github.com/etcd-io/etcd/issues/15548).
+- Add [Learner support Snapshot RPC](https://github.com/etcd-io/etcd/pull/16990/).

 ### Package `clientv3`
-- Fix [Reset auth token when failing to authenticate due to auth being disabled](https://github.com/etcd-io/etcd/pull/16240)
+- Fix [Reset auth token when failing to authenticate due to auth being disabled](https://github.com/etcd-io/etcd/pull/16240).
+- [Simplify grpc dialer usage](https://github.com/etcd-io/etcd/issues/11519).
+- [Replace balancer with upstream grpc solution](https://github.com/etcd-io/etcd/pull/16844).
+- Fix [race condition when accessing cfg.Endpoints in dial()](https://github.com/etcd-io/etcd/pull/16857).
+- Fix [invalid authority header issue in single endpoint scenario](https://github.com/etcd-io/etcd/pull/16988).

 ### Dependencies
 - Compile binaries using [go 1.20.11](https://github.com/etcd-io/etcd/pull/16916).
 - Upgrade [bbolt to 1.3.8](https://github.com/etcd-io/etcd/pull/16834).
+- Upgrade gRPC to 1.58.3 in https://github.com/etcd-io/etcd/pull/16997 and https://github.com/etcd-io/etcd/pull/16999. Note that gRPC server will reject requests with connection header (refer to https://github.com/grpc/grpc-go/pull/4803).

 <hr>
CHANGELOG/CHANGELOG-3.5.md

@@ -5,6 +5,10 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/
 <hr>

 ## v3.5.11 (tbd)

+### etcd server
+- Fix distributed tracing by ensuring `--experimental-distributed-tracing-sampling-rate` configuration option is available to [set tracing sample rate](https://github.com/etcd-io/etcd/pull/16951).
+
 ### Dependencies
 - Compile binaries using [go 1.20.11](https://github.com/etcd-io/etcd/pull/16915)
+- Fix [CVE-2023-47108](https://github.com/advisories/GHSA-8pgv-569h-w5rw) by [bumping go.opentelemetry.io/otel to 1.20.0 and go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc to 0.46.0](https://github.com/etcd-io/etcd/pull/16946).
client/pkg/transport/sockopt.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-//go:build !windows && !solaris
+//go:build !windows && !solaris && !wasm && !js

 package transport
client/pkg/transport/sockopt_wasm.go (new file, 30 lines)

@@ -0,0 +1,30 @@
+// Copyright 2023 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build wasm || js
+
+package transport
+
+import (
+	"errors"
+	"syscall"
+)
+
+func setReusePort(network, address string, c syscall.RawConn) error {
+	return errors.New("port reuse is not supported on WASM")
+}
+
+func setReuseAddress(network, addr string, conn syscall.RawConn) error {
+	return errors.New("address reuse is not supported on WASM")
+}
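
A note on the two files above: the tightened constraint in sockopt.go (!windows && !solaris && !wasm && !js) and the new stub's constraint (wasm || js) are complementary, so exactly one implementation of setReusePort and setReuseAddress is compiled for any target. A minimal sketch of the same pattern, with hypothetical file, package, and function names (not part of this change):

// feature_native.go — built everywhere except wasm/js targets.
//go:build !wasm && !js

package feature

// setOption has a real implementation where socket-option syscalls exist.
func setOption() error { return nil }

// feature_wasm.go — built only for wasm/js targets, in the same package.
//go:build wasm || js

package feature

import "errors"

// setOption is a stub because the syscall surface is unavailable on WASM.
func setOption() error { return errors.New("not supported on WASM") }

Compiling with GOOS=js GOARCH=wasm selects the stub; any other target selects the native file.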
client/v3/client.go

@@ -91,7 +91,7 @@ func New(cfg Config) (*Client, error) {
 // service interface implementations and do not need connection management.
 func NewCtxClient(ctx context.Context, opts ...Option) *Client {
 	cctx, cancel := context.WithCancel(ctx)
-	c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex)}
+	c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), epMu: new(sync.RWMutex)}
 	for _, opt := range opts {
 		opt(c)
 	}
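
The added epMu initialization matters because clients built through NewCtxClient previously left the endpoint mutex nil, so any later call that locks it would crash. A minimal sketch of the failure mode, using a hypothetical struct rather than etcd's actual Client:

package main

import "sync"

type client struct {
	epMu *sync.RWMutex // stays nil unless a constructor sets it
}

func main() {
	c := &client{} // mirrors NewCtxClient before the fix: epMu is never set
	c.epMu.RLock() // runtime panic: nil pointer dereference
}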
client/v3/ordering/util.go

@@ -29,7 +29,7 @@ func NewOrderViolationSwitchEndpointClosure(c *clientv3.Client) OrderViolationFu
 	violationCount := int32(0)
 	return func(_ clientv3.Op, _ clientv3.OpResponse, _ int64) error {
 		// Each request is assigned by round-robin load-balancer's picker to a different
-		// endpoints. If we cycled them 5 times (even with some level of concurrency),
+		// endpoint. If we cycled them 5 times (even with some level of concurrency),
 		// with high probability no endpoint points on a member with fresh data.
 		// TODO: Ideally we should track members (resp.opp.Header) that returned
 		// stale result and explicitly temporarily disable them in 'picker'.
@@ -57,10 +57,10 @@ require (
 	github.com/spf13/pflag v1.0.5 // indirect
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
 	go.etcd.io/etcd/client/v2 v2.306.0-alpha.0 // indirect
-	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect
-	go.opentelemetry.io/otel v1.20.0 // indirect
-	go.opentelemetry.io/otel/metric v1.20.0 // indirect
-	go.opentelemetry.io/otel/trace v1.20.0 // indirect
+	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
+	go.opentelemetry.io/otel v1.21.0 // indirect
+	go.opentelemetry.io/otel/metric v1.21.0 // indirect
+	go.opentelemetry.io/otel/trace v1.21.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.15.0 // indirect
 	golang.org/x/net v0.18.0 // indirect
@@ -84,14 +84,14 @@ go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
 go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
 go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
 go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
-go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
-go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
-go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
-go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
-go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
-go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
-go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
-go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
+go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
+go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
+go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
+go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
+go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
+go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
+go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
+go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
 go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
 go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.mod (14 lines changed)

@@ -77,13 +77,13 @@ require (
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
 	go.etcd.io/gofail v0.1.0 // indirect
-	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect
-	go.opentelemetry.io/otel v1.20.0 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
-	go.opentelemetry.io/otel/metric v1.20.0 // indirect
-	go.opentelemetry.io/otel/sdk v1.20.0 // indirect
-	go.opentelemetry.io/otel/trace v1.20.0 // indirect
+	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
+	go.opentelemetry.io/otel v1.21.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
+	go.opentelemetry.io/otel/metric v1.21.0 // indirect
+	go.opentelemetry.io/otel/sdk v1.21.0 // indirect
+	go.opentelemetry.io/otel/trace v1.21.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.0.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.15.0 // indirect
go.sum (28 lines changed)

@@ -149,20 +149,20 @@ go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
 go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
 go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
 go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
-go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
-go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
-go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
-go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
-go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 h1:DeFD0VgTZ+Cj6hxravYYZE2W4GlneVH81iAOPjZkzk8=
-go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0/go.mod h1:GijYcYmNpX1KazD5JmWGsi4P7dDTTTnfv1UbGn84MnU=
-go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 h1:gvmNvqrPYovvyRmCSygkUDyL8lC5Tl845MLEwqpxhEU=
-go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0/go.mod h1:vNUq47TGFioo+ffTSnKNdob241vePmtNZnAODKapKd0=
-go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
-go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
-go.opentelemetry.io/otel/sdk v1.20.0 h1:5Jf6imeFZlZtKv9Qbo6qt2ZkmWtdWx/wzcCbNUlAWGM=
-go.opentelemetry.io/otel/sdk v1.20.0/go.mod h1:rmkSx1cZCm/tn16iWDn1GQbLtsW/LvsdEEFzCSRM6V0=
-go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
-go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
+go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
+go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
+go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
+go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
+go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
+go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
+go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
+go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
+go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
+go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
 go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
 go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@@ -308,6 +308,7 @@ function go_test {

   if [ "${VERBOSE:-}" == "1" ]; then
     goTestFlags="-v "
+    goTestFlags+="-json "
   fi

   # Expanding patterns (like ./...) into list of packages
@@ -239,14 +239,19 @@ type CheckRegistry struct {

 func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
 	reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
-	reg.Register("serializable_read", serializableReadCheck(server))
+	reg.Register("serializable_read", readCheck(server, true /* serializable */))
 	reg.InstallHttpEndpoints(lg, mux)
 }

 func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
 	reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)}
 	reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT))
-	reg.Register("serializable_read", serializableReadCheck(server))
+	// serializable_read checks if local read is ok.
+	// linearizable_read checks if there is consensus in the cluster.
+	// Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure.
+	reg.Register("serializable_read", readCheck(server, true))
+	// linearizable_read check would be replaced by read_index check in 3.6
+	reg.Register("linearizable_read", readCheck(server, false))
 	reg.InstallHttpEndpoints(lg, mux)
 }

@@ -410,13 +415,10 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e
 	}
 }

-func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
+func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error {
 	return func(ctx context.Context) error {
 		ctx = srv.AuthStore().WithRoot(ctx)
-		_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true})
-		if err != nil {
-			return fmt.Errorf("range error: %w", err)
-		}
-		return nil
+		_, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
+		return err
 	}
 }
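
The refactor above collapses two near-identical health checks into one closure parameterized by read mode: a serializable read is answered from the local store, while a linearizable read must reach cluster consensus, which is why /readyz registers both. A self-contained sketch of the pattern, with hypothetical types standing in for etcd's ServerHealth interface:

package main

import (
	"context"
	"errors"
	"fmt"
)

// server is a hypothetical stand-in for etcd's ServerHealth.
type server struct{ serErr, linErr error }

func (s *server) rangeReq(_ context.Context, serializable bool) error {
	if serializable {
		return s.serErr // local read: answered from this member's store
	}
	return s.linErr // linearizable read: requires quorum consensus
}

// readCheck mirrors the refactor: one closure parameterized by the read
// mode replaces two near-identical check functions.
func readCheck(s *server, serializable bool) func(context.Context) error {
	return func(ctx context.Context) error {
		return s.rangeReq(ctx, serializable)
	}
}

func main() {
	s := &server{linErr: errors.New("leader changed")}
	fmt.Println("serializable_read:", readCheck(s, true)(context.Background()))  // <nil>
	fmt.Println("linearizable_read:", readCheck(s, false)(context.Background())) // leader changed
}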
@@ -39,13 +39,17 @@ import (

 type fakeHealthServer struct {
 	fakeServer
-	apiError      error
-	missingLeader bool
-	authStore     auth.AuthStore
+	serializableReadError error
+	linearizableReadError error
+	missingLeader         bool
+	authStore             auth.AuthStore
 }

-func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
-	return nil, s.apiError
+func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) {
+	if req.Serializable {
+		return nil, s.serializableReadError
+	}
+	return nil, s.linearizableReadError
 }

 func (s *fakeHealthServer) Config() config.ServerConfig {

@@ -148,10 +152,11 @@ func TestHealthHandler(t *testing.T) {
 			be, _ := betesting.NewDefaultTmpBackend(t)
 			defer betesting.Close(t, be)
 			HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{
-				fakeServer:    fakeServer{alarms: tt.alarms},
-				apiError:      tt.apiError,
-				missingLeader: tt.missingLeader,
-				authStore:     auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
+				fakeServer:            fakeServer{alarms: tt.alarms},
+				serializableReadError: tt.apiError,
+				linearizableReadError: tt.apiError,
+				missingLeader:         tt.missingLeader,
+				authStore:             auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 0),
 			})
 			ts := httptest.NewServer(mux)
 			defer ts.Close()

@@ -187,8 +192,8 @@ func TestHttpSubPath(t *testing.T) {
 			mux := http.NewServeMux()
 			logger := zaptest.NewLogger(t)
 			s := &fakeHealthServer{
-				apiError:  tt.apiError,
-				authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
+				serializableReadError: tt.apiError,
+				authStore:             auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
 			}
 			HandleHealth(logger, mux, s)
 			ts := httptest.NewServer(mux)

@@ -271,14 +276,14 @@ func TestSerializableReadCheck(t *testing.T) {
 			healthCheckURL:   "/livez",
 			apiError:         fmt.Errorf("Unexpected error"),
 			expectStatusCode: http.StatusServiceUnavailable,
-			inResult:         []string{"[-]serializable_read failed: range error: Unexpected error"},
+			inResult:         []string{"[-]serializable_read failed: Unexpected error"},
 		},
 		{
 			name:             "Not ready if range api is not available",
 			healthCheckURL:   "/readyz",
 			apiError:         fmt.Errorf("Unexpected error"),
 			expectStatusCode: http.StatusServiceUnavailable,
-			inResult:         []string{"[-]serializable_read failed: range error: Unexpected error"},
+			inResult:         []string{"[-]serializable_read failed: Unexpected error"},
 		},
 	}
 	for _, tt := range tests {

@@ -286,8 +291,8 @@ func TestSerializableReadCheck(t *testing.T) {
 			mux := http.NewServeMux()
 			logger := zaptest.NewLogger(t)
 			s := &fakeHealthServer{
-				apiError:  tt.apiError,
-				authStore: auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
+				serializableReadError: tt.apiError,
+				authStore:             auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
 			}
 			HandleHealth(logger, mux, s)
 			ts := httptest.NewServer(mux)

@@ -298,6 +303,47 @@ func TestSerializableReadCheck(t *testing.T) {
 	}
 }

+func TestLinearizableReadCheck(t *testing.T) {
+	be, _ := betesting.NewDefaultTmpBackend(t)
+	defer betesting.Close(t, be)
+	tests := []healthTestCase{
+		{
+			name:             "Alive normal",
+			healthCheckURL:   "/livez?verbose",
+			expectStatusCode: http.StatusOK,
+			inResult:         []string{"[+]serializable_read ok"},
+		},
+		{
+			name:             "Alive if lineariable range api is not available",
+			healthCheckURL:   "/livez",
+			apiError:         fmt.Errorf("Unexpected error"),
+			expectStatusCode: http.StatusOK,
+		},
+		{
+			name:             "Not ready if range api is not available",
+			healthCheckURL:   "/readyz",
+			apiError:         fmt.Errorf("Unexpected error"),
+			expectStatusCode: http.StatusServiceUnavailable,
+			inResult:         []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			mux := http.NewServeMux()
+			logger := zaptest.NewLogger(t)
+			s := &fakeHealthServer{
+				linearizableReadError: tt.apiError,
+				authStore:             auth.NewAuthStore(logger, schema.NewAuthBackend(logger, be), nil, 0),
+			}
+			HandleHealth(logger, mux, s)
+			ts := httptest.NewServer(mux)
+			defer ts.Close()
+			checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+			checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
+		})
+	}
+}
+
 func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
 	res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})
@@ -303,9 +303,16 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {

 // ValidateConfigurationChange takes a proposed ConfChange and
 // ensures that it is still valid.
-func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
-	// TODO: this must be switched to backend as well.
-	membersMap, removedMap := membersFromStore(c.lg, c.v2store)
+func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange, shouldApplyV3 ShouldApplyV3) error {
+	var membersMap map[types.ID]*Member
+	var removedMap map[types.ID]bool
+
+	if shouldApplyV3 {
+		membersMap, removedMap = c.be.MustReadMembersFromBackend()
+	} else {
+		membersMap, removedMap = membersFromStore(c.lg, c.v2store)
+	}

 	id := types.ID(cc.NodeID)
 	if removedMap[id] {
 		return ErrIDRemoved

@@ -823,23 +830,6 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
 	return ids
 }

-// PushMembershipToStorage is overriding storage information about cluster's
-// members, such that they fully reflect internal RaftCluster's storage.
-func (c *RaftCluster) PushMembershipToStorage() {
-	if c.be != nil {
-		c.be.TrimMembershipFromBackend()
-		for _, m := range c.members {
-			c.be.MustSaveMemberToBackend(m)
-		}
-	}
-	if c.v2store != nil {
-		TrimMembershipFromV2Store(c.lg, c.v2store)
-		for _, m := range c.members {
-			mustSaveMemberToStore(c.lg, c.v2store, m)
-		}
-	}
-}
-
 // buildMembershipMetric sets the knownPeers metric based on the current
 // members of the cluster.
 func (c *RaftCluster) buildMembershipMetric() {
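
The validation change above reads membership from whichever store is authoritative for the entry being applied: the v3 backend when the entry must be applied there, the v2 store otherwise. A condensed sketch of that source-of-truth selection, with hypothetical types in place of etcd's:

package main

import "fmt"

type member struct{ name string }

// pickMembers mirrors the pattern above: one flag decides which storage
// layer is treated as the source of truth for validating a change.
func pickMembers(useBackend bool, backend, v2store map[int]member) map[int]member {
	if useBackend {
		return backend // v3 backend: authoritative once entries apply to it
	}
	return v2store // v2 store: replayed for all WAL entries
}

func main() {
	be := map[int]member{1: {"a"}}
	v2 := map[int]member{1: {"a"}, 2: {"stale"}}
	fmt.Println(len(pickMembers(true, be, v2)))  // 1
	fmt.Println(len(pickMembers(false, be, v2))) // 2
}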
@@ -276,8 +276,17 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
 	}
 }

-func TestClusterValidateConfigurationChange(t *testing.T) {
+func TestClusterValidateConfigurationChangeV3(t *testing.T) {
+	testClusterValidateConfigurationChange(t, true)
+}
+func TestClusterValidateConfigurationChangeV2(t *testing.T) {
+	testClusterValidateConfigurationChange(t, false)
+}
+
+func testClusterValidateConfigurationChange(t *testing.T, shouldApplyV3 ShouldApplyV3) {
 	cl := NewCluster(zaptest.NewLogger(t), WithMaxLearners(1))
+	be := newMembershipBackend()
+	cl.SetBackend(be)
 	cl.SetStore(v2store.New())
 	for i := 1; i <= 4; i++ {
 		var isLearner bool

@@ -455,7 +464,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		err := cl.ValidateConfigurationChange(tt.cc)
+		err := cl.ValidateConfigurationChange(tt.cc, shouldApplyV3)
 		if err != tt.werr {
 			t.Errorf("#%d: validateConfigurationChange error = %v, want %v", i, err, tt.werr)
 		}
@@ -56,27 +56,6 @@ func IsMetaStoreOnly(store v2store.Store) (bool, error) {
 	return true, nil
 }

-// TrimMembershipFromV2Store removes all information about members &
-// removed_members from the v2 store.
-func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
-	members, removed := membersFromStore(lg, s)
-
-	for mID := range members {
-		_, err := s.Delete(MemberStoreKey(mID), true, true)
-		if err != nil {
-			return err
-		}
-	}
-	for mID := range removed {
-		_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 func verifyNoMembersInStore(lg *zap.Logger, s v2store.Store) {
 	members, removed := membersFromStore(lg, s)
 	if len(members) != 0 || len(removed) != 0 {
@@ -16,151 +16,34 @@ package etcdserver

 import (
 	"encoding/json"
-	"fmt"
 	"path"
-	"time"
-	"unicode/utf8"

 	"github.com/coreos/go-semver/semver"
+	"go.uber.org/zap"

-	"go.etcd.io/etcd/pkg/v3/pbutil"
 	"go.etcd.io/etcd/server/v3/etcdserver/api"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
-	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
-	"go.etcd.io/etcd/server/v3/etcdserver/errors"
-	"go.etcd.io/etcd/server/v3/etcdserver/txn"
-
-	"go.uber.org/zap"
 )

-const v2Version = "v2"
-
-// ApplierV2 is the interface for processing V2 raft messages
-type ApplierV2 interface {
-	Delete(r *RequestV2) Response
-	Post(r *RequestV2) Response
-	Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
-	QGet(r *RequestV2) Response
-	Sync(r *RequestV2) Response
-}
-
-func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
-	if lg == nil {
-		lg = zap.NewNop()
-	}
-	return &applierV2store{lg: lg, store: s, cluster: c}
-}
-
-type applierV2store struct {
-	lg      *zap.Logger
-	store   v2store.Store
-	cluster *membership.RaftCluster
-}
-
-func (a *applierV2store) Delete(r *RequestV2) Response {
-	switch {
-	case r.PrevIndex > 0 || r.PrevValue != "":
-		return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
-	default:
-		return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
-	}
-}
-
-func (a *applierV2store) Post(r *RequestV2) Response {
-	return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
-}
-
-func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
-	ttlOptions := r.TTLOptions()
-	exists, existsSet := pbutil.GetBool(r.PrevExist)
-	switch {
-	case existsSet:
-		if exists {
-			if r.PrevIndex == 0 && r.PrevValue == "" {
-				return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
-			}
-			return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
-		}
-		return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
-	case r.PrevIndex > 0 || r.PrevValue != "":
-		return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
-	default:
-		if storeMemberAttributeRegexp.MatchString(r.Path) {
-			id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
-			var attr membership.Attributes
-			if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
-				a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
-			}
-			if a.cluster != nil {
-				a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
-			}
-			// return an empty response since there is no consumer.
-			return Response{}
-		}
-		// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
-		if r.Path == membership.StoreClusterVersionKey() {
-			if a.cluster != nil {
-				// persist to backend given v2store can be very stale
-				a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
-			}
-			return Response{}
-		}
-		return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
-	}
-}
-
-func (a *applierV2store) QGet(r *RequestV2) Response {
-	return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
-}
-
-func (a *applierV2store) Sync(r *RequestV2) Response {
-	a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
-	return Response{}
-}
-
-// applyV2Request interprets r as a call to v2store.X
-// and returns a Response interpreted from v2store.Event
-func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
-	stringer := panicAlternativeStringer{
-		stringer:    r,
-		alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
-	}
-	defer func(start time.Time) {
-		if !utf8.ValidString(r.Method) {
-			s.lg.Info("method is not valid utf-8")
-			return
-		}
-		success := resp.Err == nil
-		txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start))
-		txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
-	}(time.Now())
-
-	switch r.Method {
-	case "POST":
-		return s.applyV2.Post(r)
-	case "PUT":
-		return s.applyV2.Put(r, shouldApplyV3)
-	case "DELETE":
-		return s.applyV2.Delete(r)
-	case "QGET":
-		return s.applyV2.QGet(r)
-	case "SYNC":
-		return s.applyV2.Sync(r)
-	default:
-		// This should never be reached, but just in case:
-		return Response{Err: errors.ErrUnknownMethod}
-	}
-}
-
-func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
-	refresh, _ := pbutil.GetBool(r.Refresh)
-	ttlOptions := v2store.TTLOptionSet{Refresh: refresh}
-	if r.Expiration != 0 {
-		ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
-	}
-	return ttlOptions
-}
-
-func toResponse(ev *v2store.Event, err error) Response {
-	return Response{Event: ev, Err: err}
-}
+func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) {
+	if r.Method != "PUT" || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) {
+		s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
+	}
+	if storeMemberAttributeRegexp.MatchString(r.Path) {
+		id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path))
+		var attr membership.Attributes
+		if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
+			s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
+		}
+		if s.cluster != nil {
+			s.cluster.UpdateAttributes(id, attr, shouldApplyV3)
+		}
+	}
+	// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
+	if r.Path == membership.StoreClusterVersionKey() {
+		if s.cluster != nil {
+			// persist to backend given v2store can be very stale
+			s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
+		}
+	}
+}
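
After this rewrite, the only v2 WAL records the server still replays are PUTs to a member-attributes key or to the cluster-version key; any other record trips the panic, on the theory that it should not exist once --v2-deprecation=write-only is in effect. A stripped-down sketch of that guard-then-dispatch shape (hypothetical request type and key paths, shown for illustration only):

package main

import (
	"fmt"
	"regexp"
)

type requestV2 struct{ method, path, val string }

// memberAttrKey is an illustrative pattern, not etcd's actual regexp.
var memberAttrKey = regexp.MustCompile(`^/0/members/[0-9a-f]+/attributes$`)

func applyV2(r requestV2) {
	// Reject everything that is not one of the two allowed PUT targets.
	if r.method != "PUT" || (!memberAttrKey.MatchString(r.path) && r.path != "/0/version") {
		panic(fmt.Sprintf("disallowed v2 WAL record: method=%s path=%s", r.method, r.path))
	}
	if memberAttrKey.MatchString(r.path) {
		fmt.Println("update member attributes:", r.val)
	}
	if r.path == "/0/version" {
		fmt.Println("set cluster version:", r.val)
	}
}

func main() {
	applyV2(requestV2{"PUT", "/0/version", "3.5.0"}) // accepted
	applyV2(requestV2{"DELETE", "/foo", ""})         // panics, by design
}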
@@ -240,6 +240,9 @@ func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper, time
 	cc := &http.Client{
 		Transport: rt,
 		Timeout:   timeout,
+		CheckRedirect: func(req *http.Request, via []*http.Request) error {
+			return http.ErrUseLastResponse
+		},
 	}
 	var (
 		err error
@@ -140,8 +140,6 @@ type ServerV2 interface {
 	Server
 	Leader() types.ID

-	// Do takes a V2 request and attempts to fulfill it, returning a Response.
-	Do(ctx context.Context, r pb.Request) (Response, error)
 	ClientCertAuthEnabled() bool
 }

@@ -247,8 +245,6 @@ type EtcdServer struct {
 	v2store     v2store.Store
 	snapshotter *snap.Snapshotter

-	applyV2 ApplierV2
-
 	uberApply apply.UberApplier

 	applyWait wait.WaitTime

@@ -338,7 +334,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	}
 	serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
 	srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
-	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

 	srv.be = b.storage.backend.be
 	srv.beHooks = b.storage.backend.beHooks

@@ -746,21 +741,6 @@ func (s *EtcdServer) run() {
 	// asynchronously accept toApply packets, dispatch progress in-order
 	sched := schedule.NewFIFOScheduler(lg)

-	var (
-		smu   sync.RWMutex
-		syncC <-chan time.Time
-	)
-	setSyncC := func(ch <-chan time.Time) {
-		smu.Lock()
-		syncC = ch
-		smu.Unlock()
-	}
-	getSyncC := func() (ch <-chan time.Time) {
-		smu.RLock()
-		ch = syncC
-		smu.RUnlock()
-		return
-	}
 	rh := &raftReadyHandler{
 		getLead:    func() (lead uint64) { return s.getLead() },
 		updateLead: func(lead uint64) { s.setLead(lead) },

@@ -772,7 +752,6 @@ func (s *EtcdServer) run() {
 			if s.compactor != nil {
 				s.compactor.Pause()
 			}
-			setSyncC(nil)
 		} else {
 			if newLeader {
 				t := time.Now()

@@ -780,7 +759,6 @@ func (s *EtcdServer) run() {
 				s.leadElectedTime = t
 				s.leadTimeMu.Unlock()
 			}
-			setSyncC(s.SyncTicker.C)
 			if s.compactor != nil {
 				s.compactor.Resume()
 			}

@@ -847,10 +825,6 @@ func (s *EtcdServer) run() {
 			lg.Warn("server error", zap.Error(err))
 			lg.Warn("data-dir used by this member must be removed")
 			return
-		case <-getSyncC():
-			if s.v2store.HasTTLKeys() {
-				s.sync(s.Cfg.ReqTimeout())
-			}
 		case <-s.stop:
 			return
 		}

@@ -1691,25 +1665,6 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
 	}
 }

-// sync proposes a SYNC request and is non-blocking.
-// This makes no guarantee that the request will be proposed or performed.
-// The request will be canceled after the given timeout.
-func (s *EtcdServer) sync(timeout time.Duration) {
-	req := pb.Request{
-		Method: "SYNC",
-		ID:     s.reqIDGen.Next(),
-		Time:   time.Now().UnixNano(),
-	}
-	data := pbutil.MustMarshal(&req)
-	// There is no promise that node has leader when do SYNC request,
-	// so it uses goroutine to propose.
-	ctx, cancel := context.WithTimeout(s.ctx, timeout)
-	s.GoAttach(func() {
-		s.r.Propose(ctx, data)
-		cancel()
-	})
-}
-
 // publishV3 registers server information into the cluster using v3 request. The
 // information is the JSON representation of this server's member struct, updated
 // with the static clientURLs of the server.

@@ -1816,30 +1771,30 @@ func (s *EtcdServer) apply(
 	s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
 	for i := range es {
 		e := es[i]
+		index := s.consistIndex.ConsistentIndex()
 		s.lg.Debug("Applying entry",
-			zap.Uint64("index", e.Index),
-			zap.Uint64("term", e.Term),
-			zap.Stringer("type", e.Type))
+			zap.Uint64("consistent-index", index),
+			zap.Uint64("entry-index", e.Index),
+			zap.Uint64("entry-term", e.Term),
+			zap.Stringer("entry-type", e.Type))
+
+		// We need to toApply all WAL entries on top of v2store
+		// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
+		shouldApplyV3 := membership.ApplyV2storeOnly
+		if e.Index > index {
+			shouldApplyV3 = membership.ApplyBoth
+			// set the consistent index of current executing entry
+			s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
+		}
 		switch e.Type {
 		case raftpb.EntryNormal:
 			// gofail: var beforeApplyOneEntryNormal struct{}
-			s.applyEntryNormal(&e)
+			s.applyEntryNormal(&e, shouldApplyV3)
 			s.setAppliedIndex(e.Index)
 			s.setTerm(e.Term)

 		case raftpb.EntryConfChange:
 			// gofail: var beforeApplyOneConfChange struct{}
-
-			// We need to toApply all WAL entries on top of v2store
-			// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
-			shouldApplyV3 := membership.ApplyV2storeOnly
-
-			// set the consistent index of current executing entry
-			if e.Index > s.consistIndex.ConsistentIndex() {
-				s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
-				shouldApplyV3 = membership.ApplyBoth
-			}
-
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)

@@ -1861,14 +1816,9 @@ func (s *EtcdServer) apply(
 }

 // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
-func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
-	shouldApplyV3 := membership.ApplyV2storeOnly
+func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.ShouldApplyV3) {
 	var ar *apply.Result
-	index := s.consistIndex.ConsistentIndex()
-	if e.Index > index {
-		// set the consistent index of current executing entry
-		s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
-		shouldApplyV3 = membership.ApplyBoth
+	if shouldApplyV3 {
 		defer func() {
 			// The txPostLockInsideApplyHook will not get called in some cases,
 			// in which we should move the consistent index forward directly.

@@ -1878,10 +1828,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		}
 		}()
 	}
-	s.lg.Debug("toApply entry normal",
-		zap.Uint64("consistent-index", index),
-		zap.Uint64("entry-index", e.Index),
-		zap.Bool("should-applyV3", bool(shouldApplyV3)))

 	// raft state machine may generate noop entry when leader confirmation.
 	// skip it in advance to avoid some potential bug in the future

@@ -1902,14 +1848,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		rp := &r
 		pbutil.MustUnmarshal(rp, e.Data)
 		s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
-		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
+		s.applyV2Request((*RequestV2)(rp), shouldApplyV3)
+		s.w.Trigger(r.ID, Response{})
 		return
 	}
 	s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))

 	if raftReq.V2 != nil {
 		req := (*RequestV2)(raftReq.V2)
-		s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
+		s.applyV2Request(req, shouldApplyV3)
+		s.w.Trigger(req.ID, Response{})
 		return
 	}

@@ -1987,7 +1935,7 @@ func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
 // applyConfChange applies a ConfChange to the server. It is only
 // invoked with a ConfChange that has already passed through Raft
 func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
-	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
+	if err := s.cluster.ValidateConfigurationChange(cc, shouldApplyV3); err != nil {
 		cc.NodeID = raft.None
 		s.r.ApplyConfChange(cc)

@@ -2267,46 +2215,6 @@ func (s *EtcdServer) monitorCompactHash() {
 	}
 }

-func (s *EtcdServer) updateClusterVersionV2(ver string) {
-	lg := s.Logger()
-
-	if s.cluster.Version() == nil {
-		lg.Info(
-			"setting up initial cluster version using v2 API",
-			zap.String("cluster-version", version.Cluster(ver)),
-		)
-	} else {
-		lg.Info(
-			"updating cluster version using v2 API",
-			zap.String("from", version.Cluster(s.cluster.Version().String())),
-			zap.String("to", version.Cluster(ver)),
-		)
-	}
-
-	req := pb.Request{
-		Method: "PUT",
-		Path:   membership.StoreClusterVersionKey(),
-		Val:    ver,
-	}
-
-	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
-	_, err := s.Do(ctx, req)
-	cancel()
-
-	switch err {
-	case nil:
-		lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
-		return
-
-	case errors.ErrStopped:
-		lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
-		return
-
-	default:
-		lg.Warn("failed to update cluster version", zap.Error(err))
-	}
-}
-
 func (s *EtcdServer) updateClusterVersionV3(ver string) {
 	lg := s.Logger()
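
A recurring element of the server.go changes is the consistent-index gate, now computed once per entry at the top of the apply loop instead of separately in applyEntryNormal and the conf-change branch: an entry is applied to the v3 backend only when its Raft index exceeds the backend's persisted consistent index, since anything at or below it was already applied before the last durable flush. A small sketch of the rule (hypothetical function name):

package main

import "fmt"

// shouldApplyV3 reports whether a WAL entry at entryIndex still needs to be
// applied to the v3 backend, given the backend's persisted consistent index.
// Replaying entries at or below the consistent index would double-apply them.
func shouldApplyV3(entryIndex, consistentIndex uint64) bool {
	return entryIndex > consistentIndex
}

func main() {
	const consistentIndex = 100
	for _, idx := range []uint64{99, 100, 101} {
		fmt.Printf("entry %d -> apply to backend: %v\n", idx, shouldApplyV3(idx, consistentIndex))
	}
	// entry 99 -> false, entry 100 -> false, entry 101 -> true
}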
(File diff suppressed because it is too large.)
@@ -15,152 +15,11 @@
 package etcdserver

 import (
-	"context"
-	"time"
-
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
-	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
-	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
-	"go.etcd.io/etcd/server/v3/etcdserver/errors"
 )

 type RequestV2 pb.Request

-type RequestV2Handler interface {
-	Post(ctx context.Context, r *RequestV2) (Response, error)
-	Put(ctx context.Context, r *RequestV2) (Response, error)
-	Delete(ctx context.Context, r *RequestV2) (Response, error)
-	QGet(ctx context.Context, r *RequestV2) (Response, error)
-	Get(ctx context.Context, r *RequestV2) (Response, error)
-	Head(ctx context.Context, r *RequestV2) (Response, error)
-}
-
-type reqV2HandlerEtcdServer struct {
-	reqV2HandlerStore
-	s *EtcdServer
-}
-
-type reqV2HandlerStore struct {
-	store   v2store.Store
-	applier ApplierV2
-}
-
-func NewStoreRequestV2Handler(s v2store.Store, applier ApplierV2) RequestV2Handler {
-	return &reqV2HandlerStore{s, applier}
-}
-
-func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.applier.Post(r), nil
-}
-
-func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.applier.Put(r, membership.ApplyBoth), nil
-}
-
-func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.applier.Delete(r), nil
-}
-
-func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.applier.QGet(r), nil
-}
-
-func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) {
-	if r.Wait {
-		wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
-		return Response{Watcher: wc}, err
-	}
-	ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
-	return Response{Event: ev}, err
-}
-
-func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) {
-	ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
-	return Response{Event: ev}, err
-}
-
-func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.processRaftRequest(ctx, r)
-}
-
-func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.processRaftRequest(ctx, r)
-}
-
-func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.processRaftRequest(ctx, r)
-}
-
-func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) {
-	return a.processRaftRequest(ctx, r)
-}
-
-func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) {
-	data, err := ((*pb.Request)(r)).Marshal()
-	if err != nil {
-		return Response{}, err
-	}
-	ch := a.s.w.Register(r.ID)
-
-	start := time.Now()
-	a.s.r.Propose(ctx, data)
-	proposalsPending.Inc()
-	defer proposalsPending.Dec()
-
-	select {
-	case x := <-ch:
-		resp := x.(Response)
-		return resp, resp.Err
-	case <-ctx.Done():
-		proposalsFailed.Inc()
-		a.s.w.Trigger(r.ID, nil) // GC wait
-		return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
-	case <-a.s.stopping:
-	}
-	return Response{}, errors.ErrStopped
-}
-
-func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
-	r.ID = s.reqIDGen.Next()
-	h := &reqV2HandlerEtcdServer{
-		reqV2HandlerStore: reqV2HandlerStore{
-			store:   s.v2store,
-			applier: s.applyV2,
-		},
-		s: s,
-	}
-	rp := &r
-	resp, err := ((*RequestV2)(rp)).Handle(ctx, h)
-	resp.Term, resp.Index = s.Term(), s.CommittedIndex()
-	return resp, err
-}
-
-// Handle interprets r and performs an operation on s.store according to r.Method
-// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
-// Quorum == true, r will be sent through consensus before performing its
-// respective operation. Do will block until an action is performed or there is
-// an error.
-func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) {
-	if r.Method == "GET" && r.Quorum {
-		r.Method = "QGET"
-	}
-	switch r.Method {
-	case "POST":
-		return v2api.Post(ctx, r)
-	case "PUT":
-		return v2api.Put(ctx, r)
-	case "DELETE":
-		return v2api.Delete(ctx, r)
-	case "QGET":
-		return v2api.QGet(ctx, r)
-	case "GET":
-		return v2api.Get(ctx, r)
-	case "HEAD":
-		return v2api.Head(ctx, r)
-	}
-	return Response{}, errors.ErrUnknownMethod
-}
-
 func (r *RequestV2) String() string {
 	rpb := pb.Request(*r)
 	return rpb.String()
@@ -29,10 +29,10 @@ require (
 	go.etcd.io/etcd/client/v3 v3.6.0-alpha.0
 	go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0
 	go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0
-	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0
-	go.opentelemetry.io/otel v1.20.0
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0
-	go.opentelemetry.io/otel/sdk v1.20.0
+	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
+	go.opentelemetry.io/otel v1.21.0
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0
+	go.opentelemetry.io/otel/sdk v1.21.0
 	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.26.0
 	golang.org/x/crypto v0.15.0

@@ -60,9 +60,9 @@ require (
 	github.com/prometheus/procfs v0.11.1 // indirect
 	github.com/sirupsen/logrus v1.8.1 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
-	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
-	go.opentelemetry.io/otel/metric v1.20.0 // indirect
-	go.opentelemetry.io/otel/trace v1.20.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
+	go.opentelemetry.io/otel/metric v1.21.0 // indirect
+	go.opentelemetry.io/otel/trace v1.21.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.0.0 // indirect
 	golang.org/x/sys v0.14.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
@ -127,20 +127,20 @@ go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 h1:DeFD0VgTZ+Cj6hxravYYZE2W4GlneVH81iAOPjZkzk8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0/go.mod h1:GijYcYmNpX1KazD5JmWGsi4P7dDTTTnfv1UbGn84MnU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 h1:gvmNvqrPYovvyRmCSygkUDyL8lC5Tl845MLEwqpxhEU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0/go.mod h1:vNUq47TGFioo+ffTSnKNdob241vePmtNZnAODKapKd0=
go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
go.opentelemetry.io/otel/sdk v1.20.0 h1:5Jf6imeFZlZtKv9Qbo6qt2ZkmWtdWx/wzcCbNUlAWGM=
go.opentelemetry.io/otel/sdk v1.20.0/go.mod h1:rmkSx1cZCm/tn16iWDn1GQbLtsW/LvsdEEFzCSRM6V0=
go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
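Each module in the go.sum hunk above occupies two lines: an `h1:` digest of the module's extracted file tree and a separate digest of its `go.mod` alone, which is why the v1.20.0 to v1.21.0 bump replaces lines in pairs. As an aside, the `h1:` digest can be recomputed with golang.org/x/mod; a minimal sketch, where the local directory path and the module@version prefix are illustrative assumptions, not taken from this diff:

```go
package main

import (
	"fmt"

	"golang.org/x/mod/sumdb/dirhash"
)

func main() {
	// HashDir walks an extracted module tree and produces the same
	// "h1:..." digest that go.sum records for that module version.
	h, err := dirhash.HashDir(
		"/tmp/otel-extract",                // hypothetical extracted module dir
		"go.opentelemetry.io/otel@v1.21.0", // module@version prefix used in hashing
		dirhash.Hash1,
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(h) // should match the h1: line recorded in go.sum
}
```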
@ -291,3 +291,21 @@ func ctlV3MemberUpdate(cx ctlCtx, memberID, peerURL string) error {
	cmdArgs := append(cx.PrefixArgs(), "member", "update", memberID, fmt.Sprintf("--peer-urls=%s", peerURL))
	return e2e.SpawnWithExpectWithEnv(cmdArgs, cx.envMap, expect.ExpectedResponse{Value: " updated in cluster "})
}

func TestRemoveNonExistingMember(t *testing.T) {
	e2e.BeforeTest(t)
	ctx := context.Background()

	cfg := e2e.ConfigStandalone(*e2e.NewConfig())
	epc, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(cfg))
	assert.NoError(t, err)
	defer epc.Close()
	c := epc.Etcdctl()

	_, err = c.MemberRemove(ctx, 1)
	assert.Error(t, err)

	// Ensure that membership is properly bootstrapped.
	err = epc.Restart(ctx)
	assert.NoError(t, err)
}
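For orientation, the etcdctl-driven test above exercises the same guard that the clientv3 Cluster API exposes directly. A minimal client-side sketch, not part of the diff, assuming a reachable member on 127.0.0.1:2379 and reusing the test's made-up member ID 1:

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // illustrative endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Removing a member ID that does not exist in the cluster must fail;
	// the error is the point, mirroring assert.Error in the test above.
	if _, err := cli.MemberRemove(ctx, 1); err != nil {
		fmt.Println("removal rejected as expected:", err)
	}
}
```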
@ -226,22 +226,54 @@ func TestHTTPLivezReadyzHandler(t *testing.T) {
			name:           "overloaded server slow apply",
			injectFailure:  triggerSlowApply,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)},
			healthChecks:   defaultHealthCheckConfigs,
			// TODO expected behavior of readyz check should be 200 after ReadIndex check is implemented to replace linearizable read.
			healthChecks: []healthCheckConfig{
				{
					url:                "/livez",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/readyz",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
				},
			},
		},
		{
			name:           "network partitioned",
			injectFailure:  blackhole,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true)},
			// TODO expected behavior of readyz check should be 503 or timeout after ReadIndex check is implemented.
			healthChecks:   defaultHealthCheckConfigs,
			healthChecks: []healthCheckConfig{
				{
					url:                "/livez",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/readyz",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
					expectedRespSubStrings: []string{
						`[-]linearizable_read failed: etcdserver: leader changed`,
					},
				},
			},
		},
		{
			name:           "raft loop deadlock",
			injectFailure:  triggerRaftLoopDeadLock,
			clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)},
			// TODO expected behavior of livez check should be 503 or timeout after RaftLoopDeadLock check is implemented.
			// TODO expected behavior of readyz check should be 503 or timeout after ReadIndex check is implemented.
			healthChecks:   defaultHealthCheckConfigs,
			healthChecks: []healthCheckConfig{
				{
					url:                "/livez",
					expectedStatusCode: http.StatusOK,
				},
				{
					url:                  "/readyz",
					expectedTimeoutError: true,
					expectedStatusCode:   http.StatusServiceUnavailable,
				},
			},
		},
		// verify that auth enabled serializable read must go through mvcc
		{
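The table entries above all boil down to HTTP GETs against a member's client URL. A rough probe sketch, not part of the diff, assuming an etcd serving on http://127.0.0.1:2379 (endpoint and timeout are illustrative); note that the /readyz failure body lists the failed checks, which is what expectedRespSubStrings matches against:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Keep the timeout short: under the injected failures above, /readyz
	// is expected to hang or answer 503, while /livez should stay 200.
	client := &http.Client{Timeout: 2 * time.Second}

	for _, path := range []string{"/livez", "/readyz"} {
		resp, err := client.Get("http://127.0.0.1:2379" + path) // illustrative endpoint
		if err != nil {
			fmt.Printf("%s: no answer (timeout/refused): %v\n", path, err)
			continue
		}
		fmt.Printf("%s -> %d\n", path, resp.StatusCode)
		resp.Body.Close()
	}
}
```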
@ -38,73 +38,82 @@ import (
	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

func createV2store(t testing.TB, dataDirPath string) string {
	t.Log("Creating not-yet v2-deprecated etcd")

	cfg := e2e.ConfigStandalone(*e2e.NewConfig(
		e2e.WithVersion(e2e.LastVersion),
		e2e.WithEnableV2(true),
		e2e.WithDataDirPath(dataDirPath),
		e2e.WithSnapshotCount(5),
	))
	epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
	assert.NoError(t, err)
	memberDataDir := epc.Procs[0].Config().DataDirPath

	defer func() {
		assert.NoError(t, epc.Stop())
	}()

	// We need to exceed 'SnapshotCount' such that v2 snapshot is dumped.
	for i := 0; i < 10; i++ {
func writeCustomV2Data(t testing.TB, epc *e2e.EtcdProcessCluster, count int) {
	for i := 0; i < count; i++ {
		if err := e2e.CURLPut(epc, e2e.CURLReq{
			Endpoint: "/v2/keys/foo", Value: "bar" + fmt.Sprint(i),
			Expected: expect.ExpectedResponse{Value: `{"action":"set","node":{"key":"/foo","value":"bar` + fmt.Sprint(i)}}); err != nil {
			t.Fatalf("failed put with curl (%v)", err)
		}
	}
	return memberDataDir
}

func assertVerifyCannotStartV2deprecationWriteOnly(t testing.TB, dataDirPath string) {
	t.Log("Verify it's infeasible to start etcd with --v2-deprecation=write-only mode")
	proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=write-only", "--data-dir=" + dataDirPath}, nil)
	assert.NoError(t, err)

	_, err = proc.Expect("detected disallowed custom content in v2store for stage --v2-deprecation=write-only")
	assert.NoError(t, err)
}

func assertVerifyCannotStartV2deprecationNotYet(t testing.TB, dataDirPath string) {
func TestV2DeprecationNotYet(t *testing.T) {
	e2e.BeforeTest(t)
	t.Log("Verify it's infeasible to start etcd with --v2-deprecation=not-yet mode")
	proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=not-yet", "--data-dir=" + dataDirPath}, nil)
	proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=not-yet"}, nil)
	assert.NoError(t, err)

	_, err = proc.Expect(`invalid value "not-yet" for flag -v2-deprecation: invalid value "not-yet"`)
	assert.NoError(t, err)
}

func TestV2DeprecationFlags(t *testing.T) {
func TestV2DeprecationWriteOnlyWAL(t *testing.T) {
	e2e.BeforeTest(t)
	dataDirPath := t.TempDir()

	if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
		t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
	}
	cfg := e2e.ConfigStandalone(*e2e.NewConfig(
		e2e.WithVersion(e2e.LastVersion),
		e2e.WithEnableV2(true),
		e2e.WithDataDirPath(dataDirPath),
	))
	epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
	assert.NoError(t, err)
	memberDataDir := epc.Procs[0].Config().DataDirPath

	var memberDataDir string
	t.Run("create-storev2-data", func(t *testing.T) {
		memberDataDir = createV2store(t, dataDirPath)
	})
	writeCustomV2Data(t, epc, 1)

	t.Run("--v2-deprecation=not-yet fails", func(t *testing.T) {
		assertVerifyCannotStartV2deprecationNotYet(t, memberDataDir)
	})
	assert.NoError(t, epc.Stop())

	t.Run("--v2-deprecation=write-only fails", func(t *testing.T) {
		assertVerifyCannotStartV2deprecationWriteOnly(t, memberDataDir)
	})
	t.Log("Verify it's infeasible to start etcd with --v2-deprecation=write-only mode")
	proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=write-only", "--data-dir=" + memberDataDir}, nil)
	assert.NoError(t, err)

	_, err = proc.Expect("detected disallowed v2 WAL for stage --v2-deprecation=write-only")
	assert.NoError(t, err)
}

func TestV2DeprecationWriteOnlySnapshot(t *testing.T) {
	e2e.BeforeTest(t)
	dataDirPath := t.TempDir()

	if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
		t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
	}
	cfg := e2e.ConfigStandalone(*e2e.NewConfig(
		e2e.WithVersion(e2e.LastVersion),
		e2e.WithEnableV2(true),
		e2e.WithDataDirPath(dataDirPath),
		e2e.WithSnapshotCount(10),
	))
	epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
	assert.NoError(t, err)
	memberDataDir := epc.Procs[0].Config().DataDirPath

	// We need to exceed 'SnapshotCount' such that v2 snapshot is dumped.
	writeCustomV2Data(t, epc, 10)

	assert.NoError(t, epc.Stop())

	t.Log("Verify it's infeasible to start etcd with --v2-deprecation=write-only mode")
	proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--v2-deprecation=write-only", "--data-dir=" + memberDataDir}, nil)
	assert.NoError(t, err)

	_, err = proc.Expect("detected disallowed custom content in v2store for stage --v2-deprecation=write-only")
	assert.NoError(t, err)
}

func TestV2DeprecationSnapshotMatches(t *testing.T) {
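The writeCustomV2Data helper above goes through the e2e CURL helpers; the underlying request is a plain form-encoded PUT against the v2 keys API. A standalone sketch, not part of the diff, assuming a last-release etcd started with --enable-v2=true on 127.0.0.1:2379 (the endpoint is illustrative):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// Form-encoded PUT against the v2 keys API, the same request shape the
	// test's CURLPut helper issues for /v2/keys/foo.
	form := url.Values{"value": {"bar0"}}
	req, err := http.NewRequest(http.MethodPut,
		"http://127.0.0.1:2379/v2/keys/foo", // illustrative endpoint
		strings.NewReader(form.Encode()))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// On success the body begins with {"action":"set","node":{"key":"/foo",...
	fmt.Println(resp.Status, string(body))
}
```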
14
tests/go.mod
@ -35,10 +35,10 @@ require (
	go.etcd.io/etcd/server/v3 v3.6.0-alpha.0
	go.etcd.io/gofail v0.1.0
	go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0
	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0
	go.opentelemetry.io/otel v1.20.0
	go.opentelemetry.io/otel/sdk v1.20.0
	go.opentelemetry.io/otel/trace v1.20.0
	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
	go.opentelemetry.io/otel v1.21.0
	go.opentelemetry.io/otel/sdk v1.21.0
	go.opentelemetry.io/otel/trace v1.21.0
	go.opentelemetry.io/proto/otlp v1.0.0
	go.uber.org/zap v1.26.0
	golang.org/x/crypto v0.15.0
@ -84,9 +84,9 @@ require (
	github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
	go.etcd.io/bbolt v1.3.8 // indirect
	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
	go.opentelemetry.io/otel/metric v1.20.0 // indirect
	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
	go.opentelemetry.io/otel/metric v1.21.0 // indirect
	go.uber.org/multierr v1.11.0 // indirect
	golang.org/x/net v0.18.0 // indirect
	golang.org/x/sys v0.14.0 // indirect
28
tests/go.sum
@ -153,20 +153,20 @@ go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0 h1:Yatm3/u91jNJTGVeENBBg5QSh1BQJ541IBS9nb5JDkw=
go.etcd.io/raft/v3 v3.0.0-20231012085229-7c3ed830bbb0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M=
go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 h1:DeFD0VgTZ+Cj6hxravYYZE2W4GlneVH81iAOPjZkzk8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0/go.mod h1:GijYcYmNpX1KazD5JmWGsi4P7dDTTTnfv1UbGn84MnU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 h1:gvmNvqrPYovvyRmCSygkUDyL8lC5Tl845MLEwqpxhEU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0/go.mod h1:vNUq47TGFioo+ffTSnKNdob241vePmtNZnAODKapKd0=
go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
go.opentelemetry.io/otel/sdk v1.20.0 h1:5Jf6imeFZlZtKv9Qbo6qt2ZkmWtdWx/wzcCbNUlAWGM=
go.opentelemetry.io/otel/sdk v1.20.0/go.mod h1:rmkSx1cZCm/tn16iWDn1GQbLtsW/LvsdEEFzCSRM6V0=
go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@ -24,6 +24,9 @@ import (
	integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
)

// TestEndpointSwitchResolvesViolation ensures
// - ErrNoGreaterRev error is returned from partitioned member when it has stale revision
// - no more error after partition recovers
func TestEndpointSwitchResolvesViolation(t *testing.T) {
	integration2.BeforeTest(t)
	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
@ -78,8 +81,16 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
	if err != ordering.ErrNoGreaterRev {
		t.Fatal("While speaking to partitioned leader, we should get ErrNoGreaterRev error")
	}

	clus.Members[2].RecoverPartition(t, clus.Members[:2]...)
	time.Sleep(1 * time.Second) // give enough time for the operation
	_, err = orderingKv.Get(ctx, "foo")
	if err != nil {
		t.Fatal("After partition recovered, third member should recover and return no error")
	}
}

// TestUnresolvableOrderViolation ensures ErrNoGreaterRev error is returned when available members only have stale revisions
func TestUnresolvableOrderViolation(t *testing.T) {
	integration2.BeforeTest(t)
	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 5, UseBridge: true})
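Both ordering tests build their client through the clientv3/ordering wrapper, which is where orderingKv and ErrNoGreaterRev come from. A condensed sketch of that construction, not part of the diff, assuming endpoints of an already-running cluster (the endpoint list is illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/ordering"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379"}, // illustrative endpoints
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Wrap the KV so each response's revision is compared against the highest
	// revision seen so far; on a violation the closure switches endpoints.
	orderingKv := ordering.NewKV(cli.KV, ordering.NewOrderViolationSwitchEndpointClosure(cli))

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := orderingKv.Get(ctx, "foo"); err == ordering.ErrNoGreaterRev {
		fmt.Println("member served a stale revision:", err)
	}
}
```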