Compare commits

...

40 Commits

Author SHA1 Message Date
99018a77be version: bump up to 3.5.2 2022-02-01 12:28:34 +01:00
a624446907 Merge pull request #13616 from ptabor/20220117-update-yaml
Update dep: gopkg.in/yaml.v2 v2.2.8 -> v2.4.0 due to: CVE-2019-11254 [release 3.5]
2022-01-17 20:07:16 +01:00
74f33d6665 Update dep: require gopkg.in/yaml.v2 v2.2.8 -> v2.4.0 due to: CVE-2019-11254. 2022-01-17 17:10:50 +01:00
7291ed3c4a Merge pull request #13541 from michaljasionowski/backport-runlock-fix
Backport watchablestore runlock bug fix to release-3.5
2021-12-21 11:03:31 +01:00
55c16df997 fix runlock bug 2021-12-16 15:58:41 +00:00
73080a7166 Merge pull request #13501 from ahrtr/reset_ci_after_reload_db_3.5
[3.5] Set the backend again after recovering v3 backend from snapshot
2021-12-06 13:22:22 +01:00
e84c61104c Merge pull request #13515 from serathius/checkpoints-fix-3.5
Backport Lease Checkpoints fix to release-3.5
2021-12-03 12:21:02 +01:00
d00e89db2e server: Require either cluster version v3.6 or --experimental-enable-lease-checkpoint-persist to persist lease remainingTTL
To avoid inconsistant behavior during cluster upgrade we are feature
gating persistance behind cluster version. This should ensure that
all cluster members are upgraded to v3.6 before changing behavior.

To allow backporting this fix to v3.5 we are also introducing flag
--experimental-enable-lease-checkpoint-persist that will allow for
smooth upgrade in v3.5 clusters with this feature enabled.
2021-12-02 16:54:10 +01:00
eddfb4232f etcdserver,integration: Store remaining TTL on checkpoint
To extend lease checkpointing mechanism to cases when the whole etcd
cluster is restarted.
2021-12-02 16:42:20 +01:00
21634a98c6 lease,integration: add checkpoint scheduling after leader change
Current checkpointing mechanism is buggy. New checkpoints for any lease
are scheduled only until the first leader change. Added fix for that
and a test that will check it.
2021-12-02 16:40:14 +01:00
8c81598455 set the backend again after recovering v3 backend from snapshot 2021-11-25 05:45:20 +08:00
eac7f98699 Merge pull request #13477 from mitake/backport-13308-to-3.5
Backport PR 13308 to release-3.5
2021-11-21 14:45:15 -05:00
dec6f72d68 *: implement a retry logic for auth old revision in the client 2021-11-15 00:09:16 +09:00
79bbc8fdb7 client/v3: refresh the token when ErrUserEmpty is received while retrying
To fix a bug in the retry logic caused when the auth token is cleared after receiving `ErrInvalidAuthToken` from the server and the subsequent call to `getToken` also fails due to some reason (eg. context deadline exceeded).
This leaves the client without a token and the retry will continue to fail with `ErrUserEmpty` unless the token is refreshed.
2021-11-15 00:09:09 +09:00
77d760bf1b Merge pull request #13476 from chaochn47/backport-release-3.5
cherry-pick to 3.5 from #13467 exclude the same alarm type activated by multiple peers
2021-11-13 22:10:19 -05:00
7d44a7cd6e server/etcdserver/api/etcdhttp: exclude the same alarm type activated by multiple peers 2021-11-12 14:21:14 -08:00
e8732fb5f3 Merge pull request #13395 from geetasg/release-3.5
storage/backend: Add a gauge to indicate if defrag is active (backport)
2021-10-07 18:16:21 +02:00
446f7d6b6e storage/backend: Add a gauge to indicate if defrag is active (backport from 3.6) 2021-10-06 11:01:31 -07:00
d42e8589e1 version: 3.5.1
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2021-10-03 11:47:37 -04:00
ec562294f7 Merge pull request #13380 from hexfusion/cp-13376
[release-3.5] Dockerfile: bump debian bullseye-20210927
2021-10-01 13:23:50 -04:00
bad9a52c4c Dockerfile: bump debian bullseye-20210927
fixes: CVE-2021-3711, CVE-2021-35942, CVE-2019-9893

Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2021-10-01 12:48:57 -04:00
edb3b5a794 Merge pull request #13375 from serathius/authority-3.5
Cherry pick "Fix http2 authority header in single endpoint scenario" to release-3.5
2021-09-30 13:56:33 +02:00
79f9a45574 client: Use first endpoint as http2 authority header 2021-09-30 12:15:33 +02:00
7f25a500e3 tests: Add grpc authority e2e tests 2021-09-30 12:15:33 +02:00
58d2b12a50 client: Add grpc authority header integration tests 2021-09-30 12:15:32 +02:00
6e04e8ae42 tests: Allow configuring integration tests to use TCP 2021-09-30 12:05:25 +02:00
7272a9585d test: Use unique number for grpc port 2021-09-30 12:05:15 +02:00
0bac49bda4 tests: Cleanup member interface by exposing Bridge directly 2021-09-30 12:05:10 +02:00
f324894e8f tests: Make using bridge optional 2021-09-30 12:05:04 +02:00
994bd08723 tests: Rename grpcAddr to grpcURL to imply that it includes schema 2021-09-30 12:04:57 +02:00
c1f48d8077 tests: Remove bridge dependency on unix 2021-09-30 12:04:50 +02:00
6e2fe84ebd Decouple prefixArgs from os.Env dependency
prefixArgs uses os.Setenv in e2e tests instead envMap.
This creates overwrites in some test cases and have an impact
on test quality and isolation between tests.
This PR uses ctlcontext envMap in each tests with high priority
and merges os environment variables with low priority.
2021-09-30 12:04:31 +02:00
4312298b73 Merge pull request #13348 from serathius/sync
Fix for v3.5 Ensure that cluster members stored in v2store and backend are in sync
2021-09-25 17:33:55 +02:00
e68c7ab4bc server: Ensure that adding and removing members handle storev2 and backend out of sync 2021-09-15 14:36:41 +02:00
d7eeda09f7 Merge pull request #13349 from serathius/tip-3.5
Stop using tip golang version in CI
2021-09-15 08:04:37 -04:00
921f78d56f Stop using tip golang version in CI 2021-09-15 10:29:06 +02:00
2fe94b19d3 Merge pull request #13257 from tangcong/automated-cherry-pick-of-#13145-#13237-origin-release-3.5
[backport 3.5]: Automated cherry pick of #13145 #13237
2021-08-06 09:01:14 -04:00
627d91c89d fix self-signed-cert-validity parameter cannot be specified in the config file 2021-07-30 07:53:43 +08:00
dfd2fea4c5 fix health endpoint not usable when authentication is enabled 2021-07-30 07:53:40 +08:00
beae2e1801 workflows: remove ARM64 job for maintenance
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2021-07-03 13:01:37 -07:00
107 changed files with 1858 additions and 658 deletions

View File

@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "^1.16"
go-version: "1.16.3"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "^1.16"
go-version: "1.16.3"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "^1.16"
go-version: "1.16.3"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -1,19 +0,0 @@
name: Linux ARM64 Graviton2
on: [push, pull_request]
jobs:
test:
runs-on: [self-hosted, linux, ARM64, graviton2]
steps:
- uses: actions/checkout@v2
- name: install dependencies
run: |
sudo amazon-linux-extras install epel
sudo yum install -y git gcc ShellCheck
- uses: actions/setup-go@v2
with:
go-version: "^1.16"
- run: go version
- run: date
- name: Run tests
run: TEST_OPTS="PASSES='fmt bom dep build unit integration_e2e'" make test

View File

@ -1,142 +0,0 @@
## Graviton-based self-hosted github action worker
### Step 1. Create an EC2 instance with Graviton
Create an AWS Graviton-based EC2 instance. For example,
```
# or download from https://github.com/aws/aws-k8s-tester/releases
cd ${HOME}/go/src/github.com/aws/aws-k8s-tester
go install -v ./cmd/ec2-utils
# create arm64 AL2 instance
AWS_K8S_TESTER_EC2_ON_FAILURE_DELETE=true \
AWS_K8S_TESTER_EC2_LOG_COLOR=true \
AWS_K8S_TESTER_EC2_REGION=us-west-2 \
AWS_K8S_TESTER_EC2_S3_BUCKET_CREATE=true \
AWS_K8S_TESTER_EC2_S3_BUCKET_CREATE_KEEP=true \
AWS_K8S_TESTER_EC2_REMOTE_ACCESS_KEY_CREATE=true \
AWS_K8S_TESTER_EC2_ASGS_FETCH_LOGS=false \
AWS_K8S_TESTER_EC2_ASGS='{"GetRef.Name-arm64-al2-cpu":{"name":"GetRef.Name-arm64-al2-cpu","remote-access-user-name":"ec2-user","ami-type":"AL2_arm_64","image-id-ssm-parameter":"/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-arm64-gp2","instance-types":["m6g.xlarge"],"volume-size":40,"asg-min-size":1,"asg-max-size":1,"asg-desired-capacity":1}}' \
AWS_K8S_TESTER_EC2_ROLE_CREATE=true \
AWS_K8S_TESTER_EC2_VPC_CREATE=true \
ec2-utils create instances --enable-prompt=true --auto-path
```
### Step 2. Install github action on the host
SSH into the instance, and install the github action self-hosted runner (see [install scripts](https://github.com/etcd-io/etcd/settings/actions/runners/new?arch=arm64&os=linux)).
### Step 3. Configure github action on the host
SSH into the instance, and configure the github action self-hosted runner.
First, we need disable ICU install (see [actions/runner issue on ARM64](https://github.com/actions/runner/issues/629)):
```
sudo yum install -y patch
```
And write this bash script:
```
#!/bin/bash -e
patch -p1 <<ICU_PATCH
diff -Naur a/bin/Runner.Listener.runtimeconfig.json b/bin/Runner.Listener.runtimeconfig.json
--- a/bin/Runner.Listener.runtimeconfig.json 2020-07-01 02:21:09.000000000 +0000
+++ b/bin/Runner.Listener.runtimeconfig.json 2020-07-28 00:02:38.748868613 +0000
@@ -8,7 +8,8 @@
}
],
"configProperties": {
- "System.Runtime.TieredCompilation.QuickJit": true
+ "System.Runtime.TieredCompilation.QuickJit": true,
+ "System.Globalization.Invariant": true
}
}
-}
\ No newline at end of file
+}
diff -Naur a/bin/Runner.PluginHost.runtimeconfig.json b/bin/Runner.PluginHost.runtimeconfig.json
--- a/bin/Runner.PluginHost.runtimeconfig.json 2020-07-01 02:21:22.000000000 +0000
+++ b/bin/Runner.PluginHost.runtimeconfig.json 2020-07-28 00:02:59.358680003 +0000
@@ -8,7 +8,8 @@
}
],
"configProperties": {
- "System.Runtime.TieredCompilation.QuickJit": true
+ "System.Runtime.TieredCompilation.QuickJit": true,
+ "System.Globalization.Invariant": true
}
}
-}
\ No newline at end of file
+}
diff -Naur a/bin/Runner.Worker.runtimeconfig.json b/bin/Runner.Worker.runtimeconfig.json
--- a/bin/Runner.Worker.runtimeconfig.json 2020-07-01 02:21:16.000000000 +0000
+++ b/bin/Runner.Worker.runtimeconfig.json 2020-07-28 00:02:19.159028531 +0000
@@ -8,7 +8,8 @@
}
],
"configProperties": {
- "System.Runtime.TieredCompilation.QuickJit": true
+ "System.Runtime.TieredCompilation.QuickJit": true,
+ "System.Globalization.Invariant": true
}
}
-}
\ No newline at end of file
+}
ICU_PATCH
```
And now patch the github action runner:
```
cd ${HOME}/actions-runner
bash patch.sh
```
```
patching file bin/Runner.Listener.runtimeconfig.json
patching file bin/Runner.PluginHost.runtimeconfig.json
patching file bin/Runner.Worker.runtimeconfig.json
```
And now configure:
```
sudo yum install -y wget
INSTANCE_ID=$(wget -q -O - http://169.254.169.254/latest/meta-data/instance-id)
echo ${INSTANCE_ID}
# get token from https://github.com/etcd-io/etcd/settings/actions/runners/new?arch=arm64&os=linux
cd ${HOME}/actions-runner
./config.sh \
--work "_work" \
--name ${INSTANCE_ID} \
--labels self-hosted,linux,ARM64,graviton2 \
--url https://github.com/etcd-io/etcd \
--token ...
```
And run:
```
# run this as a process in the terminal
cd ${HOME}/actions-runner
./run.sh
# or run this as a systemd service
cd ${HOME}/actions-runner
sudo ./svc.sh install
sudo ./svc.sh start
sudo ./svc.sh status
```
### Step 4. Create github action configuration
See https://github.com/etcd-io/etcd/pull/12928.

View File

@ -18,7 +18,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "^1.16"
go-version: "1.16.3"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base:buster-v1.4.0
# TODO: move to k8s.gcr.io/build-image/debian-base:bullseye-v1.y.z when patched
FROM debian:bullseye-20210927
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base-arm64:buster-v1.4.0
# TODO: move to k8s.gcr.io/build-image/debian-base-arm64:bullseye-1.y.z when patched
FROM arm64v8/debian:bullseye-20210927
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base-ppc64le:buster-v1.4.0
# TODO: move to k8s.gcr.io/build-image/debian-base-ppc64le:bullseye-1.y.z when patched
FROM ppc64le/debian:bullseye-20210927
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base-s390x:buster-v1.4.0
# TODO: move to k8s.gcr.io/build-image/debian-base-s390x:bullseye-1.y.z when patched
FROM s390x/debian:bullseye-20210927
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -9,6 +9,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
google.golang.org/grpc v1.38.0
gopkg.in/yaml.v2 v2.4.0 // indirect
)
// Bad imports are sometimes causing attempts to pull that code.

View File

@ -143,7 +143,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -65,6 +65,7 @@ var (
ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
ErrGRPCAuthOldRevision = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err()
ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
@ -131,6 +132,7 @@ var (
ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
ErrorDesc(ErrGRPCAuthOldRevision): ErrGRPCAuthOldRevision,
ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
@ -195,6 +197,7 @@ var (
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision)
ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
ErrNoLeader = Error(ErrGRPCNoLeader)

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.5.0"
Version = "3.5.2"
APIVersion = "unknown"
// Git SHA Value will be set during build

View File

@ -5,8 +5,8 @@ go 1.16
require (
github.com/json-iterator/go v1.1.11
github.com/modern-go/reflect2 v1.0.1
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
)
replace (

View File

@ -154,8 +154,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -296,9 +296,7 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
@ -306,6 +304,20 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
return conn, nil
}
func authority(endpoint string) string {
spl := strings.SplitN(endpoint, "://", 2)
if len(spl) < 2 {
if strings.HasPrefix(endpoint, "unix:") {
return endpoint[len("unix:"):]
}
if strings.HasPrefix(endpoint, "unixs:") {
return endpoint[len("unixs:"):]
}
return endpoint
}
return spl[1]
}
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
r := endpoint.RequiresCredentials(ep)
switch r {

View File

@ -6,8 +6,8 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/prometheus/client_golang v1.11.0
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.38.0
sigs.k8s.io/yaml v1.2.0

View File

@ -258,8 +258,9 @@ gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -73,7 +73,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien
// its the callCtx deadline or cancellation, in which case try again.
continue
}
if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
if c.shouldRefreshToken(lastErr, callOpts) {
// clear auth token before refreshing it.
// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
@ -148,6 +148,19 @@ func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamCli
}
}
// shouldRefreshToken checks whether there's a need to refresh the token based on the error and callOptions,
// and returns a boolean value.
func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
if rpctypes.Error(err) == rpctypes.ErrUserEmpty {
// refresh the token when username, password is present but the server returns ErrUserEmpty
// which is possible when the client token is cleared somehow
return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != ""
}
return callOpts.retryAuth &&
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
}
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy.
@ -245,7 +258,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
// its the callCtx deadline or cancellation, in which case try again.
return true, err
}
if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
if s.client.shouldRefreshToken(err, s.callOpts) {
// clear auth token to avoid failure when call getToken
s.client.authTokenBundle.UpdateAuthToken("")

View File

@ -0,0 +1,124 @@
package clientv3
import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3/credentials"
grpccredentials "google.golang.org/grpc/credentials"
"testing"
)
type dummyAuthTokenBundle struct{}
func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials {
return nil
}
func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
return nil
}
func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
return nil, nil
}
func (d dummyAuthTokenBundle) UpdateAuthToken(token string) {
}
func TestClientShouldRefreshToken(t *testing.T) {
type fields struct {
authTokenBundle credentials.Bundle
}
type args struct {
err error
callOpts *options
}
optsWithTrue := &options{
retryAuth: true,
}
optsWithFalse := &options{
retryAuth: false,
}
tests := []struct {
name string
fields fields
args args
want bool
}{
{
name: "ErrUserEmpty and non nil authTokenBundle",
fields: fields{
authTokenBundle: &dummyAuthTokenBundle{},
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: true,
},
{
name: "ErrUserEmpty and nil authTokenBundle",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: false,
},
{
name: "ErrGRPCInvalidAuthToken and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue},
want: true,
},
{
name: "ErrGRPCInvalidAuthToken and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse},
want: false,
},
{
name: "ErrGRPCAuthOldRevision and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue},
want: true,
},
{
name: "ErrGRPCAuthOldRevision and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse},
want: false,
},
{
name: "Other error and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue},
want: false,
},
{
name: "Other error and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
authTokenBundle: tt.fields.authTokenBundle,
}
if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want {
t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -125,6 +125,9 @@ peer-transport-security:
# Peer TLS using generated certificates.
auto-tls: false
# The validity period of the self-signed certificate, the unit is year.
self-signed-cert-validity: 1
# Enable debug-level logging for etcd.
log-level: debug

View File

@ -9,12 +9,12 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/urfave/cli v1.22.4
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v2 v2.305.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/etcdutl/v3 v3.5.0
go.etcd.io/etcd/pkg/v3 v3.5.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.38.0

View File

@ -25,11 +25,11 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/pkg/v3 v3.5.0
go.etcd.io/etcd/raft/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.uber.org/zap v1.17.0
)

20
go.mod
View File

@ -20,16 +20,16 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v2 v2.305.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/etcdctl/v3 v3.5.0
go.etcd.io/etcd/etcdutl/v3 v3.5.0
go.etcd.io/etcd/pkg/v3 v3.5.0
go.etcd.io/etcd/raft/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
go.etcd.io/etcd/tests/v3 v3.5.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdctl/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/tests/v3 v3.5.2
go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.38.0

View File

@ -9,7 +9,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.38.0
)

View File

@ -0,0 +1,69 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpc_testing
import (
"context"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
type GrpcRecorder struct {
mux sync.RWMutex
requests []RequestInfo
}
type RequestInfo struct {
FullMethod string
Authority string
}
func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ri.record(toRequestInfo(ctx, info))
resp, err := handler(ctx, req)
return resp, err
}
}
func (ri *GrpcRecorder) RecordedRequests() []RequestInfo {
ri.mux.RLock()
defer ri.mux.RUnlock()
reqs := make([]RequestInfo, len(ri.requests))
copy(reqs, ri.requests)
return reqs
}
func toRequestInfo(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo {
req := RequestInfo{
FullMethod: info.FullMethod,
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
as := md.Get(":authority")
if len(as) != 0 {
req.Authority = as[0]
}
}
return req
}
func (ri *GrpcRecorder) record(r RequestInfo) {
ri.mux.Lock()
defer ri.mux.Unlock()
ri.requests = append(ri.requests, r)
}

View File

@ -8,7 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.2
)
// Bad imports are sometimes causing attempts to pull that code.

View File

@ -147,10 +147,12 @@ type ServerConfig struct {
ForceNewCluster bool
// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
EnableLeaseCheckpoint bool
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
LeaseCheckpointInterval time.Duration
// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
LeaseCheckpointPersist bool
EnableGRPCGateway bool

View File

@ -207,7 +207,7 @@ type Config struct {
// SelfSignedCertValidity specifies the validity period of the client and peer certificates
// that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS,
// the unit is year, and the default is 1
SelfSignedCertValidity uint
SelfSignedCertValidity uint `json:"self-signed-cert-validity"`
// CipherSuites is a list of supported TLS cipher suites between
// client/server and peers. If empty, Go auto-populates the list.
@ -314,10 +314,15 @@ type Config struct {
// Deprecated in v3.5.
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
// Requires experimental-enable-lease-checkpoint to be enabled.
// Deprecated in v3.6.
// TODO: Delete in v3.7
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
@ -591,7 +596,9 @@ func (cfg *configYAML) configFromFile(path string) error {
copySecurityDetails(&cfg.PeerTLSInfo, &cfg.PeerSecurityJSON)
cfg.ClientAutoTLS = cfg.ClientSecurityJSON.AutoTLS
cfg.PeerAutoTLS = cfg.PeerSecurityJSON.AutoTLS
if cfg.SelfSignedCertValidity == 0 {
cfg.SelfSignedCertValidity = 1
}
return cfg.Validate()
}
@ -676,6 +683,14 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
}
if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
}
if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}
return nil
}

View File

@ -291,6 +291,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
}
}
func TestLeaseCheckpointValidate(t *testing.T) {
tcs := []struct {
name string
configFunc func() Config
expectError bool
}{
{
name: "Default config should pass",
configFunc: func() Config {
return *NewConfig()
},
},
{
name: "Enabling checkpoint leases should pass",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpoint = true
return cfg
},
},
{
name: "Enabling checkpoint leases and persist should pass",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpoint = true
cfg.ExperimentalEnableLeaseCheckpointPersist = true
return cfg
},
},
{
name: "Enabling checkpoint leases persist without checkpointing itself should fail",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpointPersist = true
return cfg
},
expectError: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
cfg := tc.configFunc()
err := cfg.Validate()
if (err != nil) != tc.expectError {
t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError)
}
})
}
}
func TestLogRotation(t *testing.T) {
tests := []struct {
name string

View File

@ -216,6 +216,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
@ -539,7 +540,7 @@ func (e *Etcd) servePeers() (err error) {
for _, p := range e.Peers {
u := p.Listener.Addr().String()
gs := v3rpc.Server(e.Server, peerTLScfg)
gs := v3rpc.Server(e.Server, peerTLScfg, nil)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{

View File

@ -110,7 +110,7 @@ func (sctx *serveCtx) serve(
}()
if sctx.insecure {
gs = v3rpc.Server(s, nil, gopts...)
gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
@ -148,7 +148,7 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
gs = v3rpc.Server(s, tlscfg, gopts...)
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {

View File

@ -280,7 +280,9 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")

View File

@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.uber.org/zap"
)
@ -137,8 +138,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
for _, v := range as {
alarmName := v.Alarm.String()
if _, found := excludedAlarms[alarmName]; found {
lg.Debug("/health excluded alarm", zap.String("alarm", alarmName))
delete(excludedAlarms, alarmName)
lg.Debug("/health excluded alarm", zap.String("alarm", v.String()))
continue
}
@ -156,10 +156,6 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
}
}
if len(excludedAlarms) > 0 {
lg.Warn("fail exclude alarms from health check", zap.String("exclude alarms", fmt.Sprintf("%+v", excludedAlarms)))
}
if uint64(srv.Leader()) == raft.None {
h.Health = "false"
h.Reason = "RAFT NO LEADER"
@ -193,7 +189,7 @@ func checkV3Health(lg *zap.Logger, srv *etcdserver.EtcdServer, excludedAlarms Al
ctx, cancel := context.WithTimeout(context.Background(), srv.Cfg.ReqTimeout())
_, err := srv.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1})
cancel()
if err != nil {
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
h.Health = "false"
h.Reason = fmt.Sprintf("RANGE ERROR:%s", err)
lg.Warn("serving /health false; Range fails", zap.Error(err))

View File

@ -78,6 +78,12 @@ func TestHealthHandler(t *testing.T) {
http.StatusOK,
"true",
},
{
[]*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}},
"/health?exclude=NOSPACE",
http.StatusOK,
"true",
},
{
[]*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
"/health?exclude=NOSPACE",

View File

@ -20,6 +20,7 @@ import (
"crypto/sha1"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"path"
"sort"
@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
@ -254,12 +256,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()
if c.be != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
} else {
if c.v2store != nil {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
} else {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
}
if c.be != nil {
@ -381,11 +383,37 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m)
v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
c.lg.Panic(
"failed to save member to backend",
zap.String("member-id", m.ID.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member already exist.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}
c.members[m.ID] = m
@ -404,11 +432,36 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil {
mustDeleteMemberFromStore(c.lg, c.v2store, id)
v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustDeleteMemberFromBackend(c.be, id)
beErr = unsafeDeleteMemberFromBackend(c.be, id)
if beErr != nil && !errors.Is(beErr, errMemberNotFound) {
c.lg.Panic(
"failed to delete member from backend",
zap.String("member-id", id.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member not found.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}
m, ok := c.members[id]
@ -443,7 +496,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
return
}
@ -476,7 +529,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}
c.lg.Info(
@ -495,7 +548,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}
c.lg.Info(
@ -870,7 +923,7 @@ func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be)
for _, m := range c.members {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
}
if c.v2store != nil {

View File

@ -20,8 +20,12 @@ import (
"path"
"reflect"
"testing"
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/client/pkg/v3/testutil"
@ -29,8 +33,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mock/mockstore"
"go.uber.org/zap"
)
func TestClusterMember(t *testing.T) {
@ -1019,3 +1021,193 @@ func TestIsVersionChangable(t *testing.T) {
})
}
}
func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) {
now := time.Now()
alice := NewMember("", nil, "alice", &now)
tcs := []struct {
name string
storeV2Nil bool
backendNil bool
storeV2Members []*Member
backendMembers []*Member
expectPanics bool
expectMembers map[types.ID]*Member
}{
{
name: "Adding new member should succeed",
},
{
name: "Adding member should succeed if it was only in storeV2",
storeV2Members: []*Member{alice},
},
{
name: "Adding member should succeed if it was only in backend",
backendMembers: []*Member{alice},
},
{
name: "Adding member should fail if it exists in both",
storeV2Members: []*Member{alice},
backendMembers: []*Member{alice},
expectPanics: true,
},
{
name: "Adding member should fail if it exists in storeV2 and backend is nil",
storeV2Members: []*Member{alice},
backendNil: true,
expectPanics: true,
},
{
name: "Adding member should succeed if it exists in backend and storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{alice},
},
{
name: "Adding new member should succeed if backend is nil",
storeV2Members: []*Member{},
backendNil: true,
},
{
name: "Adding new member should fail if storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer be.Close()
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
mustSaveMemberToStore(lg, st, m)
}
cluster := NewCluster(lg)
if !tc.backendNil {
cluster.SetBackend(be)
}
if !tc.storeV2Nil {
cluster.SetStore(st)
}
if tc.expectPanics {
assert.Panics(t, func() {
cluster.AddMember(alice, ApplyBoth)
})
} else {
cluster.AddMember(alice, ApplyBoth)
}
if !tc.storeV2Nil {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, _ := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, beMembers)
}
})
}
}
func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
now := time.Now()
alice := NewMember("", nil, "alice", &now)
tcs := []struct {
name string
storeV2Nil bool
backendNil bool
storeV2Members []*Member
backendMembers []*Member
expectMembers []*Member
expectPanics bool
}{
{
name: "Removing new member should fail",
expectPanics: true,
},
{
name: "Removing member should succeed if it was only in storeV2",
storeV2Members: []*Member{alice},
},
{
name: "Removing member should succeed if it was only in backend",
backendMembers: []*Member{alice},
},
{
name: "Removing member should succeed if it exists in both",
storeV2Members: []*Member{alice},
backendMembers: []*Member{alice},
},
{
name: "Removing new member should fail if backend is nil",
storeV2Members: []*Member{},
backendNil: true,
expectPanics: true,
},
{
name: "Removing new member should succeed if storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{},
},
{
name: "Removing member should succeed if it exists in v2storage and backend is nil",
storeV2Members: []*Member{alice},
backendNil: true,
},
{
name: "Removing member should succeed if it exists in backend and storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{alice},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer be.Close()
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
mustSaveMemberToStore(lg, st, m)
}
cluster := NewCluster(lg)
if !tc.backendNil {
cluster.SetBackend(be)
}
if !tc.storeV2Nil {
cluster.SetStore(st)
}
if tc.expectPanics {
assert.Panics(t, func() {
cluster.RemoveMember(alice.ID, ApplyBoth)
})
} else {
cluster.RemoveMember(alice.ID, ApplyBoth)
}
if !tc.storeV2Nil {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, _ := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{}, beMembers)
}
})
}
}

View File

@ -15,6 +15,7 @@
package membership
import (
"bytes"
"encoding/json"
"fmt"
"path"
@ -39,9 +40,11 @@ const (
var (
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
errMemberAlreadyExist = fmt.Errorf("member already exists")
errMemberNotFound = fmt.Errorf("member not found")
)
func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error {
mkey := backendMemberKey(m.ID)
mvalue, err := json.Marshal(m)
if err != nil {
@ -51,7 +54,11 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
if unsafeMemberExists(tx, mkey) {
return errMemberAlreadyExist
}
tx.UnsafePut(buckets.Members, mkey, mvalue)
return nil
}
// TrimClusterFromBackend removes all information about cluster (versions)
@ -64,14 +71,29 @@ func TrimClusterFromBackend(be backend.Backend) error {
return nil
}
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
mkey := backendMemberKey(id)
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDelete(buckets.Members, mkey)
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
if !unsafeMemberExists(tx, mkey) {
return errMemberNotFound
}
tx.UnsafeDelete(buckets.Members, mkey)
return nil
}
func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool {
var found bool
tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
if bytes.Equal(k, mkey) {
found = true
}
return nil
})
return found
}
func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
@ -182,35 +204,34 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
}
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes)
err := unsafeSaveMemberToStore(lg, s, m)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to save member to store",
zap.String("path", p),
zap.String("member-id", m.ID.String()),
zap.Error(err),
)
}
}
func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
_, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
return err
}
func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error {
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
lg.Panic(
"failed to delete member from store",
zap.String("path", MemberStoreKey(id)),
zap.Error(err),
)
return err
}
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to create removedMember",
zap.String("path", RemovedMemberStoreKey(id)),
zap.Error(err),
)
return err
}
return nil
}
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {

View File

@ -36,19 +36,21 @@ const (
maxSendBytes = math.MaxInt32
)
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
}
chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s),
newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor,
}
if interceptor != nil {
chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor)
}
chainStreamInterceptors := []grpc.StreamServerInterceptor{
newStreamInterceptor(s),

View File

@ -84,6 +84,7 @@ var toGRPCErrorMap = map[error]error{
auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled,
auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt,
auth.ErrAuthOldRevision: rpctypes.ErrGRPCAuthOldRevision,
// In sync with status.FromContextError
context.Canceled: rpctypes.ErrGRPCCanceled,

View File

@ -514,6 +514,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
}
// A snapshot db may have already been recovered, and the old db should have
// already been closed in this case, so we should set the backend again.
ci.SetBackend(be)
s1, s2 := be.Size(), be.SizeInUse()
cfg.Logger.Info(
"recovered v3 backend from snapshot",
@ -592,9 +595,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})

View File

@ -25,12 +25,12 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v2 v2.305.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/pkg/v3 v3.5.0
go.etcd.io/etcd/raft/v3 v3.5.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/otlp v0.20.0

View File

@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {

View File

@ -24,6 +24,7 @@ import (
"sync"
"time"
"github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000
var v3_6 = semver.Version{Major: 3, Minor: 6}
var (
forever = time.Time{}
@ -180,19 +183,29 @@ type lessor struct {
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
// whether lessor should always persist remaining TTL (always enabled in v3.6).
checkpointPersist bool
// cluster is used to adapt lessor logic based on cluster version
cluster cluster
}
type cluster interface {
// Version is the cluster-wide minimum major.minor version.
Version() *semver.Version
}
type LessorConfig struct {
MinLeaseTTL int64
CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration
CheckpointPersist bool
}
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
return newLessor(lg, b, cluster, cfg)
}
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
checkpointPersist: cfg.CheckpointPersist,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
cluster: cluster,
}
l.initAndRecover()
@ -351,6 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
if le.shouldPersistCheckpoints() {
l.persistTo(le.b)
}
if le.isPrimary() {
// schedule the next checkpoint as needed
le.scheduleCheckpointIfNeeded(l)
@ -359,6 +377,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
return nil
}
func (le *lessor) shouldPersistCheckpoints() bool {
cv := le.cluster.Version()
return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
}
func greaterOrEqual(first, second semver.Version) bool {
return !first.LessThan(second)
}
// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
@ -446,6 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}
if len(le.leaseMap) < leaseRevokeRate {
@ -789,9 +817,10 @@ func (le *lessor) initAndRecover() {
ttl: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
// set expiry to forever, refresh when promoted
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
remainingTTL: lpb.RemainingTTL,
}
}
le.leaseExpiredNotifier.Init()

View File

@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
be, _ := betesting.NewDefaultTmpBackend(t)
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock()

View File

@ -26,7 +26,9 @@ import (
"testing"
"time"
"github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
@ -46,7 +48,7 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete {
@ -210,7 +212,7 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@ -243,7 +245,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@ -292,7 +294,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@ -311,7 +313,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
// extend after recovery should extend expiration on lease pile-up
@ -341,7 +343,7 @@ func TestLessorDetach(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -382,7 +384,7 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
@ -391,7 +393,7 @@ func TestLessorRecover(t *testing.T) {
}
// Create a new lessor with the same backend
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer nle.Stop()
nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl {
@ -412,7 +414,7 @@ func TestLessorExpire(t *testing.T) {
testMinTTL := int64(1)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@ -465,7 +467,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
testMinTTL := int64(1)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@ -514,7 +516,7 @@ func TestLessorMaxTTL(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
_, err := le.Grant(1, MaxLeaseTTL+1)
@ -530,7 +532,8 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
defer le.Stop()
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@ -543,13 +546,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
})
defer le.Stop()
le.Promote(0)
_, err := le.Grant(1, 2)
if err != nil {
t.Fatal(err)
}
le.Promote(0)
// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
select {
@ -565,7 +566,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l, err := le.Grant(1, 10)
if err != nil {
@ -579,6 +580,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
}
}
func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
const ttl int64 = 10
const checkpointTTL int64 = 5
tcs := []struct {
name string
cluster cluster
checkpointPersist bool
expectRemainingTTL int64
}{
{
name: "Etcd v3.6 and newer persist remainingTTL on checkpoint",
cluster: clusterV3_6(),
expectRemainingTTL: checkpointTTL,
},
{
name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
cluster: clusterLatest(),
checkpointPersist: true,
expectRemainingTTL: checkpointTTL,
},
{
name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
cluster: clusterNil(),
checkpointPersist: true,
expectRemainingTTL: checkpointTTL,
},
{
name: "Etcd v3.5 and older reset remainingTTL on checkpoint",
cluster: clusterLatest(),
expectRemainingTTL: ttl,
},
{
name: "Etcd with version unknown fallbacks to v3.5 behavior",
cluster: clusterNil(),
expectRemainingTTL: ttl,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
cfg.CheckpointPersist = tc.checkpointPersist
le := newLessor(lg, be, tc.cluster, cfg)
l, err := le.Grant(2, ttl)
if err != nil {
t.Fatal(err)
}
if l.RemainingTTL() != ttl {
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
}
le.Checkpoint(2, checkpointTTL)
if l.RemainingTTL() != checkpointTTL {
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
}
le.Stop()
le2 := newLessor(lg, be, clusterV3_6(), cfg)
l = le2.Lookup(2)
if l.RemainingTTL() != tc.expectRemainingTTL {
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
}
})
}
}
type fakeDeleter struct {
deleted []string
tx backend.BatchTx
@ -606,3 +676,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
bcfg.Path = filepath.Join(tmpPath, "be")
return tmpPath, backend.New(bcfg)
}
func clusterV3_6() cluster {
return fakeCluster{semver.New("3.6.0")}
}
func clusterLatest() cluster {
return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
}
func clusterNil() cluster {
return fakeCluster{}
}
type fakeCluster struct {
version *semver.Version
}
func (c fakeCluster) Version() *semver.Version {
return c.version
}

View File

@ -432,6 +432,8 @@ func (b *backend) Defrag() error {
func (b *backend) defrag() error {
now := time.Now()
isDefragActive.Set(1)
defer isDefragActive.Set(0)
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then

View File

@ -83,6 +83,13 @@ var (
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})
isDefragActive = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "defrag_inflight",
Help: "Whether or not defrag is active on the member. 1 means active, 0 means not.",
})
)
func init() {
@ -92,4 +99,5 @@ func init() {
prometheus.MustRegister(writeSec)
prometheus.MustRegister(defragSec)
prometheus.MustRegister(snapshotTransferSec)
prometheus.MustRegister(isDefragActive)
}

View File

@ -355,8 +355,11 @@ func (s *watchableStore) syncWatchers() int {
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
tx.RUnlock()
evs := kvsToEvents(s.store.lg, wg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
var victims watcherBatch
wb := newWatcherBatch(wg, evs)

View File

@ -115,6 +115,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
return p.etcdProc.WithStopSignal(sig)
}
func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
}
type proxyProc struct {
lg *zap.Logger
execPath string
@ -132,7 +136,7 @@ func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...))
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil)
if err != nil {
return err
}

View File

@ -144,6 +144,7 @@ type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
envVars map[string]string
clusterSize int
@ -318,6 +319,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
lg: lg,
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,

View File

@ -505,7 +505,7 @@ func etcdctlBackup(t testing.TB, clus *etcdProcessCluster, dataDir, backupDir st
cmdArgs = append(cmdArgs, "--with-v3=false")
}
t.Logf("Running: %v", cmdArgs)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return err
}

View File

@ -101,5 +101,5 @@ func alarmTest(cx ctlCtx) {
func ctlV3Alarm(cx ctlCtx, cmd string, as ...string) error {
cmdArgs := append(cx.PrefixArgs(), "alarm", cmd)
return spawnWithExpects(cmdArgs, as...)
return spawnWithExpects(cmdArgs, cx.envMap, as...)
}

View File

@ -93,7 +93,7 @@ func authEnable(cx ctlCtx) error {
func ctlV3AuthEnable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "enable")
return spawnWithExpect(cmdArgs, "Authentication Enabled")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Enabled")
}
func authDisableTest(cx ctlCtx) {
@ -139,12 +139,12 @@ func authDisableTest(cx ctlCtx) {
func ctlV3AuthDisable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "disable")
return spawnWithExpect(cmdArgs, "Authentication Disabled")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Disabled")
}
func authStatusTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), "auth", "status")
if err := spawnWithExpects(cmdArgs, "Authentication Status: false", "AuthRevision:"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: false", "AuthRevision:"); err != nil {
cx.t.Fatal(err)
}
@ -155,15 +155,15 @@ func authStatusTest(cx ctlCtx) {
cx.user, cx.pass = "root", "root"
cmdArgs = append(cx.PrefixArgs(), "auth", "status")
if err := spawnWithExpects(cmdArgs, "Authentication Status: true", "AuthRevision:"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: true", "AuthRevision:"); err != nil {
cx.t.Fatal(err)
}
cmdArgs = append(cx.PrefixArgs(), "auth", "status", "--write-out", "json")
if err := spawnWithExpect(cmdArgs, "enabled"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "enabled"); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(cmdArgs, "authRevision"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "authRevision"); err != nil {
cx.t.Fatal(err)
}
}
@ -381,25 +381,25 @@ func authRoleRevokeDuringOpsTest(cx ctlCtx) {
}
func ctlV3PutFailAuth(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "authentication failed")
return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "authentication failed")
}
func ctlV3PutFailPerm(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "permission denied")
return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "permission denied")
}
func authSetupTestUser(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "test-user", "--interactive=false"}, "User test-user created", []string{"pass"}); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "test-user", "test-role"}, "Role test-role is granted to user test-user", nil); err != nil {
cx.t.Fatal(err)
}
cmd := append(cx.PrefixArgs(), "role", "grant-permission", "test-role", "readwrite", "foo")
if err := spawnWithExpect(cmd, "Role test-role updated"); err != nil {
if err := spawnWithExpectWithEnv(cmd, cx.envMap, "Role test-role updated"); err != nil {
cx.t.Fatal(err)
}
}
@ -611,7 +611,7 @@ func authTestCertCN(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "example.com", "--interactive=false"}, "User example.com created", []string{""}); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role"}, "Role test-role is granted to user example.com", nil); err != nil {
@ -921,13 +921,13 @@ func authTestRoleGet(cx ctlCtx) {
"KV Read:", "foo",
"KV Write:", "foo",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
// test-user can get the information of test-role because it belongs to the role
cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
@ -935,7 +935,7 @@ func authTestRoleGet(cx ctlCtx) {
expected = []string{
"Error: etcdserver: permission denied",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
}
@ -952,13 +952,13 @@ func authTestUserGet(cx ctlCtx) {
"Roles: test-role",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
// test-user can get the information of test-user itself
cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
@ -966,7 +966,7 @@ func authTestUserGet(cx ctlCtx) {
expected = []string{
"Error: etcdserver: permission denied",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
}
@ -977,7 +977,7 @@ func authTestRoleList(cx ctlCtx) {
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "list"), "test-role"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "list"), cx.envMap, "test-role"); err != nil {
cx.t.Fatal(err)
}
}
@ -1088,7 +1088,7 @@ func certCNAndUsername(cx ctlCtx, noPassword bool) {
cx.t.Fatal(err)
}
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), "Role test-role-cn created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), cx.envMap, "Role test-role-cn created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role-cn"}, "Role test-role-cn is granted to user example.com", nil); err != nil {

View File

@ -71,5 +71,5 @@ func ctlV3Compact(cx ctlCtx, rev int64, physical bool) error {
if physical {
cmdArgs = append(cmdArgs, "--physical")
}
return spawnWithExpect(cmdArgs, "compacted revision "+rs)
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "compacted revision "+rs)
}

View File

@ -52,13 +52,13 @@ func ctlV3OnlineDefrag(cx ctlCtx) error {
for i := range lines {
lines[i] = "Finished defragmenting etcd member"
}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}
func ctlV3OfflineDefrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgsUtl(), "defrag", "--data-dir", cx.dataDir)
lines := []string{"finished defragmenting directory"}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}
func defragOfflineTest(cx ctlCtx) {

View File

@ -98,7 +98,7 @@ func testElect(cx ctlCtx) {
// ctlV3Elect creates a elect process with a channel listening for when it wins the election.
func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan string, error) {
cmdArgs := append(cx.PrefixArgs(), "elect", name, proposal)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
outc := make(chan string, 1)
if err != nil {
close(outc)

View File

@ -40,7 +40,7 @@ func ctlV3EndpointHealth(cx ctlCtx) error {
for i := range lines {
lines[i] = "is healthy"
}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}
func endpointStatusTest(cx ctlCtx) {
@ -56,7 +56,7 @@ func ctlV3EndpointStatus(cx ctlCtx) error {
u, _ := url.Parse(ep)
eps = append(eps, u.Host)
}
return spawnWithExpects(cmdArgs, eps...)
return spawnWithExpects(cmdArgs, cx.envMap, eps...)
}
func endpointHashKVTest(cx ctlCtx) {
@ -88,5 +88,5 @@ func ctlV3EndpointHashKV(cx ctlCtx) error {
u, _ := url.Parse(ep)
ss = append(ss, fmt.Sprintf("%s, %d", u.Host, hresp.Hash))
}
return spawnWithExpects(cmdArgs, ss...)
return spawnWithExpects(cmdArgs, cx.envMap, ss...)
}

View File

@ -0,0 +1,213 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !cluster_proxy
// +build !cluster_proxy
package e2e
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/testutil"
)
func TestAuthority(t *testing.T) {
tcs := []struct {
name string
useTLS bool
useInsecureTLS bool
// Pattern used to generate endpoints for client. Fields filled
// %d - will be filled with member grpc port
clientURLPattern string
// Pattern used to validate authority received by server. Fields filled:
// %d - will be filled with first member grpc port
expectAuthorityPattern string
}{
{
name: "http://domain[:port]",
clientURLPattern: "http://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "http://address[:port]",
clientURLPattern: "http://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://domain[:port] insecure",
useTLS: true,
useInsecureTLS: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://address[:port] insecure",
useTLS: true,
useInsecureTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://domain[:port]",
useTLS: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://address[:port]",
useTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
}
for _, tc := range tcs {
for _, clusterSize := range []int{1, 3} {
t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) {
BeforeTest(t)
cfg := newConfigNoTLS()
cfg.clusterSize = clusterSize
if tc.useTLS {
cfg.clientTLS = clientTLS
}
cfg.isClientAutoTLS = tc.useInsecureTLS
// Enable debug mode to get logs with http2 headers (including authority)
cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}
epc, err := newEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
defer epc.Close()
endpoints := templateEndpoints(t, tc.clientURLPattern, epc)
client := clusterEtcdctlV3(cfg, endpoints)
err = client.Put("foo", "bar")
if err != nil {
t.Fatal(err)
}
executeWithTimeout(t, 5*time.Second, func() {
assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc)
})
})
}
}
}
func templateEndpoints(t *testing.T, pattern string, clus *etcdProcessCluster) []string {
t.Helper()
endpoints := []string{}
for i := 0; i < clus.cfg.clusterSize; i++ {
ent := pattern
if strings.Contains(ent, "%d") {
ent = fmt.Sprintf(ent, etcdProcessBasePort+i*5)
}
if strings.Contains(ent, "%") {
t.Fatalf("Failed to template pattern, %% symbol left %q", ent)
}
endpoints = append(endpoints, ent)
}
return endpoints
}
func assertAuthority(t *testing.T, expectAurhority string, clus *etcdProcessCluster) {
logs := []logsExpect{}
for _, proc := range clus.procs {
logs = append(logs, proc.Logs())
}
line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...)
line = strings.TrimSuffix(line, "\n")
line = strings.TrimSuffix(line, "\r")
expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority)
assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine))
}
func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string {
t.Helper()
match := make(chan string, len(logs))
for i := range logs {
go func(l logsExpect) {
line, _ := l.Expect(expectLine)
match <- line
}(logs[i])
}
return <-match
}
func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) {
donec := make(chan struct{})
go func() {
defer close(donec)
f()
}()
select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
}
type etcdctlV3 struct {
cfg *etcdProcessClusterConfig
endpoints []string
}
func clusterEtcdctlV3(cfg *etcdProcessClusterConfig, endpoints []string) *etcdctlV3 {
return &etcdctlV3{
cfg: cfg,
endpoints: endpoints,
}
}
func (ctl *etcdctlV3) Put(key, value string) error {
return ctl.runCmd("put", key, value)
}
func (ctl *etcdctlV3) runCmd(args ...string) error {
cmdArgs := []string{ctlBinPath + "3"}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
cmdArgs = append(cmdArgs, args...)
return spawnWithExpect(cmdArgs, "OK")
}
func (ctl *etcdctlV3) flags() map[string]string {
fmap := make(map[string]string)
if ctl.cfg.clientTLS == clientTLS {
if ctl.cfg.isClientAutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
} else if ctl.cfg.isClientCRL {
fmap["cacert"] = caPath
fmap["cert"] = revokedCertPath
fmap["key"] = revokedPrivateKeyPath
} else {
fmap["cacert"] = caPath
fmap["cert"] = certPath
fmap["key"] = privateKeyPath
}
}
fmap["endpoints"] = strings.Join(ctl.endpoints, ",")
return fmap
}

View File

@ -190,7 +190,7 @@ func getFormatTest(cx ctlCtx) {
cmdArgs = append(cmdArgs, "--print-value-only")
}
cmdArgs = append(cmdArgs, "abc")
if err := spawnWithExpect(cmdArgs, tt.wstr); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, tt.wstr); err != nil {
cx.t.Errorf("#%d: error (%v), wanted %v", i, err, tt.wstr)
}
}
@ -228,24 +228,24 @@ func getKeysOnlyTest(cx ctlCtx) {
cx.t.Fatal(err)
}
cmdArgs := append(cx.PrefixArgs(), []string{"get", "--keys-only", "key"}...)
if err := spawnWithExpect(cmdArgs, "key"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "key"); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpects(cmdArgs, "val"); err == nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "val"); err == nil {
cx.t.Fatalf("got value but passed --keys-only")
}
}
func getCountOnlyTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 0"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 0"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3Put(cx, "key", "val", ""); err != nil {
cx.t.Fatal(err)
}
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 1"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 1"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3Put(cx, "key1", "val", ""); err != nil {
@ -255,21 +255,21 @@ func getCountOnlyTest(cx ctlCtx) {
cx.t.Fatal(err)
}
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 2"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 2"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3Put(cx, "key2", "val", ""); err != nil {
cx.t.Fatal(err)
}
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 3"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 3"); err != nil {
cx.t.Fatal(err)
}
expected := []string{
"\"Count\" : 3",
}
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key3", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, expected...); err == nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, expected...); err == nil {
cx.t.Fatal(err)
}
}
@ -348,7 +348,7 @@ func ctlV3Put(cx ctlCtx, key, value, leaseID string, flags ...string) error {
if len(flags) != 0 {
cmdArgs = append(cmdArgs, flags...)
}
return spawnWithExpect(cmdArgs, "OK")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "OK")
}
type kv struct {
@ -365,7 +365,7 @@ func ctlV3Get(cx ctlCtx, args []string, kvs ...kv) error {
for _, elem := range kvs {
lines = append(lines, elem.key, elem.val)
}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}
// ctlV3GetWithErr runs "get" command expecting no output but error
@ -375,11 +375,11 @@ func ctlV3GetWithErr(cx ctlCtx, args []string, errs []string) error {
if !cx.quorum {
cmdArgs = append(cmdArgs, "--consistency", "s")
}
return spawnWithExpects(cmdArgs, errs...)
return spawnWithExpects(cmdArgs, cx.envMap, errs...)
}
func ctlV3Del(cx ctlCtx, args []string, num int) error {
cmdArgs := append(cx.PrefixArgs(), "del")
cmdArgs = append(cmdArgs, args...)
return spawnWithExpects(cmdArgs, fmt.Sprintf("%d", num))
return spawnWithExpects(cmdArgs, cx.envMap, fmt.Sprintf("%d", num))
}

View File

@ -113,7 +113,7 @@ func leaseTestGrantTimeToLive(cx ctlCtx) {
}
cmdArgs := append(cx.PrefixArgs(), "lease", "timetolive", id, "--keys")
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
cx.t.Fatalf("leaseTestGrantTimeToLive: error (%v)", err)
}
@ -146,7 +146,7 @@ func leaseTestGrantLeasesList(cx ctlCtx) error {
}
cmdArgs := append(cx.PrefixArgs(), "lease", "list")
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
return fmt.Errorf("lease list failed (%v)", err)
}
@ -177,7 +177,7 @@ func leaseTestTimeToLiveExpire(cx ctlCtx, ttl int) error {
time.Sleep(time.Duration(ttl+1) * time.Second)
cmdArgs := append(cx.PrefixArgs(), "lease", "timetolive", leaseID)
exp := fmt.Sprintf("lease %s already expired", leaseID)
if err = spawnWithExpect(cmdArgs, exp); err != nil {
if err = spawnWithExpectWithEnv(cmdArgs, cx.envMap, exp); err != nil {
return fmt.Errorf("lease not properly expired: (%v)", err)
}
if err := ctlV3Get(cx, []string{"key"}); err != nil {
@ -247,7 +247,7 @@ func leaseTestRevoke(cx ctlCtx) error {
func ctlV3LeaseGrant(cx ctlCtx, ttl int) (string, error) {
cmdArgs := append(cx.PrefixArgs(), "lease", "grant", strconv.Itoa(ttl))
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
return "", err
}
@ -271,7 +271,7 @@ func ctlV3LeaseGrant(cx ctlCtx, ttl int) (string, error) {
func ctlV3LeaseKeepAlive(cx ctlCtx, leaseID string) error {
cmdArgs := append(cx.PrefixArgs(), "lease", "keep-alive", leaseID)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return err
}
@ -285,7 +285,7 @@ func ctlV3LeaseKeepAlive(cx ctlCtx, leaseID string) error {
func ctlV3LeaseKeepAliveOnce(cx ctlCtx, leaseID string) error {
cmdArgs := append(cx.PrefixArgs(), "lease", "keep-alive", "--once", leaseID)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return err
}
@ -298,5 +298,5 @@ func ctlV3LeaseKeepAliveOnce(cx ctlCtx, leaseID string) error {
func ctlV3LeaseRevoke(cx ctlCtx, leaseID string) error {
cmdArgs := append(cx.PrefixArgs(), "lease", "revoke", leaseID)
return spawnWithExpect(cmdArgs, fmt.Sprintf("lease %s revoked", leaseID))
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, fmt.Sprintf("lease %s revoked", leaseID))
}

View File

@ -119,7 +119,7 @@ func testLockWithCmd(cx ctlCtx) {
// ctlV3Lock creates a lock process with a channel listening for when it acquires the lock.
func ctlV3Lock(cx ctlCtx, name string) (*expect.ExpectProcess, <-chan string, error) {
cmdArgs := append(cx.PrefixArgs(), "lock", name)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
outc := make(chan string, 1)
if err != nil {
close(outc)
@ -140,5 +140,5 @@ func ctlV3LockWithCmd(cx ctlCtx, execCmd []string, as ...string) error {
// use command as lock name
cmdArgs := append(cx.PrefixArgs(), "lock", execCmd[0])
cmdArgs = append(cmdArgs, execCmd...)
return spawnWithExpects(cmdArgs, as...)
return spawnWithExpects(cmdArgs, cx.envMap, as...)
}

View File

@ -83,7 +83,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx
cmdArgs := append(cx.PrefixArgs(), "make-mirror")
cmdArgs = append(cmdArgs, flags...)
cmdArgs = append(cmdArgs, fmt.Sprintf("localhost:%d", mirrorcfg.basePort))
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
cx.t.Fatal(err)
}

View File

@ -95,13 +95,13 @@ func ctlV3MemberList(cx ctlCtx) error {
for i := range lines {
lines[i] = "started"
}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}
func getMemberList(cx ctlCtx) (etcdserverpb.MemberListResponse, error) {
cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "member", "list")
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
return etcdserverpb.MemberListResponse{}, err
}
@ -130,7 +130,7 @@ func memberListWithHexTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "--hex", "member", "list")
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
cx.t.Fatalf("memberListWithHexTest error (%v)", err)
}
@ -177,7 +177,7 @@ func memberRemoveTest(cx ctlCtx) {
func ctlV3MemberRemove(cx ctlCtx, ep, memberID, clusterID string) error {
cmdArgs := append(cx.prefixArgs([]string{ep}), "member", "remove", memberID)
return spawnWithExpect(cmdArgs, fmt.Sprintf("%s removed from cluster %s", memberID, clusterID))
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, fmt.Sprintf("%s removed from cluster %s", memberID, clusterID))
}
func memberAddTest(cx ctlCtx) {
@ -197,7 +197,7 @@ func ctlV3MemberAdd(cx ctlCtx, peerURL string, isLearner bool) error {
if isLearner {
cmdArgs = append(cmdArgs, "--learner")
}
return spawnWithExpect(cmdArgs, " added to cluster ")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, " added to cluster ")
}
func memberUpdateTest(cx ctlCtx) {
@ -215,5 +215,5 @@ func memberUpdateTest(cx ctlCtx) {
func ctlV3MemberUpdate(cx ctlCtx, memberID, peerURL string) error {
cmdArgs := append(cx.PrefixArgs(), "member", "update", memberID, fmt.Sprintf("--peer-urls=%s", peerURL))
return spawnWithExpect(cmdArgs, " updated in cluster ")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, " updated in cluster ")
}

View File

@ -97,21 +97,22 @@ func testCtlV3MoveLeader(t *testing.T, cfg etcdProcessClusterConfig) {
}
tests := []struct {
prefixes []string
expect string
eps []string
expect string
}{
{ // request to non-leader
cx.prefixArgs([]string{cx.epc.EndpointsV3()[(leadIdx+1)%3]}),
[]string{cx.epc.EndpointsV3()[(leadIdx+1)%3]},
"no leader endpoint given at ",
},
{ // request to leader
cx.prefixArgs([]string{cx.epc.EndpointsV3()[leadIdx]}),
[]string{cx.epc.EndpointsV3()[leadIdx]},
fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)),
},
}
for i, tc := range tests {
cmdArgs := append(tc.prefixes, "move-leader", types.ID(transferee).String())
if err := spawnWithExpect(cmdArgs, tc.expect); err != nil {
prefix := cx.prefixArgs(tc.eps)
cmdArgs := append(prefix, "move-leader", types.ID(transferee).String())
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, tc.expect); err != nil {
t.Fatalf("#%d: %v", i, err)
}
}

View File

@ -96,7 +96,7 @@ func ctlV3Role(cx ctlCtx, args []string, expStr string) error {
cmdArgs := append(cx.PrefixArgs(), "role")
cmdArgs = append(cmdArgs, args...)
return spawnWithExpect(cmdArgs, expStr)
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, expStr)
}
func ctlV3RoleGrantPermission(cx ctlCtx, rolename string, perm grantingPerm) error {
@ -110,7 +110,7 @@ func ctlV3RoleGrantPermission(cx ctlCtx, rolename string, perm grantingPerm) err
cmdArgs = append(cmdArgs, rolename)
cmdArgs = append(cmdArgs, grantingPermToArgs(perm)...)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
return err
}
@ -136,7 +136,7 @@ func ctlV3RoleRevokePermission(cx ctlCtx, rolename string, key, rangeEnd string,
expStr = fmt.Sprintf("Permission of key %s is revoked from role %s", key, rolename)
}
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
return err
}

View File

@ -84,10 +84,11 @@ func snapshotCorruptTest(cx ctlCtx) {
datadir := cx.t.TempDir()
serr := spawnWithExpect(
serr := spawnWithExpectWithEnv(
append(cx.PrefixArgsUtl(), "snapshot", "restore",
"--data-dir", datadir,
fpath),
cx.envMap,
"expected sha256")
if serr != nil {
@ -117,10 +118,11 @@ func snapshotStatusBeforeRestoreTest(cx ctlCtx) {
dataDir := cx.t.TempDir()
defer os.RemoveAll(dataDir)
serr := spawnWithExpect(
serr := spawnWithExpectWithEnv(
append(cx.PrefixArgsUtl(), "snapshot", "restore",
"--data-dir", dataDir,
fpath),
cx.envMap,
"added member")
if serr != nil {
cx.t.Fatal(serr)
@ -129,13 +131,13 @@ func snapshotStatusBeforeRestoreTest(cx ctlCtx) {
func ctlV3SnapshotSave(cx ctlCtx, fpath string) error {
cmdArgs := append(cx.PrefixArgs(), "snapshot", "save", fpath)
return spawnWithExpect(cmdArgs, fmt.Sprintf("Snapshot saved at %s", fpath))
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, fmt.Sprintf("Snapshot saved at %s", fpath))
}
func getSnapshotStatus(cx ctlCtx, fpath string) (snapshot.Status, error) {
cmdArgs := append(cx.PrefixArgsUtl(), "--write-out", "json", "snapshot", "status", fpath)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return snapshot.Status{}, err
}
@ -202,7 +204,10 @@ func testIssue6361(t *testing.T, etcdutl bool) {
fpath := filepath.Join(t.TempDir(), "test.snapshot")
t.Log("etcdctl saving snapshot...")
if err = spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)); err != nil {
if err = spawnWithExpects(append(prefixArgs, "snapshot", "save", fpath),
nil,
fmt.Sprintf("Snapshot saved at %s", fpath),
); err != nil {
t.Fatal(err)
}
@ -262,7 +267,7 @@ func testIssue6361(t *testing.T, etcdutl bool) {
nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2,
"--listen-client-urls", clientURL, "--advertise-client-urls", clientURL,
"--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL,
"--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2})
"--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2}, nil)
if err != nil {
t.Fatal(err)
}

View File

@ -104,7 +104,7 @@ func clusterVersionTest(cx ctlCtx, expected string) {
func ctlV3Version(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "version")
return spawnWithExpect(cmdArgs, version.Version)
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, version.Version)
}
// TestCtlV3DialWithHTTPScheme ensures that client handles endpoints with HTTPS scheme.
@ -114,7 +114,7 @@ func TestCtlV3DialWithHTTPScheme(t *testing.T) {
func dialWithSchemeTest(cx ctlCtx) {
cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar")
if err := spawnWithExpect(cmdArgs, "OK"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "OK"); err != nil {
cx.t.Fatal(err)
}
}
@ -129,7 +129,7 @@ type ctlCtx struct {
epc *etcdProcessCluster
envMap map[string]struct{}
envMap map[string]string
dialTimeout time.Duration
@ -201,7 +201,7 @@ func withApiPrefix(p string) ctlOption {
}
func withFlagByEnv() ctlOption {
return func(cx *ctlCtx) { cx.envMap = make(map[string]struct{}) }
return func(cx *ctlCtx) { cx.envMap = make(map[string]string) }
}
func withEtcdutl() ctlOption {
@ -248,6 +248,7 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
for k := range ret.envMap {
os.Unsetenv(k)
}
ret.envMap = make(map[string]string)
}
if ret.epc != nil {
if errC := ret.epc.Close(); errC != nil {
@ -311,8 +312,7 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string {
for k, v := range fmap {
if useEnv {
ek := flags.FlagToEnv("ETCDCTL", k)
os.Setenv(ek, v)
cx.envMap[ek] = struct{}{}
cx.envMap[ek] = v
} else {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}

View File

@ -102,7 +102,7 @@ func ctlV3Txn(cx ctlCtx, rqs txnRequests) error {
if cx.interactive {
cmdArgs = append(cmdArgs, "--interactive")
}
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
return err
}

View File

@ -179,7 +179,7 @@ func ctlV3User(cx ctlCtx, args []string, expStr string, stdIn []string) error {
cmdArgs := append(cx.PrefixArgs(), "user")
cmdArgs = append(cmdArgs, args...)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil {
return err
}

View File

@ -35,7 +35,7 @@ func setupWatchArgs(cx ctlCtx, args []string) []string {
func ctlV3Watch(cx ctlCtx, args []string, kvs ...kvExec) error {
cmdArgs := setupWatchArgs(cx, args)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return err
}
@ -66,7 +66,7 @@ func ctlV3Watch(cx ctlCtx, args []string, kvs ...kvExec) error {
func ctlV3WatchFailPerm(cx ctlCtx, args []string) error {
cmdArgs := setupWatchArgs(cx, args)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return err
}

View File

@ -29,7 +29,7 @@ const exampleConfigFile = "../../etcd.conf.yml.sample"
func TestEtcdExampleConfig(t *testing.T) {
skipInShortMode(t)
proc, err := spawnCmd([]string{binDir + "/etcd", "--config-file", exampleConfigFile})
proc, err := spawnCmd([]string{binDir + "/etcd", "--config-file", exampleConfigFile}, nil)
if err != nil {
t.Fatal(err)
}
@ -75,7 +75,7 @@ func TestEtcdMultiPeer(t *testing.T) {
"--initial-advertise-peer-urls", fmt.Sprintf("http://127.0.0.1:%d", etcdProcessBasePort+i),
"--initial-cluster", ic,
}
p, err := spawnCmd(args)
p, err := spawnCmd(args, nil)
if err != nil {
t.Fatal(err)
}
@ -106,7 +106,7 @@ func TestEtcdUnixPeers(t *testing.T) {
"--listen-peer-urls", "unix://etcd.unix:1",
"--initial-advertise-peer-urls", "unix://etcd.unix:1",
"--initial-cluster", "e1=unix://etcd.unix:1",
},
}, nil,
)
defer os.Remove("etcd.unix:1")
if err != nil {
@ -183,7 +183,7 @@ func TestEtcdPeerCNAuth(t *testing.T) {
commonArgs = append(commonArgs, args...)
p, err := spawnCmd(commonArgs)
p, err := spawnCmd(commonArgs, nil)
if err != nil {
t.Fatal(err)
}
@ -262,7 +262,7 @@ func TestEtcdPeerNameAuth(t *testing.T) {
commonArgs = append(commonArgs, args...)
p, err := spawnCmd(commonArgs)
p, err := spawnCmd(commonArgs, nil)
if err != nil {
t.Fatal(err)
}
@ -308,7 +308,7 @@ func TestGrpcproxyAndCommonName(t *testing.T) {
t.Errorf("Unexpected error: %s", err)
}
p, err := spawnCmd(argsWithEmptyCN)
p, err := spawnCmd(argsWithEmptyCN, nil)
defer func() {
if p != nil {
p.Stop()
@ -323,7 +323,7 @@ func TestGrpcproxyAndCommonName(t *testing.T) {
func TestBootstrapDefragFlag(t *testing.T) {
skipInShortMode(t)
proc, err := spawnCmd([]string{binDir + "/etcd", "--experimental-bootstrap-defrag-threshold-megabytes", "1000"})
proc, err := spawnCmd([]string{binDir + "/etcd", "--experimental-bootstrap-defrag-threshold-megabytes", "1000"}, nil)
if err != nil {
t.Fatal(err)
}

View File

@ -87,7 +87,7 @@ func corruptTest(cx ctlCtx) {
cx.t.Log("restarting etcd[0]")
ep := cx.epc.procs[0]
proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...))
proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...), cx.envMap)
if err != nil {
cx.t.Fatal(err)
}

View File

@ -43,6 +43,11 @@ type etcdProcess interface {
Close() error
WithStopSignal(sig os.Signal) os.Signal
Config() *etcdServerProcessConfig
Logs() logsExpect
}
type logsExpect interface {
Expect(string) (string, error)
}
type etcdServerProcess struct {
@ -56,6 +61,7 @@ type etcdServerProcessConfig struct {
execPath string
args []string
tlsArgs []string
envVars map[string]string
dataDirPath string
keepDataDir bool
@ -92,7 +98,7 @@ func (ep *etcdServerProcess) Start() error {
panic("already started")
}
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.name))
proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...))
proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars)
if err != nil {
return err
}
@ -163,3 +169,10 @@ func (ep *etcdServerProcess) waitReady() error {
}
func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg }
func (ep *etcdServerProcess) Logs() logsExpect {
if ep.proc == nil {
ep.cfg.lg.Panic("Please grap logs before process is stopped")
}
return ep.proc
}

View File

@ -18,6 +18,7 @@
package e2e
import (
"fmt"
"os"
"strings"
@ -27,20 +28,41 @@ import (
const noOutputLineCount = 0 // regular binaries emit no extra lines
func spawnCmd(args []string) (*expect.ExpectProcess, error) {
return spawnCmdWithLogger(zap.NewNop(), args)
func spawnCmd(args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
return spawnCmdWithLogger(zap.NewNop(), args, envVars)
}
func spawnCmdWithLogger(lg *zap.Logger, args []string) (*expect.ExpectProcess, error) {
func spawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
wd, err := os.Getwd()
if err != nil {
return nil, err
}
env := mergeEnvVariables(envVars)
if strings.HasSuffix(args[0], "/etcdctl3") {
env := append(os.Environ(), "ETCDCTL_API=3")
lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd))
env = append(env, "ETCDCTL_API=3")
lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(ctlBinPath, args[1:], env)
}
lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd))
return expect.NewExpect(args[0], args[1:]...)
lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(args[0], args[1:], env)
}
func mergeEnvVariables(envVars map[string]string) []string {
var env []string
// Environment variables are passed as parameter have higher priority
// than os environment variables.
for k, v := range envVars {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
// Now, we can set os environment variables not passed as parameter.
currVars := os.Environ()
for _, v := range currVars {
p := strings.Split(v, "=")
if _, ok := envVars[p[0]]; !ok {
env = append(env, fmt.Sprintf("%s=%s", p[0], p[1]))
}
}
return env
}

View File

@ -40,16 +40,20 @@ func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error
}
func spawnWithExpect(args []string, expected string) error {
return spawnWithExpects(args, []string{expected}...)
return spawnWithExpects(args, nil, []string{expected}...)
}
func spawnWithExpects(args []string, xs ...string) error {
_, err := spawnWithExpectLines(args, xs...)
func spawnWithExpectWithEnv(args []string, envVars map[string]string, expected string) error {
return spawnWithExpects(args, envVars, []string{expected}...)
}
func spawnWithExpects(args []string, envVars map[string]string, xs ...string) error {
_, err := spawnWithExpectLines(args, envVars, xs...)
return err
}
func spawnWithExpectLines(args []string, xs ...string) ([]string, error) {
proc, err := spawnCmd(args)
func spawnWithExpectLines(args []string, envVars map[string]string, xs ...string) ([]string, error) {
proc, err := spawnCmd(args, envVars)
if err != nil {
return nil, err
}

View File

@ -63,7 +63,7 @@ func assertVerifyCanStartV2deprecationNotYet(t testing.TB, dataDirPath string) {
func assertVerifyCannotStartV2deprecationWriteOnly(t testing.TB, dataDirPath string) {
t.Log("Verify its infeasible to start etcd with --v2-deprecation=write-only mode")
proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--data-dir=" + dataDirPath})
proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--data-dir=" + dataDirPath}, nil)
assert.NoError(t, err)
_, err = proc.Expect("detected disallowed custom content in v2store for stage --v2-deprecation=write-only")
@ -90,7 +90,7 @@ func TestV2Deprecation(t *testing.T) {
func TestV2DeprecationWriteOnlyNoV2Api(t *testing.T) {
BeforeTest(t)
proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--enable-v2"})
proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--enable-v2"}, nil)
assert.NoError(t, err)
_, err = proc.Expect("--enable-v2 and --v2-deprecation=write-only are mutually exclusive")

View File

@ -244,7 +244,7 @@ func testV3CurlAuth(cx ctlCtx) {
)
cmdArgs = cURLPrefixArgs(cx.epc, "POST", cURLReq{endpoint: path.Join(p, "/auth/authenticate"), value: string(authreq)})
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
testutil.AssertNil(cx.t, err)
defer proc.Close()
@ -286,7 +286,7 @@ func testV3CurlCampaign(cx ctlCtx) {
endpoint: path.Join(cx.apiPrefix, "/election/campaign"),
value: string(cdata),
})
lines, err := spawnWithExpectLines(cargs, `"leader":{"name":"`)
lines, err := spawnWithExpectLines(cargs, cx.envMap, `"leader":{"name":"`)
if err != nil {
cx.t.Fatalf("failed post campaign request (%s) (%v)", cx.apiPrefix, err)
}

View File

@ -28,14 +28,14 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v2 v2.305.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/etcdutl/v3 v3.5.0
go.etcd.io/etcd/pkg/v3 v3.5.0
go.etcd.io/etcd/raft/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

View File

@ -15,22 +15,22 @@
package integration
import (
"fmt"
"io"
"io/ioutil"
"net"
"sync"
"go.etcd.io/etcd/client/pkg/v3/transport"
)
// bridge creates a unix socket bridge to another unix socket, making it possible
type Dialer interface {
Dial() (net.Conn, error)
}
// bridge proxies connections between listener and dialer, making it possible
// to disconnect grpc network connections without closing the logical grpc connection.
type bridge struct {
inaddr string
outaddr string
l net.Listener
conns map[*bridgeConn]struct{}
dialer Dialer
l net.Listener
conns map[*bridgeConn]struct{}
stopc chan struct{}
pausec chan struct{}
@ -40,30 +40,22 @@ type bridge struct {
mu sync.Mutex
}
func newBridge(addr string) (*bridge, error) {
func newBridge(dialer Dialer, listener net.Listener) (*bridge, error) {
b := &bridge{
// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
inaddr: addr + "0",
outaddr: addr,
dialer: dialer,
l: listener,
conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}),
pausec: make(chan struct{}),
blackholec: make(chan struct{}),
}
close(b.pausec)
l, err := transport.NewUnixListener(b.inaddr)
if err != nil {
return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
}
b.l = l
b.wg.Add(1)
go b.serveListen()
return b, nil
}
func (b *bridge) URL() string { return "unix://" + b.inaddr }
func (b *bridge) Close() {
b.l.Close()
b.mu.Lock()
@ -76,7 +68,7 @@ func (b *bridge) Close() {
b.wg.Wait()
}
func (b *bridge) Reset() {
func (b *bridge) DropConnections() {
b.mu.Lock()
defer b.mu.Unlock()
for bc := range b.conns {
@ -85,13 +77,13 @@ func (b *bridge) Reset() {
b.conns = make(map[*bridgeConn]struct{})
}
func (b *bridge) Pause() {
func (b *bridge) PauseConnections() {
b.mu.Lock()
b.pausec = make(chan struct{})
b.mu.Unlock()
}
func (b *bridge) Unpause() {
func (b *bridge) UnpauseConnections() {
b.mu.Lock()
select {
case <-b.pausec:
@ -127,7 +119,7 @@ func (b *bridge) serveListen() {
case <-pausec:
}
outc, oerr := net.Dial("unix", b.outaddr)
outc, oerr := b.dialer.Dial()
if oerr != nil {
inc.Close()
return

View File

@ -38,10 +38,11 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 2,
GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings
UseBridge: true,
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()}
ccfg := clientv3.Config{
Endpoints: []string{eps[0]},
@ -76,7 +77,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
// give enough time for balancer resolution
time.Sleep(5 * time.Second)
clus.Members[0].Blackhole()
clus.Members[0].Bridge().Blackhole()
if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
@ -87,12 +88,12 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
t.Error("took too long to receive watch events")
}
clus.Members[0].Unblackhole()
clus.Members[0].Bridge().Unblackhole()
// waiting for moving eps[0] out of unhealthy, so that it can be re-pined.
time.Sleep(ccfg.DialTimeout)
clus.Members[1].Blackhole()
clus.Members[1].Bridge().Blackhole()
// make sure client[0] can connect to eps[0] after remove the blackhole.
if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil {
@ -170,10 +171,11 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 2,
SkipCreatingClient: true,
UseBridge: true,
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()}
ccfg := clientv3.Config{
Endpoints: []string{eps[0]},
@ -194,7 +196,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
cli.SetEndpoints(eps...)
// blackhole eps[0]
clus.Members[0].Blackhole()
clus.Members[0].Bridge().Blackhole()
// With round robin balancer, client will make a request to a healthy endpoint
// within a few requests.

View File

@ -57,7 +57,7 @@ func TestDialTLSExpired(t *testing.T) {
}
// expect remote errors "tls: bad certificate"
_, err = integration.NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()},
Endpoints: []string{clus.Members[0].GRPCURL()},
DialTimeout: 3 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
TLS: tls,
@ -75,7 +75,7 @@ func TestDialTLSNoConfig(t *testing.T) {
defer clus.Terminate(t)
// expect "signed by unknown authority"
c, err := integration.NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()},
Endpoints: []string{clus.Members[0].GRPCURL()},
DialTimeout: time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
@ -108,7 +108,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
// get endpoint list
eps := make([]string, 3)
for i := range eps {
eps[i] = clus.Members[i].GRPCAddr()
eps[i] = clus.Members[i].GRPCURL()
}
toKill := rand.Intn(len(eps))
@ -149,7 +149,7 @@ func TestSwitchSetEndpoints(t *testing.T) {
defer clus.Terminate(t)
// get non partitioned members endpoints
eps := []string{clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
eps := []string{clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
cli := clus.Client(0)
clus.Members[0].InjectPartition(t, clus.Members[1:]...)
@ -170,7 +170,7 @@ func TestRejectOldCluster(t *testing.T) {
defer clus.Terminate(t)
cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()},
Endpoints: []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()},
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
RejectOldCluster: true,
@ -212,7 +212,7 @@ func TestSetEndpointAndPut(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)
clus.Client(1).SetEndpoints(clus.Members[0].GRPCAddr())
clus.Client(1).SetEndpoints(clus.Members[0].GRPCURL())
_, err := clus.Client(1).Put(context.TODO(), "foo", "bar")
if err != nil && !strings.Contains(err.Error(), "closing") {
t.Fatal(err)

View File

@ -112,7 +112,7 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
// expect pin eps[0]
ccfg := clientv3.Config{
@ -168,7 +168,7 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
SkipCreatingClient: true,
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t)
@ -224,7 +224,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
target := clus.WaitLeader(t)
if !isolateLeader {
@ -285,7 +285,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
defer clus.Terminate(t)
leaderIndex := clus.WaitLeader(t)
// get a follower endpoint
eps := []string{clus.Members[(leaderIndex+1)%3].GRPCAddr()}
eps := []string{clus.Members[(leaderIndex+1)%3].GRPCURL()}
ccfg := clientv3.Config{
Endpoints: eps,
DialTimeout: 10 * time.Second,
@ -303,7 +303,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
// add other endpoints for later endpoint switch
cli.SetEndpoints(eps...)
time.Sleep(time.Second * 2)
conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCAddr())
conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCURL())
if err != nil {
t.Fatal(err)
}

View File

@ -35,10 +35,11 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 3,
SkipCreatingClient: true,
UseBridge: true,
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t)
@ -150,7 +151,7 @@ func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Clie
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
// pin eps[0]
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}})
@ -208,7 +209,7 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
})
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
// pin eps[0]
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}})
@ -278,6 +279,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
cfg := &integration.ClusterConfig{
Size: 2,
SkipCreatingClient: true,
UseBridge: true,
}
if linearizable {
cfg.Size = 3
@ -285,9 +287,9 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()}
if linearizable {
eps = append(eps, clus.Members[2].GRPCAddr())
eps = append(eps, clus.Members[2].GRPCURL())
}
lead := clus.WaitLeader(t)

View File

@ -712,7 +712,7 @@ func TestKVGetRetry(t *testing.T) {
integration.BeforeTest(t)
clusterSize := 3
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize, UseBridge: true})
defer clus.Terminate(t)
// because killing leader and following election
@ -765,7 +765,7 @@ func TestKVGetRetry(t *testing.T) {
func TestKVPutFailGetRetry(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
kv := clus.Client(0)
@ -876,7 +876,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil {
@ -884,12 +884,12 @@ func TestKVPutAtMostOnce(t *testing.T) {
}
for i := 0; i < 10; i++ {
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
donec := make(chan struct{})
go func() {
defer close(donec)
for i := 0; i < 10; i++ {
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
time.Sleep(5 * time.Millisecond)
}
}()
@ -1027,7 +1027,7 @@ func TestKVForLearner(t *testing.T) {
// 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members
// 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config,
// because the implementation of integration test has diverged from embed/etcd.go.
learnerEp := clus.Members[3].GRPCAddr()
learnerEp := clus.Members[3].GRPCURL()
cfg := clientv3.Config{
Endpoints: []string{learnerEp},
DialTimeout: 5 * time.Second,
@ -1100,7 +1100,7 @@ func TestBalancerSupportLearner(t *testing.T) {
}
// clus.Members[3] is the newly added learner member, which was appended to clus.Members
learnerEp := clus.Members[3].GRPCAddr()
learnerEp := clus.Members[3].GRPCURL()
cfg := clientv3.Config{
Endpoints: []string{learnerEp},
DialTimeout: 5 * time.Second,
@ -1119,7 +1119,7 @@ func TestBalancerSupportLearner(t *testing.T) {
t.Fatalf("expect Get request to learner to fail, got no error")
}
eps := []string{learnerEp, clus.Members[0].GRPCAddr()}
eps := []string{learnerEp, clus.Members[0].GRPCURL()}
cli.SetEndpoints(eps...)
if _, err := cli.Get(context.Background(), "foo"); err != nil {
t.Errorf("expect no error (balancer should retry when request to learner fails), got error: %v", err)

View File

@ -190,7 +190,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
// TODO: change this line to get a cluster client
@ -416,7 +416,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@ -462,7 +462,7 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
func TestLeaseKeepAliveInitTimeout(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@ -495,7 +495,7 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) {
func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@ -530,7 +530,7 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
func TestLeaseTimeToLive(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
c := clus.RandClient()
@ -656,7 +656,7 @@ func TestLeaseLeases(t *testing.T) {
func TestLeaseRenewLostQuorum(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@ -728,7 +728,7 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
// transient cluster failure.
func TestV3LeaseFailureOverlap(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)
numReqs := 5
@ -782,7 +782,7 @@ func TestV3LeaseFailureOverlap(t *testing.T) {
func TestLeaseWithRequireLeader(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)
c := clus.Client(0)

View File

@ -194,7 +194,7 @@ func TestLeasingPutInvalidateExisting(t *testing.T) {
// TestLeasingGetNoLeaseTTL checks a key with a TTL is not leased.
func TestLeasingGetNoLeaseTTL(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -223,7 +223,7 @@ func TestLeasingGetNoLeaseTTL(t *testing.T) {
// when the etcd cluster is partitioned.
func TestLeasingGetSerializable(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -325,7 +325,7 @@ func TestLeasingRevGet(t *testing.T) {
// TestLeasingGetWithOpts checks options that can be served through the cache do not depend on the server.
func TestLeasingGetWithOpts(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -417,7 +417,7 @@ func TestLeasingConcurrentPut(t *testing.T) {
func TestLeasingDisconnectedGet(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -549,7 +549,7 @@ func TestLeasingOverwriteResponse(t *testing.T) {
func TestLeasingOwnerPutResponse(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -616,7 +616,7 @@ func TestLeasingTxnOwnerGetRange(t *testing.T) {
func TestLeasingTxnOwnerGet(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
client := clus.Client(0)
@ -772,7 +772,7 @@ func TestLeasingTxnOwnerDelete(t *testing.T) {
func TestLeasingTxnOwnerIf(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -866,7 +866,7 @@ func TestLeasingTxnOwnerIf(t *testing.T) {
func TestLeasingTxnCancel(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1084,7 +1084,7 @@ func TestLeasingTxnRandIfThenOrElse(t *testing.T) {
func TestLeasingOwnerPutError(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1105,7 +1105,7 @@ func TestLeasingOwnerPutError(t *testing.T) {
func TestLeasingOwnerDeleteError(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1126,7 +1126,7 @@ func TestLeasingOwnerDeleteError(t *testing.T) {
func TestLeasingNonOwnerPutError(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1200,7 +1200,7 @@ func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) {
func TestLeasingDeleteRangeBounds(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/")
@ -1375,7 +1375,7 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) {
// disconnected when trying to submit revoke txn.
func TestLeasingReconnectOwnerRevoke(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
@ -1436,7 +1436,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
// disconnected and the watch is compacted.
func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
@ -1489,7 +1489,7 @@ func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
// not cause inconsistency between the server and the client.
func TestLeasingReconnectOwnerConsistency(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1509,11 +1509,11 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
for i := 0; i < 10; i++ {
v := fmt.Sprintf("%d", i)
donec := make(chan struct{})
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
go func() {
defer close(donec)
for i := 0; i < 20; i++ {
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond)
}
}()
@ -1649,7 +1649,7 @@ func TestLeasingTxnAtomicCache(t *testing.T) {
// TestLeasingReconnectTxn checks that Txn is resilient to disconnects.
func TestLeasingReconnectTxn(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1663,9 +1663,9 @@ func TestLeasingReconnectTxn(t *testing.T) {
donec := make(chan struct{})
go func() {
defer close(donec)
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
for i := 0; i < 10; i++ {
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond)
}
time.Sleep(10 * time.Millisecond)
@ -1685,7 +1685,7 @@ func TestLeasingReconnectTxn(t *testing.T) {
// not cause inconsistency between the server and the client.
func TestLeasingReconnectNonOwnerGet(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1703,11 +1703,11 @@ func TestLeasingReconnectNonOwnerGet(t *testing.T) {
n := 0
for i := 0; i < 10; i++ {
donec := make(chan struct{})
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
go func() {
defer close(donec)
for j := 0; j < 10; j++ {
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond)
}
}()
@ -1813,7 +1813,7 @@ func TestLeasingDo(t *testing.T) {
func TestLeasingTxnOwnerPutBranch(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1907,7 +1907,7 @@ func randCmps(pfx string, dat []*clientv3.PutResponse) (cmps []clientv3.Cmp, the
func TestLeasingSessionExpire(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))
@ -1983,7 +1983,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) {
for i := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))

View File

@ -55,7 +55,7 @@ func TestMaintenanceHashKV(t *testing.T) {
if _, err := cli.Get(context.TODO(), "foo"); err != nil {
t.Fatal(err)
}
hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCAddr(), 0)
hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCURL(), 0)
if err != nil {
t.Fatal(err)
}
@ -142,7 +142,7 @@ func TestMaintenanceSnapshotError(t *testing.T) {
func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
// take about 1-second to read snapshot
@ -206,7 +206,7 @@ func TestMaintenanceStatus(t *testing.T) {
eps := make([]string, 3)
for i := 0; i < 3; i++ {
eps[i] = clus.Members[i].GRPCAddr()
eps[i] = clus.Members[i].GRPCURL()
}
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}})

View File

@ -75,7 +75,7 @@ func TestV3ClientMetrics(t *testing.T) {
defer clus.Terminate(t)
cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()},
Endpoints: []string{clus.Members[0].GRPCURL()},
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),

View File

@ -30,14 +30,14 @@ func TestDetectKvOrderViolation(t *testing.T) {
var errOrderViolation = errors.New("DetectedOrderViolation")
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
clus.Members[0].GRPCURL(),
clus.Members[1].GRPCURL(),
clus.Members[2].GRPCURL(),
},
}
cli, err := integration.NewClient(t, cfg)
@ -82,7 +82,7 @@ func TestDetectKvOrderViolation(t *testing.T) {
clus.Members[1].Stop(t)
assert.NoError(t, clus.Members[2].Restart(t))
// force OrderingKv to query the third member
cli.SetEndpoints(clus.Members[2].GRPCAddr())
cli.SetEndpoints(clus.Members[2].GRPCURL())
time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
t.Logf("Quering m2 after restart")
@ -97,14 +97,14 @@ func TestDetectTxnOrderViolation(t *testing.T) {
var errOrderViolation = errors.New("DetectedOrderViolation")
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
clus.Members[0].GRPCURL(),
clus.Members[1].GRPCURL(),
clus.Members[2].GRPCURL(),
},
}
cli, err := integration.NewClient(t, cfg)
@ -151,7 +151,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
clus.Members[1].Stop(t)
assert.NoError(t, clus.Members[2].Restart(t))
// force OrderingKv to query the third member
cli.SetEndpoints(clus.Members[2].GRPCAddr())
cli.SetEndpoints(clus.Members[2].GRPCURL())
time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != errOrderViolation {

View File

@ -29,11 +29,11 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
eps := []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
clus.Members[0].GRPCURL(),
clus.Members[1].GRPCURL(),
clus.Members[2].GRPCURL(),
}
cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCAddr()}}
cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}}
cli, err := integration.NewClient(t, cfg)
if err != nil {
t.Fatal(err)
@ -71,7 +71,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
}
t.Logf("Reconfigure client to speak only to the 'partitioned' member")
cli.SetEndpoints(clus.Members[2].GRPCAddr())
cli.SetEndpoints(clus.Members[2].GRPCURL())
_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != ordering.ErrNoGreaterRev {
t.Fatal("While speaking to partitioned leader, we should get ErrNoGreaterRev error")
@ -80,15 +80,15 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
func TestUnresolvableOrderViolation(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true, UseBridge: true})
defer clus.Terminate(t)
cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
clus.Members[3].GRPCAddr(),
clus.Members[4].GRPCAddr(),
clus.Members[0].GRPCURL(),
clus.Members[1].GRPCURL(),
clus.Members[2].GRPCURL(),
clus.Members[3].GRPCURL(),
clus.Members[4].GRPCURL(),
},
}
cli, err := integration.NewClient(t, cfg)
@ -99,7 +99,7 @@ func TestUnresolvableOrderViolation(t *testing.T) {
eps := cli.Endpoints()
ctx := context.TODO()
cli.SetEndpoints(clus.Members[0].GRPCAddr())
cli.SetEndpoints(clus.Members[0].GRPCURL())
time.Sleep(1 * time.Second)
_, err = cli.Put(ctx, "foo", "bar")
if err != nil {
@ -139,7 +139,7 @@ func TestUnresolvableOrderViolation(t *testing.T) {
t.Fatal(err)
}
clus.Members[3].WaitStarted(t)
cli.SetEndpoints(clus.Members[3].GRPCAddr())
cli.SetEndpoints(clus.Members[3].GRPCURL())
_, err = OrderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != ordering.ErrNoGreaterRev {

View File

@ -53,7 +53,7 @@ func TestTxnError(t *testing.T) {
func TestTxnWriteFail(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
kv := clus.Client(0)
@ -103,7 +103,7 @@ func TestTxnReadRetry(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
kv := clus.Client(0)

View File

@ -47,7 +47,7 @@ type watchctx struct {
func runWatchTest(t *testing.T, f watcherTest) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
wclientMember := rand.Intn(3)
@ -188,7 +188,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
defer close(donec)
// take down watcher connection
for {
wctx.clus.Members[wctx.wclientMember].DropConnections()
wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
select {
case <-timer:
// spinning on close may live lock reconnection
@ -230,7 +230,7 @@ func testWatchReconnInit(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
wctx.clus.Members[wctx.wclientMember].DropConnections()
wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
// watcher should recover
putAndWatch(t, wctx, "a", "a")
}
@ -247,7 +247,7 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
}
putAndWatch(t, wctx, "a", "a")
// take down watcher connection
wctx.clus.Members[wctx.wclientMember].DropConnections()
wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
// watcher should recover
putAndWatch(t, wctx, "a", "b")
}
@ -348,7 +348,7 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
func TestWatchResumeInitRev(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@ -368,8 +368,8 @@ func TestWatchResumeInitRev(t *testing.T) {
t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
}
// pause wch
clus.Members[0].DropConnections()
clus.Members[0].PauseConnections()
clus.Members[0].Bridge().DropConnections()
clus.Members[0].Bridge().PauseConnections()
select {
case resp, ok := <-wch:
@ -378,7 +378,7 @@ func TestWatchResumeInitRev(t *testing.T) {
}
// resume wch
clus.Members[0].UnpauseConnections()
clus.Members[0].Bridge().UnpauseConnections()
select {
case resp, ok := <-wch:
@ -404,7 +404,7 @@ func TestWatchResumeInitRev(t *testing.T) {
func TestWatchResumeCompacted(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
// create a waiting watcher at rev 1
@ -955,7 +955,7 @@ func TestWatchWithCreatedNotification(t *testing.T) {
func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
integration.BeforeTest(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer cluster.Terminate(t)
client := cluster.RandClient()
@ -968,7 +968,7 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
t.Fatalf("expected created event, got %v", resp)
}
cluster.Members[0].DropConnections()
cluster.Members[0].Bridge().DropConnections()
// check watch channel doesn't post another watch response.
select {
@ -1056,14 +1056,14 @@ func TestWatchOverlapContextCancel(t *testing.T) {
func TestWatchOverlapDropConnContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
}
testWatchOverlapContextCancel(t, f)
}
func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
n := 100
@ -1154,7 +1154,7 @@ func TestWatchCancelAndCloseClient(t *testing.T) {
// then closes the watcher interface to ensure correct clean up.
func TestWatchStressResumeClose(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@ -1164,7 +1164,7 @@ func TestWatchStressResumeClose(t *testing.T) {
for i := range wchs {
wchs[i] = cli.Watch(ctx, "abc")
}
clus.Members[0].DropConnections()
clus.Members[0].Bridge().DropConnections()
cancel()
if err := cli.Close(); err != nil {
t.Fatal(err)

View File

@ -39,6 +39,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v2"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/grpc_testing"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/embed"
@ -73,6 +74,7 @@ const (
basePort = 21000
URLScheme = "unix"
URLSchemeTLS = "unixs"
baseGRPCPort = 30000
)
var (
@ -121,6 +123,10 @@ var (
defaultTokenJWT = fmt.Sprintf("jwt,pub-key=%s,priv-key=%s,sign-method=RS256,ttl=1s",
MustAbsPath("../fixtures/server.crt"), MustAbsPath("../fixtures/server.key.insecure"))
// uniqueNumber is used to generate unique port numbers
// Should only be accessed via atomic package methods.
uniqueNumber int32
)
type ClusterConfig struct {
@ -153,9 +159,15 @@ type ClusterConfig struct {
// UseIP is true to use only IP for gRPC requests.
UseIP bool
// UseBridge adds bridge between client and grpc server. Should be used in tests that
// want to manipulate connection or require connection not breaking despite server stop/restart.
UseBridge bool
// UseTCP configures server listen on tcp socket. If disabled unix socket is used.
UseTCP bool
EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration
LeaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
}
@ -208,7 +220,7 @@ func newCluster(t testutil.TB, cfg *ClusterConfig) *cluster {
c := &cluster{cfg: cfg}
ms := make([]*member, cfg.Size)
for i := 0; i < cfg.Size; i++ {
ms[i] = c.mustNewMember(t)
ms[i] = c.mustNewMember(t, int64(i))
}
c.Members = ms
if err := c.fillClusterForMembers(); err != nil {
@ -249,7 +261,7 @@ func (c *cluster) Launch(t testutil.TB) {
c.waitMembersMatch(t, c.HTTPMembers())
c.waitVersion()
for _, m := range c.Members {
t.Logf(" - %v -> %v (%v)", m.Name, m.ID(), m.GRPCAddr())
t.Logf(" - %v -> %v (%v)", m.Name, m.ID(), m.GRPCURL())
}
}
@ -295,10 +307,11 @@ func (c *cluster) HTTPMembers() []client.Member {
return ms
}
func (c *cluster) mustNewMember(t testutil.TB) *member {
func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
m := mustNewMember(t,
memberConfig{
name: c.generateMemberName(),
memberNumber: memberNumber,
authToken: c.cfg.AuthToken,
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
@ -313,7 +326,10 @@ func (c *cluster) mustNewMember(t testutil.TB) *member {
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP,
useBridge: c.cfg.UseBridge,
useTCP: c.cfg.UseTCP,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
})
@ -328,7 +344,7 @@ func (c *cluster) mustNewMember(t testutil.TB) *member {
// addMember return PeerURLs of the added member.
func (c *cluster) addMember(t testutil.TB) types.URLs {
m := c.mustNewMember(t)
m := c.mustNewMember(t, 0)
scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
@ -557,6 +573,8 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
type member struct {
config.ServerConfig
UniqNumber int64
MemberNumber int64
PeerListeners, ClientListeners []net.Listener
grpcListener net.Listener
// PeerTLSInfo enables peer TLS when set
@ -572,7 +590,7 @@ type member struct {
grpcServerOpts []grpc.ServerOption
grpcServer *grpc.Server
grpcServerPeer *grpc.Server
grpcAddr string
grpcURL string
grpcBridge *bridge
// serverClient is a clientv3 that directly calls the etcdserver.
@ -582,15 +600,21 @@ type member struct {
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
useBridge bool
useTCP bool
isLearner bool
closed bool
grpcServerRecorder *grpc_testing.GrpcRecorder
}
func (m *member) GRPCAddr() string { return m.grpcAddr }
func (m *member) GRPCURL() string { return m.grpcURL }
type memberConfig struct {
name string
uniqNumber int64
memberNumber int64
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
authToken string
@ -605,8 +629,11 @@ type memberConfig struct {
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
useBridge bool
useTCP bool
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
leaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
}
@ -614,7 +641,10 @@ type memberConfig struct {
// set, it will use https scheme to communicate between peers.
func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
var err error
m := &member{}
m := &member{
MemberNumber: mcfg.memberNumber,
UniqNumber: atomic.AddInt64(&localListenCount, 1),
}
peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
@ -698,8 +728,11 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
m.useIP = mcfg.useIP
m.useBridge = mcfg.useBridge
m.useTCP = mcfg.useTCP
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
@ -707,7 +740,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.WarningApplyDuration = embed.DefaultWarningApplyDuration
m.V2Deprecation = config.V2_DEPR_DEFAULT
m.grpcServerRecorder = &grpc_testing.GrpcRecorder{}
m.Logger = memberLogger(t, mcfg.name)
t.Cleanup(func() {
// if we didn't cleanup the logger, the consecutive test
@ -730,45 +763,109 @@ func memberLogger(t testutil.TB, name string) *zap.Logger {
// listenGRPC starts a grpc server over a unix domain socket on the member
func (m *member) listenGRPC() error {
// prefix with localhost so cert has right domain
m.grpcAddr = "localhost:" + m.Name
m.Logger.Info("LISTEN GRPC", zap.String("m.grpcAddr", m.grpcAddr), zap.String("m.Name", m.Name))
if m.useIP { // for IP-only TLS certs
m.grpcAddr = "127.0.0.1:" + m.Name
}
l, err := transport.NewUnixListener(m.grpcAddr)
network, host, port := m.grpcAddr()
grpcAddr := host + ":" + port
m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name))
grpcListener, err := net.Listen(network, grpcAddr)
if err != nil {
return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err)
}
m.grpcBridge, err = newBridge(m.grpcAddr)
if err != nil {
l.Close()
return err
m.grpcURL = fmt.Sprintf("%s://%s", m.clientScheme(), grpcAddr)
if m.useBridge {
_, err = m.addBridge()
if err != nil {
grpcListener.Close()
return err
}
}
m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
m.grpcListener = l
m.grpcListener = grpcListener
return nil
}
func (m *member) clientScheme() string {
switch {
case m.useTCP && m.ClientTLSInfo != nil:
return "https"
case m.useTCP && m.ClientTLSInfo == nil:
return "http"
case !m.useTCP && m.ClientTLSInfo != nil:
return "unixs"
case !m.useTCP && m.ClientTLSInfo == nil:
return "unix"
}
m.Logger.Panic("Failed to determine client schema")
return ""
}
func (m *member) addBridge() (*bridge, error) {
network, host, port := m.grpcAddr()
grpcAddr := host + ":" + port
bridgeAddr := grpcAddr + "0"
m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name))
bridgeListener, err := transport.NewUnixListener(bridgeAddr)
if err != nil {
return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", bridgeAddr, err)
}
m.grpcBridge, err = newBridge(dialer{network: network, addr: grpcAddr}, bridgeListener)
if err != nil {
bridgeListener.Close()
return nil, err
}
m.grpcURL = m.clientScheme() + "://" + bridgeAddr
return m.grpcBridge, nil
}
func (m *member) Bridge() *bridge {
if !m.useBridge {
m.Logger.Panic("Bridge not available. Please configure using bridge before creating cluster.")
}
return m.grpcBridge
}
func (m *member) grpcAddr() (network, host, port string) {
// prefix with localhost so cert has right domain
host = "localhost"
if m.useIP { // for IP-only TLS certs
host = "127.0.0.1"
}
network = "unix"
if m.useTCP {
network = "tcp"
}
port = m.Name
if m.useTCP {
port = fmt.Sprintf("%d", GrpcPortNumber(m.UniqNumber, m.MemberNumber))
}
return network, host, port
}
func GrpcPortNumber(uniqNumber, memberNumber int64) int64 {
return baseGRPCPort + uniqNumber*10 + memberNumber
}
type dialer struct {
network string
addr string
}
func (d dialer) Dial() (net.Conn, error) {
return net.Dial(d.network, d.addr)
}
func (m *member) ElectionTimeout() time.Duration {
return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
}
func (m *member) ID() types.ID { return m.s.ID() }
func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
if m.grpcAddr == "" {
if m.grpcURL == "" {
return nil, fmt.Errorf("member not configured for grpc")
}
cfg := clientv3.Config{
Endpoints: []string{m.grpcAddr},
Endpoints: []string{m.grpcURL},
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
@ -829,7 +926,7 @@ func (m *member) Launch() error {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
)
var err error
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
@ -855,8 +952,8 @@ func (m *member) Launch() error {
return err
}
}
m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerRecorder.UnaryInterceptor(), m.grpcServerOpts...)
m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg, m.grpcServerRecorder.UnaryInterceptor())
m.serverClient = v3client.New(m.s)
lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
@ -986,11 +1083,15 @@ func (m *member) Launch() error {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
)
return nil
}
func (m *member) RecordedRequests() []grpc_testing.RequestInfo {
return m.grpcServerRecorder.RecordedRequests()
}
func (m *member) WaitOK(t testutil.TB) {
m.WaitStarted(t)
for m.s.Leader() == 0 {
@ -1099,7 +1200,7 @@ func (m *member) Stop(_ testutil.TB) {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
)
m.Close()
m.serverClosers = nil
@ -1108,7 +1209,7 @@ func (m *member) Stop(_ testutil.TB) {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
)
}
@ -1133,7 +1234,7 @@ func (m *member) Restart(t testutil.TB) error {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
)
newPeerListeners := make([]net.Listener, 0)
for _, ln := range m.PeerListeners {
@ -1158,7 +1259,7 @@ func (m *member) Restart(t testutil.TB) error {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
zap.Error(err),
)
return err
@ -1171,7 +1272,7 @@ func (m *member) Terminate(t testutil.TB) {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
)
m.Close()
if !m.keepDataDirTerminate {
@ -1184,7 +1285,7 @@ func (m *member) Terminate(t testutil.TB) {
zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr),
zap.String("grpc-url", m.grpcURL),
)
}
@ -1280,8 +1381,9 @@ func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i]
type ClusterV3 struct {
*cluster
mu sync.Mutex
clients []*clientv3.Client
mu sync.Mutex
clients []*clientv3.Client
clusterClient *clientv3.Client
}
// NewClusterV3 returns a launched cluster with a grpc client connection
@ -1327,6 +1429,11 @@ func (c *ClusterV3) Terminate(t testutil.TB) {
t.Error(err)
}
}
if c.clusterClient != nil {
if err := c.clusterClient.Close(); err != nil {
t.Error(err)
}
}
c.mu.Unlock()
c.cluster.Terminate(t)
}
@ -1339,6 +1446,25 @@ func (c *ClusterV3) Client(i int) *clientv3.Client {
return c.clients[i]
}
func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) {
if c.clusterClient == nil {
endpoints := []string{}
for _, m := range c.Members {
endpoints = append(endpoints, m.grpcURL)
}
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
c.clusterClient, err = newClientV3(cfg, cfg.Logger)
if err != nil {
return nil, err
}
}
return c.clusterClient, nil
}
// NewClientV3 creates a new grpc client connection to the member
func (c *ClusterV3) NewClientV3(memberIndex int) (*clientv3.Client, error) {
return NewClientV3(c.Members[memberIndex])
@ -1418,7 +1544,7 @@ func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster
// via v3 MemberAdd API, and then launches the new member.
func (c *ClusterV3) AddAndLaunchLearnerMember(t testutil.TB) {
m := c.mustNewMember(t)
m := c.mustNewMember(t, 0)
m.isLearner = true
scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
@ -1519,7 +1645,7 @@ func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j],
// MustNewMember creates a new member instance based on the response of V3 Member Add API.
func (c *ClusterV3) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) *member {
m := c.mustNewMember(t)
m := c.mustNewMember(t, 0)
m.isLearner = resp.Member.IsLearner
m.NewCluster = false

View File

@ -173,7 +173,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
}
func TestForceNewCluster(t *testing.T) {
c := NewCluster(t, 3)
c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi := client.NewKeysAPI(cc)
@ -283,7 +283,7 @@ func testIssue2746(t *testing.T, members int) {
func TestIssue2904(t *testing.T) {
BeforeTest(t)
// start 1-member cluster to ensure member 0 is the leader of the cluster.
c := NewCluster(t, 1)
c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true})
c.Launch(t)
defer c.Terminate(t)
@ -319,7 +319,7 @@ func TestIssue2904(t *testing.T) {
func TestIssue3699(t *testing.T) {
// start a cluster of 3 nodes a, b, c
BeforeTest(t)
c := NewCluster(t, 3)
c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
defer c.Terminate(t)
@ -371,7 +371,7 @@ func TestIssue3699(t *testing.T) {
// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
func TestRejectUnhealthyAdd(t *testing.T) {
BeforeTest(t)
c := NewCluster(t, 3)
c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
@ -415,7 +415,7 @@ func TestRejectUnhealthyAdd(t *testing.T) {
// if quorum will be lost.
func TestRejectUnhealthyRemove(t *testing.T) {
BeforeTest(t)
c := NewCluster(t, 5)
c := newCluster(t, &ClusterConfig{Size: 5, UseBridge: true})
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
@ -464,7 +464,7 @@ func TestRestartRemoved(t *testing.T) {
BeforeTest(t)
// 1. start single-member cluster
c := NewCluster(t, 1)
c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true})
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
@ -540,7 +540,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
func TestSpeedyTerminate(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true})
// Stop/Restart so requests will time out on lost leaders
for i := 0; i < 3; i++ {
clus.Members[i].Stop(t)

View File

@ -0,0 +1,197 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
tls "crypto/tls"
"fmt"
"strings"
"testing"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
func TestAuthority(t *testing.T) {
tcs := []struct {
name string
useTCP bool
useTLS bool
// Pattern used to generate endpoints for client. Fields filled
// %d - will be filled with member grpc port
// %s - will be filled with member name
clientURLPattern string
// Pattern used to validate authority received by server. Fields filled:
// %d - will be filled with first member grpc port
// %s - will be filled with first member name
expectAuthorityPattern string
}{
{
name: "unix:path",
clientURLPattern: "unix:localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
{
name: "unix://absolute_path",
clientURLPattern: "unix://localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
// "unixs" is not standard schema supported by etcd
{
name: "unixs:absolute_path",
useTLS: true,
clientURLPattern: "unixs:localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
{
name: "unixs://absolute_path",
useTLS: true,
clientURLPattern: "unixs://localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
{
name: "http://domain[:port]",
useTCP: true,
clientURLPattern: "http://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://domain[:port]",
useTLS: true,
useTCP: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "http://address[:port]",
useTCP: true,
clientURLPattern: "http://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://address[:port]",
useTCP: true,
useTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
}
for _, tc := range tcs {
for _, clusterSize := range []int{1, 3} {
t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) {
BeforeTest(t)
cfg := ClusterConfig{
Size: clusterSize,
UseTCP: tc.useTCP,
UseIP: tc.useTCP,
}
cfg, tlsConfig := setupTLS(t, tc.useTLS, cfg)
clus := NewClusterV3(t, &cfg)
defer clus.Terminate(t)
kv := setupClient(t, tc.clientURLPattern, clus, tlsConfig)
defer kv.Close()
_, err := kv.Put(context.TODO(), "foo", "bar")
if err != nil {
t.Fatal(err)
}
assertAuthority(t, templateAuthority(t, tc.expectAuthorityPattern, clus.Members[0]), clus)
})
}
}
}
func setupTLS(t *testing.T, useTLS bool, cfg ClusterConfig) (ClusterConfig, *tls.Config) {
t.Helper()
if useTLS {
cfg.ClientTLS = &testTLSInfo
tlsConfig, err := testTLSInfo.ClientConfig()
if err != nil {
t.Fatal(err)
}
return cfg, tlsConfig
}
return cfg, nil
}
func setupClient(t *testing.T, endpointPattern string, clus *ClusterV3, tlsConfig *tls.Config) *clientv3.Client {
t.Helper()
endpoints := templateEndpoints(t, endpointPattern, clus)
kv, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
TLS: tlsConfig,
})
if err != nil {
t.Fatal(err)
}
return kv
}
func templateEndpoints(t *testing.T, pattern string, clus *ClusterV3) []string {
t.Helper()
endpoints := []string{}
for _, m := range clus.Members {
ent := pattern
if strings.Contains(ent, "%d") {
ent = fmt.Sprintf(ent, GrpcPortNumber(m.UniqNumber, m.MemberNumber))
}
if strings.Contains(ent, "%s") {
ent = fmt.Sprintf(ent, m.Name)
}
if strings.Contains(ent, "%") {
t.Fatalf("Failed to template pattern, %% symbol left %q", ent)
}
endpoints = append(endpoints, ent)
}
return endpoints
}
func templateAuthority(t *testing.T, pattern string, m *member) string {
t.Helper()
authority := pattern
if strings.Contains(authority, "%d") {
authority = fmt.Sprintf(authority, GrpcPortNumber(m.UniqNumber, m.MemberNumber))
}
if strings.Contains(authority, "%s") {
authority = fmt.Sprintf(authority, m.Name)
}
if strings.Contains(authority, "%") {
t.Fatalf("Failed to template pattern, %% symbol left %q", authority)
}
return authority
}
func assertAuthority(t *testing.T, expectedAuthority string, clus *ClusterV3) {
t.Helper()
requestsFound := 0
for _, m := range clus.Members {
for _, r := range m.RecordedRequests() {
requestsFound++
if r.Authority != expectedAuthority {
t.Errorf("Got unexpected authority header, member: %q, request: %q, got authority: %q, expected %q", m.Name, r.FullMethod, r.Authority, expectedAuthority)
}
}
}
if requestsFound == 0 {
t.Errorf("Expected at least one request")
}
}

View File

@ -46,7 +46,7 @@ func TestPauseMember(t *testing.T) {
func TestRestartMember(t *testing.T) {
BeforeTest(t)
c := NewCluster(t, 3)
c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
defer c.Terminate(t)
@ -88,7 +88,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
func TestSnapshotAndRestartMember(t *testing.T) {
BeforeTest(t)
m := mustNewMember(t, memberConfig{name: "snapAndRestartTest"})
m := mustNewMember(t, memberConfig{name: "snapAndRestartTest", useBridge: true})
m.SnapshotCount = 100
m.Launch()
defer m.Terminate(t)

View File

@ -36,7 +36,7 @@ func TestClusterProxyMemberList(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCAddr()}, t)
cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCURL()}, t)
defer cts.close(t)
cfg := clientv3.Config{

View File

@ -34,7 +34,7 @@ func TestKVProxyRange(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
kvts := newKVProxyServer([]string{clus.Members[0].GRPCAddr()}, t)
kvts := newKVProxyServer([]string{clus.Members[0].GRPCURL()}, t)
defer kvts.close()
// create a client and try to get key from proxy.

Some files were not shown because too many files have changed in this diff Show More