Compare commits

...

40 Commits

Author SHA1 Message Date
99018a77be version: bump up to 3.5.2 2022-02-01 12:28:34 +01:00
a624446907 Merge pull request #13616 from ptabor/20220117-update-yaml
Update dep: gopkg.in/yaml.v2 v2.2.8 -> v2.4.0 due to: CVE-2019-11254 [release 3.5]
2022-01-17 20:07:16 +01:00
74f33d6665 Update dep: require gopkg.in/yaml.v2 v2.2.8 -> v2.4.0 due to: CVE-2019-11254. 2022-01-17 17:10:50 +01:00
7291ed3c4a Merge pull request #13541 from michaljasionowski/backport-runlock-fix
Backport watchablestore runlock bug fix to release-3.5
2021-12-21 11:03:31 +01:00
55c16df997 fix runlock bug 2021-12-16 15:58:41 +00:00
73080a7166 Merge pull request #13501 from ahrtr/reset_ci_after_reload_db_3.5
[3.5] Set the backend again after recovering v3 backend from snapshot
2021-12-06 13:22:22 +01:00
e84c61104c Merge pull request #13515 from serathius/checkpoints-fix-3.5
Backport Lease Checkpoints fix to release-3.5
2021-12-03 12:21:02 +01:00
d00e89db2e server: Require either cluster version v3.6 or --experimental-enable-lease-checkpoint-persist to persist lease remainingTTL
To avoid inconsistant behavior during cluster upgrade we are feature
gating persistance behind cluster version. This should ensure that
all cluster members are upgraded to v3.6 before changing behavior.

To allow backporting this fix to v3.5 we are also introducing flag
--experimental-enable-lease-checkpoint-persist that will allow for
smooth upgrade in v3.5 clusters with this feature enabled.
2021-12-02 16:54:10 +01:00
eddfb4232f etcdserver,integration: Store remaining TTL on checkpoint
To extend lease checkpointing mechanism to cases when the whole etcd
cluster is restarted.
2021-12-02 16:42:20 +01:00
21634a98c6 lease,integration: add checkpoint scheduling after leader change
Current checkpointing mechanism is buggy. New checkpoints for any lease
are scheduled only until the first leader change. Added fix for that
and a test that will check it.
2021-12-02 16:40:14 +01:00
8c81598455 set the backend again after recovering v3 backend from snapshot 2021-11-25 05:45:20 +08:00
eac7f98699 Merge pull request #13477 from mitake/backport-13308-to-3.5
Backport PR 13308 to release-3.5
2021-11-21 14:45:15 -05:00
dec6f72d68 *: implement a retry logic for auth old revision in the client 2021-11-15 00:09:16 +09:00
79bbc8fdb7 client/v3: refresh the token when ErrUserEmpty is received while retrying
To fix a bug in the retry logic caused when the auth token is cleared after receiving `ErrInvalidAuthToken` from the server and the subsequent call to `getToken` also fails due to some reason (eg. context deadline exceeded).
This leaves the client without a token and the retry will continue to fail with `ErrUserEmpty` unless the token is refreshed.
2021-11-15 00:09:09 +09:00
77d760bf1b Merge pull request #13476 from chaochn47/backport-release-3.5
cherry-pick to 3.5 from #13467 exclude the same alarm type activated by multiple peers
2021-11-13 22:10:19 -05:00
7d44a7cd6e server/etcdserver/api/etcdhttp: exclude the same alarm type activated by multiple peers 2021-11-12 14:21:14 -08:00
e8732fb5f3 Merge pull request #13395 from geetasg/release-3.5
storage/backend: Add a gauge to indicate if defrag is active (backport)
2021-10-07 18:16:21 +02:00
446f7d6b6e storage/backend: Add a gauge to indicate if defrag is active (backport from 3.6) 2021-10-06 11:01:31 -07:00
d42e8589e1 version: 3.5.1
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2021-10-03 11:47:37 -04:00
ec562294f7 Merge pull request #13380 from hexfusion/cp-13376
[release-3.5] Dockerfile: bump debian bullseye-20210927
2021-10-01 13:23:50 -04:00
bad9a52c4c Dockerfile: bump debian bullseye-20210927
fixes: CVE-2021-3711, CVE-2021-35942, CVE-2019-9893

Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2021-10-01 12:48:57 -04:00
edb3b5a794 Merge pull request #13375 from serathius/authority-3.5
Cherry pick "Fix http2 authority header in single endpoint scenario" to release-3.5
2021-09-30 13:56:33 +02:00
79f9a45574 client: Use first endpoint as http2 authority header 2021-09-30 12:15:33 +02:00
7f25a500e3 tests: Add grpc authority e2e tests 2021-09-30 12:15:33 +02:00
58d2b12a50 client: Add grpc authority header integration tests 2021-09-30 12:15:32 +02:00
6e04e8ae42 tests: Allow configuring integration tests to use TCP 2021-09-30 12:05:25 +02:00
7272a9585d test: Use unique number for grpc port 2021-09-30 12:05:15 +02:00
0bac49bda4 tests: Cleanup member interface by exposing Bridge directly 2021-09-30 12:05:10 +02:00
f324894e8f tests: Make using bridge optional 2021-09-30 12:05:04 +02:00
994bd08723 tests: Rename grpcAddr to grpcURL to imply that it includes schema 2021-09-30 12:04:57 +02:00
c1f48d8077 tests: Remove bridge dependency on unix 2021-09-30 12:04:50 +02:00
6e2fe84ebd Decouple prefixArgs from os.Env dependency
prefixArgs uses os.Setenv in e2e tests instead envMap.
This creates overwrites in some test cases and have an impact
on test quality and isolation between tests.
This PR uses ctlcontext envMap in each tests with high priority
and merges os environment variables with low priority.
2021-09-30 12:04:31 +02:00
4312298b73 Merge pull request #13348 from serathius/sync
Fix for v3.5 Ensure that cluster members stored in v2store and backend are in sync
2021-09-25 17:33:55 +02:00
e68c7ab4bc server: Ensure that adding and removing members handle storev2 and backend out of sync 2021-09-15 14:36:41 +02:00
d7eeda09f7 Merge pull request #13349 from serathius/tip-3.5
Stop using tip golang version in CI
2021-09-15 08:04:37 -04:00
921f78d56f Stop using tip golang version in CI 2021-09-15 10:29:06 +02:00
2fe94b19d3 Merge pull request #13257 from tangcong/automated-cherry-pick-of-#13145-#13237-origin-release-3.5
[backport 3.5]: Automated cherry pick of #13145 #13237
2021-08-06 09:01:14 -04:00
627d91c89d fix self-signed-cert-validity parameter cannot be specified in the config file 2021-07-30 07:53:43 +08:00
dfd2fea4c5 fix health endpoint not usable when authentication is enabled 2021-07-30 07:53:40 +08:00
beae2e1801 workflows: remove ARM64 job for maintenance
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2021-07-03 13:01:37 -07:00
107 changed files with 1858 additions and 658 deletions

View File

@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-go@v2 - uses: actions/setup-go@v2
with: with:
go-version: "^1.16" go-version: "1.16.3"
- run: date - run: date
- env: - env:
TARGET: ${{ matrix.target }} TARGET: ${{ matrix.target }}

View File

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-go@v2 - uses: actions/setup-go@v2
with: with:
go-version: "^1.16" go-version: "1.16.3"
- run: date - run: date
- env: - env:
TARGET: ${{ matrix.target }} TARGET: ${{ matrix.target }}

View File

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-go@v2 - uses: actions/setup-go@v2
with: with:
go-version: "^1.16" go-version: "1.16.3"
- run: date - run: date
- env: - env:
TARGET: ${{ matrix.target }} TARGET: ${{ matrix.target }}

View File

@ -1,19 +0,0 @@
name: Linux ARM64 Graviton2
on: [push, pull_request]
jobs:
test:
runs-on: [self-hosted, linux, ARM64, graviton2]
steps:
- uses: actions/checkout@v2
- name: install dependencies
run: |
sudo amazon-linux-extras install epel
sudo yum install -y git gcc ShellCheck
- uses: actions/setup-go@v2
with:
go-version: "^1.16"
- run: go version
- run: date
- name: Run tests
run: TEST_OPTS="PASSES='fmt bom dep build unit integration_e2e'" make test

View File

@ -1,142 +0,0 @@
## Graviton-based self-hosted github action worker
### Step 1. Create an EC2 instance with Graviton
Create an AWS Graviton-based EC2 instance. For example,
```
# or download from https://github.com/aws/aws-k8s-tester/releases
cd ${HOME}/go/src/github.com/aws/aws-k8s-tester
go install -v ./cmd/ec2-utils
# create arm64 AL2 instance
AWS_K8S_TESTER_EC2_ON_FAILURE_DELETE=true \
AWS_K8S_TESTER_EC2_LOG_COLOR=true \
AWS_K8S_TESTER_EC2_REGION=us-west-2 \
AWS_K8S_TESTER_EC2_S3_BUCKET_CREATE=true \
AWS_K8S_TESTER_EC2_S3_BUCKET_CREATE_KEEP=true \
AWS_K8S_TESTER_EC2_REMOTE_ACCESS_KEY_CREATE=true \
AWS_K8S_TESTER_EC2_ASGS_FETCH_LOGS=false \
AWS_K8S_TESTER_EC2_ASGS='{"GetRef.Name-arm64-al2-cpu":{"name":"GetRef.Name-arm64-al2-cpu","remote-access-user-name":"ec2-user","ami-type":"AL2_arm_64","image-id-ssm-parameter":"/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-arm64-gp2","instance-types":["m6g.xlarge"],"volume-size":40,"asg-min-size":1,"asg-max-size":1,"asg-desired-capacity":1}}' \
AWS_K8S_TESTER_EC2_ROLE_CREATE=true \
AWS_K8S_TESTER_EC2_VPC_CREATE=true \
ec2-utils create instances --enable-prompt=true --auto-path
```
### Step 2. Install github action on the host
SSH into the instance, and install the github action self-hosted runner (see [install scripts](https://github.com/etcd-io/etcd/settings/actions/runners/new?arch=arm64&os=linux)).
### Step 3. Configure github action on the host
SSH into the instance, and configure the github action self-hosted runner.
First, we need disable ICU install (see [actions/runner issue on ARM64](https://github.com/actions/runner/issues/629)):
```
sudo yum install -y patch
```
And write this bash script:
```
#!/bin/bash -e
patch -p1 <<ICU_PATCH
diff -Naur a/bin/Runner.Listener.runtimeconfig.json b/bin/Runner.Listener.runtimeconfig.json
--- a/bin/Runner.Listener.runtimeconfig.json 2020-07-01 02:21:09.000000000 +0000
+++ b/bin/Runner.Listener.runtimeconfig.json 2020-07-28 00:02:38.748868613 +0000
@@ -8,7 +8,8 @@
}
],
"configProperties": {
- "System.Runtime.TieredCompilation.QuickJit": true
+ "System.Runtime.TieredCompilation.QuickJit": true,
+ "System.Globalization.Invariant": true
}
}
-}
\ No newline at end of file
+}
diff -Naur a/bin/Runner.PluginHost.runtimeconfig.json b/bin/Runner.PluginHost.runtimeconfig.json
--- a/bin/Runner.PluginHost.runtimeconfig.json 2020-07-01 02:21:22.000000000 +0000
+++ b/bin/Runner.PluginHost.runtimeconfig.json 2020-07-28 00:02:59.358680003 +0000
@@ -8,7 +8,8 @@
}
],
"configProperties": {
- "System.Runtime.TieredCompilation.QuickJit": true
+ "System.Runtime.TieredCompilation.QuickJit": true,
+ "System.Globalization.Invariant": true
}
}
-}
\ No newline at end of file
+}
diff -Naur a/bin/Runner.Worker.runtimeconfig.json b/bin/Runner.Worker.runtimeconfig.json
--- a/bin/Runner.Worker.runtimeconfig.json 2020-07-01 02:21:16.000000000 +0000
+++ b/bin/Runner.Worker.runtimeconfig.json 2020-07-28 00:02:19.159028531 +0000
@@ -8,7 +8,8 @@
}
],
"configProperties": {
- "System.Runtime.TieredCompilation.QuickJit": true
+ "System.Runtime.TieredCompilation.QuickJit": true,
+ "System.Globalization.Invariant": true
}
}
-}
\ No newline at end of file
+}
ICU_PATCH
```
And now patch the github action runner:
```
cd ${HOME}/actions-runner
bash patch.sh
```
```
patching file bin/Runner.Listener.runtimeconfig.json
patching file bin/Runner.PluginHost.runtimeconfig.json
patching file bin/Runner.Worker.runtimeconfig.json
```
And now configure:
```
sudo yum install -y wget
INSTANCE_ID=$(wget -q -O - http://169.254.169.254/latest/meta-data/instance-id)
echo ${INSTANCE_ID}
# get token from https://github.com/etcd-io/etcd/settings/actions/runners/new?arch=arm64&os=linux
cd ${HOME}/actions-runner
./config.sh \
--work "_work" \
--name ${INSTANCE_ID} \
--labels self-hosted,linux,ARM64,graviton2 \
--url https://github.com/etcd-io/etcd \
--token ...
```
And run:
```
# run this as a process in the terminal
cd ${HOME}/actions-runner
./run.sh
# or run this as a systemd service
cd ${HOME}/actions-runner
sudo ./svc.sh install
sudo ./svc.sh start
sudo ./svc.sh status
```
### Step 4. Create github action configuration
See https://github.com/etcd-io/etcd/pull/12928.

View File

@ -18,7 +18,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-go@v2 - uses: actions/setup-go@v2
with: with:
go-version: "^1.16" go-version: "1.16.3"
- run: date - run: date
- env: - env:
TARGET: ${{ matrix.target }} TARGET: ${{ matrix.target }}

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base:buster-v1.4.0 # TODO: move to k8s.gcr.io/build-image/debian-base:bullseye-v1.y.z when patched
FROM debian:bullseye-20210927
ADD etcd /usr/local/bin/ ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/ ADD etcdctl /usr/local/bin/

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base-arm64:buster-v1.4.0 # TODO: move to k8s.gcr.io/build-image/debian-base-arm64:bullseye-1.y.z when patched
FROM arm64v8/debian:bullseye-20210927
ADD etcd /usr/local/bin/ ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/ ADD etcdctl /usr/local/bin/

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base-ppc64le:buster-v1.4.0 # TODO: move to k8s.gcr.io/build-image/debian-base-ppc64le:bullseye-1.y.z when patched
FROM ppc64le/debian:bullseye-20210927
ADD etcd /usr/local/bin/ ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/ ADD etcdctl /usr/local/bin/

View File

@ -1,4 +1,5 @@
FROM k8s.gcr.io/build-image/debian-base-s390x:buster-v1.4.0 # TODO: move to k8s.gcr.io/build-image/debian-base-s390x:bullseye-1.y.z when patched
FROM s390x/debian:bullseye-20210927
ADD etcd /usr/local/bin/ ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/ ADD etcdctl /usr/local/bin/

View File

@ -9,6 +9,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
google.golang.org/grpc v1.38.0 google.golang.org/grpc v1.38.0
gopkg.in/yaml.v2 v2.4.0 // indirect
) )
// Bad imports are sometimes causing attempts to pull that code. // Bad imports are sometimes causing attempts to pull that code.

View File

@ -143,7 +143,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -65,6 +65,7 @@ var (
ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err() ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err() ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err() ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
ErrGRPCAuthOldRevision = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err()
ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err() ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err() ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
@ -131,6 +132,7 @@ var (
ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled, ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken, ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt, ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
ErrorDesc(ErrGRPCAuthOldRevision): ErrGRPCAuthOldRevision,
ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader, ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
@ -195,6 +197,7 @@ var (
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted) ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled) ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken) ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision)
ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt) ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
ErrNoLeader = Error(ErrGRPCNoLeader) ErrNoLeader = Error(ErrGRPCNoLeader)

View File

@ -26,7 +26,7 @@ import (
var ( var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with. // MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0" MinClusterVersion = "3.0.0"
Version = "3.5.0" Version = "3.5.2"
APIVersion = "unknown" APIVersion = "unknown"
// Git SHA Value will be set during build // Git SHA Value will be set during build

View File

@ -5,8 +5,8 @@ go 1.16
require ( require (
github.com/json-iterator/go v1.1.11 github.com/json-iterator/go v1.1.11
github.com/modern-go/reflect2 v1.0.1 github.com/modern-go/reflect2 v1.0.1
go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
) )
replace ( replace (

View File

@ -154,8 +154,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -296,9 +296,7 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout) dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options? defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
} }
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
conn, err := grpc.DialContext(dctx, target, opts...) conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -306,6 +304,20 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
return conn, nil return conn, nil
} }
func authority(endpoint string) string {
spl := strings.SplitN(endpoint, "://", 2)
if len(spl) < 2 {
if strings.HasPrefix(endpoint, "unix:") {
return endpoint[len("unix:"):]
}
if strings.HasPrefix(endpoint, "unixs:") {
return endpoint[len("unixs:"):]
}
return endpoint
}
return spl[1]
}
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials { func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
r := endpoint.RequiresCredentials(ep) r := endpoint.RequiresCredentials(ep)
switch r { switch r {

View File

@ -6,8 +6,8 @@ require (
github.com/dustin/go-humanize v1.0.0 github.com/dustin/go-humanize v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_golang v1.11.0
go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.uber.org/zap v1.17.0 go.uber.org/zap v1.17.0
google.golang.org/grpc v1.38.0 google.golang.org/grpc v1.38.0
sigs.k8s.io/yaml v1.2.0 sigs.k8s.io/yaml v1.2.0

View File

@ -258,8 +258,9 @@ gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -73,7 +73,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien
// its the callCtx deadline or cancellation, in which case try again. // its the callCtx deadline or cancellation, in which case try again.
continue continue
} }
if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken { if c.shouldRefreshToken(lastErr, callOpts) {
// clear auth token before refreshing it. // clear auth token before refreshing it.
// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side, // call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165) // if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
@ -148,6 +148,19 @@ func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamCli
} }
} }
// shouldRefreshToken checks whether there's a need to refresh the token based on the error and callOptions,
// and returns a boolean value.
func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
if rpctypes.Error(err) == rpctypes.ErrUserEmpty {
// refresh the token when username, password is present but the server returns ErrUserEmpty
// which is possible when the client token is cleared somehow
return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != ""
}
return callOpts.retryAuth &&
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
}
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy. // a new ClientStream according to the retry policy.
@ -245,7 +258,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
// its the callCtx deadline or cancellation, in which case try again. // its the callCtx deadline or cancellation, in which case try again.
return true, err return true, err
} }
if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { if s.client.shouldRefreshToken(err, s.callOpts) {
// clear auth token to avoid failure when call getToken // clear auth token to avoid failure when call getToken
s.client.authTokenBundle.UpdateAuthToken("") s.client.authTokenBundle.UpdateAuthToken("")

View File

@ -0,0 +1,124 @@
package clientv3
import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3/credentials"
grpccredentials "google.golang.org/grpc/credentials"
"testing"
)
type dummyAuthTokenBundle struct{}
func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials {
return nil
}
func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
return nil
}
func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
return nil, nil
}
func (d dummyAuthTokenBundle) UpdateAuthToken(token string) {
}
func TestClientShouldRefreshToken(t *testing.T) {
type fields struct {
authTokenBundle credentials.Bundle
}
type args struct {
err error
callOpts *options
}
optsWithTrue := &options{
retryAuth: true,
}
optsWithFalse := &options{
retryAuth: false,
}
tests := []struct {
name string
fields fields
args args
want bool
}{
{
name: "ErrUserEmpty and non nil authTokenBundle",
fields: fields{
authTokenBundle: &dummyAuthTokenBundle{},
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: true,
},
{
name: "ErrUserEmpty and nil authTokenBundle",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: false,
},
{
name: "ErrGRPCInvalidAuthToken and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue},
want: true,
},
{
name: "ErrGRPCInvalidAuthToken and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse},
want: false,
},
{
name: "ErrGRPCAuthOldRevision and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue},
want: true,
},
{
name: "ErrGRPCAuthOldRevision and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse},
want: false,
},
{
name: "Other error and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue},
want: false,
},
{
name: "Other error and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
authTokenBundle: tt.fields.authTokenBundle,
}
if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want {
t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -125,6 +125,9 @@ peer-transport-security:
# Peer TLS using generated certificates. # Peer TLS using generated certificates.
auto-tls: false auto-tls: false
# The validity period of the self-signed certificate, the unit is year.
self-signed-cert-validity: 1
# Enable debug-level logging for etcd. # Enable debug-level logging for etcd.
log-level: debug log-level: debug

View File

@ -9,12 +9,12 @@ require (
github.com/spf13/cobra v1.1.3 github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/urfave/cli v1.22.4 github.com/urfave/cli v1.22.4
go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.0 go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.0 go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.0 go.etcd.io/etcd/pkg/v3 v3.5.2
go.uber.org/zap v1.17.0 go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.38.0 google.golang.org/grpc v1.38.0

View File

@ -25,11 +25,11 @@ require (
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/spf13/cobra v1.1.3 github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6 go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.0 go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.0 go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.0 go.etcd.io/etcd/server/v3 v3.5.2
go.uber.org/zap v1.17.0 go.uber.org/zap v1.17.0
) )

20
go.mod
View File

@ -20,16 +20,16 @@ require (
github.com/dustin/go-humanize v1.0.0 github.com/dustin/go-humanize v1.0.0
github.com/spf13/cobra v1.1.3 github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6 go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.0 go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdctl/v3 v3.5.0 go.etcd.io/etcd/etcdctl/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.0 go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.0 go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.0 go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.0 go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/tests/v3 v3.5.0 go.etcd.io/etcd/tests/v3 v3.5.2
go.uber.org/zap v1.17.0 go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.38.0 google.golang.org/grpc v1.38.0

View File

@ -9,7 +9,7 @@ require (
github.com/spf13/cobra v1.1.3 github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.uber.org/zap v1.17.0 go.uber.org/zap v1.17.0
google.golang.org/grpc v1.38.0 google.golang.org/grpc v1.38.0
) )

View File

@ -0,0 +1,69 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpc_testing
import (
"context"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
type GrpcRecorder struct {
mux sync.RWMutex
requests []RequestInfo
}
type RequestInfo struct {
FullMethod string
Authority string
}
func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ri.record(toRequestInfo(ctx, info))
resp, err := handler(ctx, req)
return resp, err
}
}
func (ri *GrpcRecorder) RecordedRequests() []RequestInfo {
ri.mux.RLock()
defer ri.mux.RUnlock()
reqs := make([]RequestInfo, len(ri.requests))
copy(reqs, ri.requests)
return reqs
}
func toRequestInfo(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo {
req := RequestInfo{
FullMethod: info.FullMethod,
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
as := md.Get(":authority")
if len(as) != 0 {
req.Authority = as[0]
}
}
return req
}
func (ri *GrpcRecorder) record(r RequestInfo) {
ri.mux.Lock()
defer ri.mux.Unlock()
ri.requests = append(ri.requests, r)
}

View File

@ -8,7 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2 github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
) )
// Bad imports are sometimes causing attempts to pull that code. // Bad imports are sometimes causing attempts to pull that code.

View File

@ -147,10 +147,12 @@ type ServerConfig struct {
ForceNewCluster bool ForceNewCluster bool
// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
EnableLeaseCheckpoint bool EnableLeaseCheckpoint bool
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints. // LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
LeaseCheckpointInterval time.Duration LeaseCheckpointInterval time.Duration
// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
LeaseCheckpointPersist bool
EnableGRPCGateway bool EnableGRPCGateway bool

View File

@ -207,7 +207,7 @@ type Config struct {
// SelfSignedCertValidity specifies the validity period of the client and peer certificates // SelfSignedCertValidity specifies the validity period of the client and peer certificates
// that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS, // that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS,
// the unit is year, and the default is 1 // the unit is year, and the default is 1
SelfSignedCertValidity uint SelfSignedCertValidity uint `json:"self-signed-cert-validity"`
// CipherSuites is a list of supported TLS cipher suites between // CipherSuites is a list of supported TLS cipher suites between
// client/server and peers. If empty, Go auto-populates the list. // client/server and peers. If empty, Go auto-populates the list.
@ -314,10 +314,15 @@ type Config struct {
// Deprecated in v3.5. // Deprecated in v3.5.
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913) // TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // Requires experimental-enable-lease-checkpoint to be enabled.
// Deprecated in v3.6.
// TODO: Delete in v3.7
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value. // takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"` ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
@ -591,7 +596,9 @@ func (cfg *configYAML) configFromFile(path string) error {
copySecurityDetails(&cfg.PeerTLSInfo, &cfg.PeerSecurityJSON) copySecurityDetails(&cfg.PeerTLSInfo, &cfg.PeerSecurityJSON)
cfg.ClientAutoTLS = cfg.ClientSecurityJSON.AutoTLS cfg.ClientAutoTLS = cfg.ClientSecurityJSON.AutoTLS
cfg.PeerAutoTLS = cfg.PeerSecurityJSON.AutoTLS cfg.PeerAutoTLS = cfg.PeerSecurityJSON.AutoTLS
if cfg.SelfSignedCertValidity == 0 {
cfg.SelfSignedCertValidity = 1
}
return cfg.Validate() return cfg.Validate()
} }
@ -676,6 +683,14 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode) return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
} }
if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
}
if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}
return nil return nil
} }

View File

@ -291,6 +291,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
} }
} }
func TestLeaseCheckpointValidate(t *testing.T) {
tcs := []struct {
name string
configFunc func() Config
expectError bool
}{
{
name: "Default config should pass",
configFunc: func() Config {
return *NewConfig()
},
},
{
name: "Enabling checkpoint leases should pass",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpoint = true
return cfg
},
},
{
name: "Enabling checkpoint leases and persist should pass",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpoint = true
cfg.ExperimentalEnableLeaseCheckpointPersist = true
return cfg
},
},
{
name: "Enabling checkpoint leases persist without checkpointing itself should fail",
configFunc: func() Config {
cfg := *NewConfig()
cfg.ExperimentalEnableLeaseCheckpointPersist = true
return cfg
},
expectError: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
cfg := tc.configFunc()
err := cfg.Validate()
if (err != nil) != tc.expectError {
t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError)
}
})
}
}
func TestLogRotation(t *testing.T) { func TestLogRotation(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -216,6 +216,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing, ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync, UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
@ -539,7 +540,7 @@ func (e *Etcd) servePeers() (err error) {
for _, p := range e.Peers { for _, p := range e.Peers {
u := p.Listener.Addr().String() u := p.Listener.Addr().String()
gs := v3rpc.Server(e.Server, peerTLScfg) gs := v3rpc.Server(e.Server, peerTLScfg, nil)
m := cmux.New(p.Listener) m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2())) go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{ srv := &http.Server{

View File

@ -110,7 +110,7 @@ func (sctx *serveCtx) serve(
}() }()
if sctx.insecure { if sctx.insecure {
gs = v3rpc.Server(s, nil, gopts...) gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection) v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock) v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil { if sctx.serviceRegister != nil {
@ -148,7 +148,7 @@ func (sctx *serveCtx) serve(
if tlsErr != nil { if tlsErr != nil {
return tlsErr return tlsErr
} }
gs = v3rpc.Server(s, tlscfg, gopts...) gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection) v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock) v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil { if sctx.serviceRegister != nil {

View File

@ -280,7 +280,9 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.") fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")

View File

@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -137,8 +138,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
for _, v := range as { for _, v := range as {
alarmName := v.Alarm.String() alarmName := v.Alarm.String()
if _, found := excludedAlarms[alarmName]; found { if _, found := excludedAlarms[alarmName]; found {
lg.Debug("/health excluded alarm", zap.String("alarm", alarmName)) lg.Debug("/health excluded alarm", zap.String("alarm", v.String()))
delete(excludedAlarms, alarmName)
continue continue
} }
@ -156,10 +156,6 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
} }
} }
if len(excludedAlarms) > 0 {
lg.Warn("fail exclude alarms from health check", zap.String("exclude alarms", fmt.Sprintf("%+v", excludedAlarms)))
}
if uint64(srv.Leader()) == raft.None { if uint64(srv.Leader()) == raft.None {
h.Health = "false" h.Health = "false"
h.Reason = "RAFT NO LEADER" h.Reason = "RAFT NO LEADER"
@ -193,7 +189,7 @@ func checkV3Health(lg *zap.Logger, srv *etcdserver.EtcdServer, excludedAlarms Al
ctx, cancel := context.WithTimeout(context.Background(), srv.Cfg.ReqTimeout()) ctx, cancel := context.WithTimeout(context.Background(), srv.Cfg.ReqTimeout())
_, err := srv.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1}) _, err := srv.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1})
cancel() cancel()
if err != nil { if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
h.Health = "false" h.Health = "false"
h.Reason = fmt.Sprintf("RANGE ERROR:%s", err) h.Reason = fmt.Sprintf("RANGE ERROR:%s", err)
lg.Warn("serving /health false; Range fails", zap.Error(err)) lg.Warn("serving /health false; Range fails", zap.Error(err))

View File

@ -78,6 +78,12 @@ func TestHealthHandler(t *testing.T) {
http.StatusOK, http.StatusOK,
"true", "true",
}, },
{
[]*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}},
"/health?exclude=NOSPACE",
http.StatusOK,
"true",
},
{ {
[]*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}}, []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
"/health?exclude=NOSPACE", "/health?exclude=NOSPACE",

View File

@ -20,6 +20,7 @@ import (
"crypto/sha1" "crypto/sha1"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"path" "path"
"sort" "sort"
@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/pkg/v3/netutil" "go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/mvcc/buckets"
@ -254,12 +256,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if c.be != nil { if c.v2store != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store) c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store) c.members, c.removed = membersFromStore(c.lg, c.v2store)
} else {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
} }
if c.be != nil { if c.be != nil {
@ -381,11 +383,37 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil { if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m) v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}
}
} }
if c.be != nil && shouldApplyV3 { if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m) beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
c.lg.Panic(
"failed to save member to backend",
zap.String("member-id", m.ID.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member already exist.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
} }
c.members[m.ID] = m c.members[m.ID] = m
@ -404,11 +432,36 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) { func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil { if c.v2store != nil {
mustDeleteMemberFromStore(c.lg, c.v2store, id) v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}
}
} }
if c.be != nil && shouldApplyV3 { if c.be != nil && shouldApplyV3 {
mustDeleteMemberFromBackend(c.be, id) beErr = unsafeDeleteMemberFromBackend(c.be, id)
if beErr != nil && !errors.Is(beErr, errMemberNotFound) {
c.lg.Panic(
"failed to delete member from backend",
zap.String("member-id", id.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member not found.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
} }
m, ok := c.members[id] m, ok := c.members[id]
@ -443,7 +496,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
mustUpdateMemberAttrInStore(c.lg, c.v2store, m) mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
} }
if c.be != nil && shouldApplyV3 { if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m) unsafeSaveMemberToBackend(c.lg, c.be, m)
} }
return return
} }
@ -476,7 +529,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
} }
if c.be != nil && shouldApplyV3 { if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id]) unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
} }
c.lg.Info( c.lg.Info(
@ -495,7 +548,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
} }
if c.be != nil && shouldApplyV3 { if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id]) unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
} }
c.lg.Info( c.lg.Info(
@ -870,7 +923,7 @@ func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil { if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be) TrimMembershipFromBackend(c.lg, c.be)
for _, m := range c.members { for _, m := range c.members {
mustSaveMemberToBackend(c.lg, c.be, m) unsafeSaveMemberToBackend(c.lg, c.be, m)
} }
} }
if c.v2store != nil { if c.v2store != nil {

View File

@ -20,8 +20,12 @@ import (
"path" "path"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
"go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/testutil"
@ -29,8 +33,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mock/mockstore" "go.etcd.io/etcd/server/v3/mock/mockstore"
"go.uber.org/zap"
) )
func TestClusterMember(t *testing.T) { func TestClusterMember(t *testing.T) {
@ -1019,3 +1021,193 @@ func TestIsVersionChangable(t *testing.T) {
}) })
} }
} }
func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) {
now := time.Now()
alice := NewMember("", nil, "alice", &now)
tcs := []struct {
name string
storeV2Nil bool
backendNil bool
storeV2Members []*Member
backendMembers []*Member
expectPanics bool
expectMembers map[types.ID]*Member
}{
{
name: "Adding new member should succeed",
},
{
name: "Adding member should succeed if it was only in storeV2",
storeV2Members: []*Member{alice},
},
{
name: "Adding member should succeed if it was only in backend",
backendMembers: []*Member{alice},
},
{
name: "Adding member should fail if it exists in both",
storeV2Members: []*Member{alice},
backendMembers: []*Member{alice},
expectPanics: true,
},
{
name: "Adding member should fail if it exists in storeV2 and backend is nil",
storeV2Members: []*Member{alice},
backendNil: true,
expectPanics: true,
},
{
name: "Adding member should succeed if it exists in backend and storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{alice},
},
{
name: "Adding new member should succeed if backend is nil",
storeV2Members: []*Member{},
backendNil: true,
},
{
name: "Adding new member should fail if storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer be.Close()
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
mustSaveMemberToStore(lg, st, m)
}
cluster := NewCluster(lg)
if !tc.backendNil {
cluster.SetBackend(be)
}
if !tc.storeV2Nil {
cluster.SetStore(st)
}
if tc.expectPanics {
assert.Panics(t, func() {
cluster.AddMember(alice, ApplyBoth)
})
} else {
cluster.AddMember(alice, ApplyBoth)
}
if !tc.storeV2Nil {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, _ := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, beMembers)
}
})
}
}
func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
now := time.Now()
alice := NewMember("", nil, "alice", &now)
tcs := []struct {
name string
storeV2Nil bool
backendNil bool
storeV2Members []*Member
backendMembers []*Member
expectMembers []*Member
expectPanics bool
}{
{
name: "Removing new member should fail",
expectPanics: true,
},
{
name: "Removing member should succeed if it was only in storeV2",
storeV2Members: []*Member{alice},
},
{
name: "Removing member should succeed if it was only in backend",
backendMembers: []*Member{alice},
},
{
name: "Removing member should succeed if it exists in both",
storeV2Members: []*Member{alice},
backendMembers: []*Member{alice},
},
{
name: "Removing new member should fail if backend is nil",
storeV2Members: []*Member{},
backendNil: true,
expectPanics: true,
},
{
name: "Removing new member should succeed if storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{},
},
{
name: "Removing member should succeed if it exists in v2storage and backend is nil",
storeV2Members: []*Member{alice},
backendNil: true,
},
{
name: "Removing member should succeed if it exists in backend and storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{alice},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer be.Close()
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
mustSaveMemberToStore(lg, st, m)
}
cluster := NewCluster(lg)
if !tc.backendNil {
cluster.SetBackend(be)
}
if !tc.storeV2Nil {
cluster.SetStore(st)
}
if tc.expectPanics {
assert.Panics(t, func() {
cluster.RemoveMember(alice.ID, ApplyBoth)
})
} else {
cluster.RemoveMember(alice.ID, ApplyBoth)
}
if !tc.storeV2Nil {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, _ := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{}, beMembers)
}
})
}
}

View File

@ -15,6 +15,7 @@
package membership package membership
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"path" "path"
@ -39,9 +40,11 @@ const (
var ( var (
StoreMembersPrefix = path.Join(storePrefix, "members") StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
errMemberAlreadyExist = fmt.Errorf("member already exists")
errMemberNotFound = fmt.Errorf("member not found")
) )
func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error {
mkey := backendMemberKey(m.ID) mkey := backendMemberKey(m.ID)
mvalue, err := json.Marshal(m) mvalue, err := json.Marshal(m)
if err != nil { if err != nil {
@ -51,7 +54,11 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx := be.BatchTx() tx := be.BatchTx()
tx.Lock() tx.Lock()
defer tx.Unlock() defer tx.Unlock()
if unsafeMemberExists(tx, mkey) {
return errMemberAlreadyExist
}
tx.UnsafePut(buckets.Members, mkey, mvalue) tx.UnsafePut(buckets.Members, mkey, mvalue)
return nil
} }
// TrimClusterFromBackend removes all information about cluster (versions) // TrimClusterFromBackend removes all information about cluster (versions)
@ -64,14 +71,29 @@ func TrimClusterFromBackend(be backend.Backend) error {
return nil return nil
} }
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
mkey := backendMemberKey(id) mkey := backendMemberKey(id)
tx := be.BatchTx() tx := be.BatchTx()
tx.Lock() tx.Lock()
defer tx.Unlock() defer tx.Unlock()
tx.UnsafeDelete(buckets.Members, mkey)
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed")) tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
if !unsafeMemberExists(tx, mkey) {
return errMemberNotFound
}
tx.UnsafeDelete(buckets.Members, mkey)
return nil
}
func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool {
var found bool
tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
if bytes.Equal(k, mkey) {
found = true
}
return nil
})
return found
} }
func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) { func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
@ -182,35 +204,34 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
} }
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) { func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes) err := unsafeSaveMemberToStore(lg, s, m)
if err != nil { if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic( lg.Panic(
"failed to save member to store", "failed to save member to store",
zap.String("path", p), zap.String("member-id", m.ID.String()),
zap.Error(err), zap.Error(err),
) )
} }
} }
func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) { func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
_, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
return err
}
func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error {
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
lg.Panic( return err
"failed to delete member from store",
zap.String("path", MemberStoreKey(id)),
zap.Error(err),
)
} }
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic( return err
"failed to create removedMember",
zap.String("path", RemovedMemberStoreKey(id)),
zap.Error(err),
)
} }
return nil
} }
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) { func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {

View File

@ -36,19 +36,21 @@ const (
maxSendBytes = math.MaxInt32 maxSendBytes = math.MaxInt32
) )
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server { func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{})) opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil { if tls != nil {
bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls}) bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
opts = append(opts, grpc.Creds(bundle.TransportCredentials())) opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
} }
chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s), newLogUnaryInterceptor(s),
newUnaryInterceptor(s), newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor, grpc_prometheus.UnaryServerInterceptor,
} }
if interceptor != nil {
chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor)
}
chainStreamInterceptors := []grpc.StreamServerInterceptor{ chainStreamInterceptors := []grpc.StreamServerInterceptor{
newStreamInterceptor(s), newStreamInterceptor(s),

View File

@ -84,6 +84,7 @@ var toGRPCErrorMap = map[error]error{
auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled, auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled,
auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken, auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt, auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt,
auth.ErrAuthOldRevision: rpctypes.ErrGRPCAuthOldRevision,
// In sync with status.FromContextError // In sync with status.FromContextError
context.Canceled: rpctypes.ErrGRPCCanceled, context.Canceled: rpctypes.ErrGRPCCanceled,

View File

@ -514,6 +514,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
} }
// A snapshot db may have already been recovered, and the old db should have
// already been closed in this case, so we should set the backend again.
ci.SetBackend(be)
s1, s2 := be.Size(), be.SizeInUse() s1, s2 := be.Size(), be.SizeInUse()
cfg.Logger.Info( cfg.Logger.Info(
"recovered v3 backend from snapshot", "recovered v3 backend from snapshot",
@ -592,9 +595,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{ srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval, CheckpointInterval: cfg.LeaseCheckpointInterval,
CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
}) })

View File

@ -25,12 +25,12 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.6 go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.0 go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.0 go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.0 go.etcd.io/etcd/raft/v3 v3.5.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
go.opentelemetry.io/otel v0.20.0 go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/otlp v0.20.0 go.opentelemetry.io/otel/exporters/otlp v0.20.0

View File

@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be) defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second) le.Promote(time.Second)
l, err := le.Grant(1, int64(5)) l, err := le.Grant(1, int64(5))
if err != nil { if err != nil {
@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be) defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second) le.Promote(time.Second)
l, err := le.Grant(1, int64(5)) l, err := le.Grant(1, int64(5))
if err != nil { if err != nil {
@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be) defer betesting.Close(t, be)
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second) le.Promote(time.Second)
l, err := le.Grant(1, int64(5)) l, err := le.Grant(1, int64(5))
if err != nil { if err != nil {

View File

@ -24,6 +24,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/backend"
@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
// MaxLeaseTTL is the maximum lease TTL value // MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000 const MaxLeaseTTL = 9000000000
var v3_6 = semver.Version{Major: 3, Minor: 6}
var ( var (
forever = time.Time{} forever = time.Time{}
@ -180,19 +183,29 @@ type lessor struct {
checkpointInterval time.Duration checkpointInterval time.Duration
// the interval to check if the expired lease is revoked // the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration expiredLeaseRetryInterval time.Duration
// whether lessor should always persist remaining TTL (always enabled in v3.6).
checkpointPersist bool
// cluster is used to adapt lessor logic based on cluster version
cluster cluster
}
type cluster interface {
// Version is the cluster-wide minimum major.minor version.
Version() *semver.Version
} }
type LessorConfig struct { type LessorConfig struct {
MinLeaseTTL int64 MinLeaseTTL int64
CheckpointInterval time.Duration CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration ExpiredLeasesRetryInterval time.Duration
CheckpointPersist bool
} }
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg) return newLessor(lg, b, cluster, cfg)
} }
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 { if checkpointInterval == 0 {
@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
minLeaseTTL: cfg.MinLeaseTTL, minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval, checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval, expiredLeaseRetryInterval: expiredLeaseRetryInterval,
checkpointPersist: cfg.CheckpointPersist,
// expiredC is a small buffered chan to avoid unnecessary blocking. // expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16), expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}), stopC: make(chan struct{}),
doneC: make(chan struct{}), doneC: make(chan struct{}),
lg: lg, lg: lg,
cluster: cluster,
} }
l.initAndRecover() l.initAndRecover()
@ -351,6 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok { if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL l.remainingTTL = remainingTTL
if le.shouldPersistCheckpoints() {
l.persistTo(le.b)
}
if le.isPrimary() { if le.isPrimary() {
// schedule the next checkpoint as needed // schedule the next checkpoint as needed
le.scheduleCheckpointIfNeeded(l) le.scheduleCheckpointIfNeeded(l)
@ -359,6 +377,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
return nil return nil
} }
func (le *lessor) shouldPersistCheckpoints() bool {
cv := le.cluster.Version()
return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
}
func greaterOrEqual(first, second semver.Version) bool {
return !first.LessThan(second)
}
// Renew renews an existing lease. If the given lease does not exist or // Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned. // has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) { func (le *lessor) Renew(id LeaseID) (int64, error) {
@ -446,6 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
l.refresh(extend) l.refresh(extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry} item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item) le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
} }
if len(le.leaseMap) < leaseRevokeRate { if len(le.leaseMap) < leaseRevokeRate {
@ -789,9 +817,10 @@ func (le *lessor) initAndRecover() {
ttl: lpb.TTL, ttl: lpb.TTL,
// itemSet will be filled in when recover key-value pairs // itemSet will be filled in when recover key-value pairs
// set expiry to forever, refresh when promoted // set expiry to forever, refresh when promoted
itemSet: make(map[LeaseItem]struct{}), itemSet: make(map[LeaseItem]struct{}),
expiry: forever, expiry: forever,
revokec: make(chan struct{}), revokec: make(chan struct{}),
remainingTTL: lpb.RemainingTTL,
} }
} }
le.leaseExpiredNotifier.Init() le.leaseExpiredNotifier.Init()

View File

@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
be, _ := betesting.NewDefaultTmpBackend(t) be, _ := betesting.NewDefaultTmpBackend(t)
// MinLeaseTTL is negative, so we can grant expired lease in benchmark. // MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le.SetRangeDeleter(func() TxnDelete { le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()} ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock() ftd.Lock()

View File

@ -26,7 +26,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap" "go.uber.org/zap"
@ -46,7 +48,7 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
le.Promote(0) le.Promote(0)
@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
var fd *fakeDeleter var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete { le.SetRangeDeleter(func() TxnDelete {
@ -210,7 +212,7 @@ func TestLessorRenew(t *testing.T) {
defer be.Close() defer be.Close()
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
le.Promote(0) le.Promote(0)
@ -243,7 +245,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer be.Close() defer be.Close()
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
for _, cp := range cp.GetCheckpoints() { for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@ -292,7 +294,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
dir, be := NewTestBackend(t) dir, be := NewTestBackend(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
ttl := int64(10) ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ { for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@ -311,7 +313,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be") bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg) be = backend.New(bcfg)
defer be.Close() defer be.Close()
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
// extend after recovery should extend expiration on lease pile-up // extend after recovery should extend expiration on lease pile-up
@ -341,7 +343,7 @@ func TestLessorDetach(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@ -382,7 +384,7 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
l1, err1 := le.Grant(1, 10) l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20) l2, err2 := le.Grant(2, 20)
@ -391,7 +393,7 @@ func TestLessorRecover(t *testing.T) {
} }
// Create a new lessor with the same backend // Create a new lessor with the same backend
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer nle.Stop() defer nle.Stop()
nl1 := nle.Lookup(l1.ID) nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl { if nl1 == nil || nl1.ttl != l1.ttl {
@ -412,7 +414,7 @@ func TestLessorExpire(t *testing.T) {
testMinTTL := int64(1) testMinTTL := int64(1)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop() defer le.Stop()
le.Promote(1 * time.Second) le.Promote(1 * time.Second)
@ -465,7 +467,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
testMinTTL := int64(1) testMinTTL := int64(1)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop() defer le.Stop()
le.Promote(1 * time.Second) le.Promote(1 * time.Second)
@ -514,7 +516,7 @@ func TestLessorMaxTTL(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
_, err := le.Grant(1, MaxLeaseTTL+1) _, err := le.Grant(1, MaxLeaseTTL+1)
@ -530,7 +532,8 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
defer le.Stop()
le.minLeaseTTL = 1 le.minLeaseTTL = 1
checkpointedC := make(chan struct{}) checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@ -543,13 +546,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL) t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
} }
}) })
defer le.Stop()
le.Promote(0)
_, err := le.Grant(1, 2) _, err := le.Grant(1, 2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
le.Promote(0)
// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds. // TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
select { select {
@ -565,7 +566,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer be.Close() defer be.Close()
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop() defer le.Stop()
l, err := le.Grant(1, 10) l, err := le.Grant(1, 10)
if err != nil { if err != nil {
@ -579,6 +580,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
} }
} }
func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
const ttl int64 = 10
const checkpointTTL int64 = 5
tcs := []struct {
name string
cluster cluster
checkpointPersist bool
expectRemainingTTL int64
}{
{
name: "Etcd v3.6 and newer persist remainingTTL on checkpoint",
cluster: clusterV3_6(),
expectRemainingTTL: checkpointTTL,
},
{
name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
cluster: clusterLatest(),
checkpointPersist: true,
expectRemainingTTL: checkpointTTL,
},
{
name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
cluster: clusterNil(),
checkpointPersist: true,
expectRemainingTTL: checkpointTTL,
},
{
name: "Etcd v3.5 and older reset remainingTTL on checkpoint",
cluster: clusterLatest(),
expectRemainingTTL: ttl,
},
{
name: "Etcd with version unknown fallbacks to v3.5 behavior",
cluster: clusterNil(),
expectRemainingTTL: ttl,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
cfg.CheckpointPersist = tc.checkpointPersist
le := newLessor(lg, be, tc.cluster, cfg)
l, err := le.Grant(2, ttl)
if err != nil {
t.Fatal(err)
}
if l.RemainingTTL() != ttl {
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
}
le.Checkpoint(2, checkpointTTL)
if l.RemainingTTL() != checkpointTTL {
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
}
le.Stop()
le2 := newLessor(lg, be, clusterV3_6(), cfg)
l = le2.Lookup(2)
if l.RemainingTTL() != tc.expectRemainingTTL {
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
}
})
}
}
type fakeDeleter struct { type fakeDeleter struct {
deleted []string deleted []string
tx backend.BatchTx tx backend.BatchTx
@ -606,3 +676,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
bcfg.Path = filepath.Join(tmpPath, "be") bcfg.Path = filepath.Join(tmpPath, "be")
return tmpPath, backend.New(bcfg) return tmpPath, backend.New(bcfg)
} }
func clusterV3_6() cluster {
return fakeCluster{semver.New("3.6.0")}
}
func clusterLatest() cluster {
return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
}
func clusterNil() cluster {
return fakeCluster{}
}
type fakeCluster struct {
version *semver.Version
}
func (c fakeCluster) Version() *semver.Version {
return c.version
}

View File

@ -432,6 +432,8 @@ func (b *backend) Defrag() error {
func (b *backend) defrag() error { func (b *backend) defrag() error {
now := time.Now() now := time.Now()
isDefragActive.Set(1)
defer isDefragActive.Set(0)
// TODO: make this non-blocking? // TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then // lock batchTx to ensure nobody is using previous tx, and then

View File

@ -83,6 +83,13 @@ var (
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec // highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17), Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
}) })
isDefragActive = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "defrag_inflight",
Help: "Whether or not defrag is active on the member. 1 means active, 0 means not.",
})
) )
func init() { func init() {
@ -92,4 +99,5 @@ func init() {
prometheus.MustRegister(writeSec) prometheus.MustRegister(writeSec)
prometheus.MustRegister(defragSec) prometheus.MustRegister(defragSec)
prometheus.MustRegister(snapshotTransferSec) prometheus.MustRegister(snapshotTransferSec)
prometheus.MustRegister(isDefragActive)
} }

View File

@ -355,8 +355,11 @@ func (s *watchableStore) syncWatchers() int {
tx := s.store.b.ReadTx() tx := s.store.b.ReadTx()
tx.RLock() tx.RLock()
revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0) revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
tx.RUnlock()
evs := kvsToEvents(s.store.lg, wg, revs, vs) evs := kvsToEvents(s.store.lg, wg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
var victims watcherBatch var victims watcherBatch
wb := newWatcherBatch(wg, evs) wb := newWatcherBatch(wg, evs)

View File

@ -115,6 +115,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
return p.etcdProc.WithStopSignal(sig) return p.etcdProc.WithStopSignal(sig)
} }
func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
}
type proxyProc struct { type proxyProc struct {
lg *zap.Logger lg *zap.Logger
execPath string execPath string
@ -132,7 +136,7 @@ func (pp *proxyProc) start() error {
if pp.proc != nil { if pp.proc != nil {
panic("already started") panic("already started")
} }
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...)) proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -144,6 +144,7 @@ type etcdProcessClusterConfig struct {
execPath string execPath string
dataDirPath string dataDirPath string
keepDataDir bool keepDataDir bool
envVars map[string]string
clusterSize int clusterSize int
@ -318,6 +319,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
lg: lg, lg: lg,
execPath: cfg.execPath, execPath: cfg.execPath,
args: args, args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(), tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath, dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir, keepDataDir: cfg.keepDataDir,

View File

@ -505,7 +505,7 @@ func etcdctlBackup(t testing.TB, clus *etcdProcessCluster, dataDir, backupDir st
cmdArgs = append(cmdArgs, "--with-v3=false") cmdArgs = append(cmdArgs, "--with-v3=false")
} }
t.Logf("Running: %v", cmdArgs) t.Logf("Running: %v", cmdArgs)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -101,5 +101,5 @@ func alarmTest(cx ctlCtx) {
func ctlV3Alarm(cx ctlCtx, cmd string, as ...string) error { func ctlV3Alarm(cx ctlCtx, cmd string, as ...string) error {
cmdArgs := append(cx.PrefixArgs(), "alarm", cmd) cmdArgs := append(cx.PrefixArgs(), "alarm", cmd)
return spawnWithExpects(cmdArgs, as...) return spawnWithExpects(cmdArgs, cx.envMap, as...)
} }

View File

@ -93,7 +93,7 @@ func authEnable(cx ctlCtx) error {
func ctlV3AuthEnable(cx ctlCtx) error { func ctlV3AuthEnable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "enable") cmdArgs := append(cx.PrefixArgs(), "auth", "enable")
return spawnWithExpect(cmdArgs, "Authentication Enabled") return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Enabled")
} }
func authDisableTest(cx ctlCtx) { func authDisableTest(cx ctlCtx) {
@ -139,12 +139,12 @@ func authDisableTest(cx ctlCtx) {
func ctlV3AuthDisable(cx ctlCtx) error { func ctlV3AuthDisable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "disable") cmdArgs := append(cx.PrefixArgs(), "auth", "disable")
return spawnWithExpect(cmdArgs, "Authentication Disabled") return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Disabled")
} }
func authStatusTest(cx ctlCtx) { func authStatusTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), "auth", "status") cmdArgs := append(cx.PrefixArgs(), "auth", "status")
if err := spawnWithExpects(cmdArgs, "Authentication Status: false", "AuthRevision:"); err != nil { if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: false", "AuthRevision:"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
@ -155,15 +155,15 @@ func authStatusTest(cx ctlCtx) {
cx.user, cx.pass = "root", "root" cx.user, cx.pass = "root", "root"
cmdArgs = append(cx.PrefixArgs(), "auth", "status") cmdArgs = append(cx.PrefixArgs(), "auth", "status")
if err := spawnWithExpects(cmdArgs, "Authentication Status: true", "AuthRevision:"); err != nil { if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: true", "AuthRevision:"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
cmdArgs = append(cx.PrefixArgs(), "auth", "status", "--write-out", "json") cmdArgs = append(cx.PrefixArgs(), "auth", "status", "--write-out", "json")
if err := spawnWithExpect(cmdArgs, "enabled"); err != nil { if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "enabled"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := spawnWithExpect(cmdArgs, "authRevision"); err != nil { if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "authRevision"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
@ -381,25 +381,25 @@ func authRoleRevokeDuringOpsTest(cx ctlCtx) {
} }
func ctlV3PutFailAuth(cx ctlCtx, key, val string) error { func ctlV3PutFailAuth(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "authentication failed") return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "authentication failed")
} }
func ctlV3PutFailPerm(cx ctlCtx, key, val string) error { func ctlV3PutFailPerm(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "permission denied") return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "permission denied")
} }
func authSetupTestUser(cx ctlCtx) { func authSetupTestUser(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "test-user", "--interactive=false"}, "User test-user created", []string{"pass"}); err != nil { if err := ctlV3User(cx, []string{"add", "test-user", "--interactive=false"}, "User test-user created", []string{"pass"}); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil { if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := ctlV3User(cx, []string{"grant-role", "test-user", "test-role"}, "Role test-role is granted to user test-user", nil); err != nil { if err := ctlV3User(cx, []string{"grant-role", "test-user", "test-role"}, "Role test-role is granted to user test-user", nil); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
cmd := append(cx.PrefixArgs(), "role", "grant-permission", "test-role", "readwrite", "foo") cmd := append(cx.PrefixArgs(), "role", "grant-permission", "test-role", "readwrite", "foo")
if err := spawnWithExpect(cmd, "Role test-role updated"); err != nil { if err := spawnWithExpectWithEnv(cmd, cx.envMap, "Role test-role updated"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
@ -611,7 +611,7 @@ func authTestCertCN(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "example.com", "--interactive=false"}, "User example.com created", []string{""}); err != nil { if err := ctlV3User(cx, []string{"add", "example.com", "--interactive=false"}, "User example.com created", []string{""}); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil { if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role"}, "Role test-role is granted to user example.com", nil); err != nil { if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role"}, "Role test-role is granted to user example.com", nil); err != nil {
@ -921,13 +921,13 @@ func authTestRoleGet(cx ctlCtx) {
"KV Read:", "foo", "KV Read:", "foo",
"KV Write:", "foo", "KV Write:", "foo",
} }
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil { if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
// test-user can get the information of test-role because it belongs to the role // test-user can get the information of test-role because it belongs to the role
cx.user, cx.pass = "test-user", "pass" cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil { if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
@ -935,7 +935,7 @@ func authTestRoleGet(cx ctlCtx) {
expected = []string{ expected = []string{
"Error: etcdserver: permission denied", "Error: etcdserver: permission denied",
} }
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), expected...); err != nil { if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
@ -952,13 +952,13 @@ func authTestUserGet(cx ctlCtx) {
"Roles: test-role", "Roles: test-role",
} }
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil { if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
// test-user can get the information of test-user itself // test-user can get the information of test-user itself
cx.user, cx.pass = "test-user", "pass" cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil { if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
@ -966,7 +966,7 @@ func authTestUserGet(cx ctlCtx) {
expected = []string{ expected = []string{
"Error: etcdserver: permission denied", "Error: etcdserver: permission denied",
} }
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), expected...); err != nil { if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
@ -977,7 +977,7 @@ func authTestRoleList(cx ctlCtx) {
} }
cx.user, cx.pass = "root", "root" cx.user, cx.pass = "root", "root"
authSetupTestUser(cx) authSetupTestUser(cx)
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "list"), "test-role"); err != nil { if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "list"), cx.envMap, "test-role"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
@ -1088,7 +1088,7 @@ func certCNAndUsername(cx ctlCtx, noPassword bool) {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), "Role test-role-cn created"); err != nil { if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), cx.envMap, "Role test-role-cn created"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role-cn"}, "Role test-role-cn is granted to user example.com", nil); err != nil { if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role-cn"}, "Role test-role-cn is granted to user example.com", nil); err != nil {

View File

@ -71,5 +71,5 @@ func ctlV3Compact(cx ctlCtx, rev int64, physical bool) error {
if physical { if physical {
cmdArgs = append(cmdArgs, "--physical") cmdArgs = append(cmdArgs, "--physical")
} }
return spawnWithExpect(cmdArgs, "compacted revision "+rs) return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "compacted revision "+rs)
} }

View File

@ -52,13 +52,13 @@ func ctlV3OnlineDefrag(cx ctlCtx) error {
for i := range lines { for i := range lines {
lines[i] = "Finished defragmenting etcd member" lines[i] = "Finished defragmenting etcd member"
} }
return spawnWithExpects(cmdArgs, lines...) return spawnWithExpects(cmdArgs, cx.envMap, lines...)
} }
func ctlV3OfflineDefrag(cx ctlCtx) error { func ctlV3OfflineDefrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgsUtl(), "defrag", "--data-dir", cx.dataDir) cmdArgs := append(cx.PrefixArgsUtl(), "defrag", "--data-dir", cx.dataDir)
lines := []string{"finished defragmenting directory"} lines := []string{"finished defragmenting directory"}
return spawnWithExpects(cmdArgs, lines...) return spawnWithExpects(cmdArgs, cx.envMap, lines...)
} }
func defragOfflineTest(cx ctlCtx) { func defragOfflineTest(cx ctlCtx) {

View File

@ -98,7 +98,7 @@ func testElect(cx ctlCtx) {
// ctlV3Elect creates a elect process with a channel listening for when it wins the election. // ctlV3Elect creates a elect process with a channel listening for when it wins the election.
func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan string, error) { func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan string, error) {
cmdArgs := append(cx.PrefixArgs(), "elect", name, proposal) cmdArgs := append(cx.PrefixArgs(), "elect", name, proposal)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
outc := make(chan string, 1) outc := make(chan string, 1)
if err != nil { if err != nil {
close(outc) close(outc)

View File

@ -40,7 +40,7 @@ func ctlV3EndpointHealth(cx ctlCtx) error {
for i := range lines { for i := range lines {
lines[i] = "is healthy" lines[i] = "is healthy"
} }
return spawnWithExpects(cmdArgs, lines...) return spawnWithExpects(cmdArgs, cx.envMap, lines...)
} }
func endpointStatusTest(cx ctlCtx) { func endpointStatusTest(cx ctlCtx) {
@ -56,7 +56,7 @@ func ctlV3EndpointStatus(cx ctlCtx) error {
u, _ := url.Parse(ep) u, _ := url.Parse(ep)
eps = append(eps, u.Host) eps = append(eps, u.Host)
} }
return spawnWithExpects(cmdArgs, eps...) return spawnWithExpects(cmdArgs, cx.envMap, eps...)
} }
func endpointHashKVTest(cx ctlCtx) { func endpointHashKVTest(cx ctlCtx) {
@ -88,5 +88,5 @@ func ctlV3EndpointHashKV(cx ctlCtx) error {
u, _ := url.Parse(ep) u, _ := url.Parse(ep)
ss = append(ss, fmt.Sprintf("%s, %d", u.Host, hresp.Hash)) ss = append(ss, fmt.Sprintf("%s, %d", u.Host, hresp.Hash))
} }
return spawnWithExpects(cmdArgs, ss...) return spawnWithExpects(cmdArgs, cx.envMap, ss...)
} }

View File

@ -0,0 +1,213 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !cluster_proxy
// +build !cluster_proxy
package e2e
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/testutil"
)
func TestAuthority(t *testing.T) {
tcs := []struct {
name string
useTLS bool
useInsecureTLS bool
// Pattern used to generate endpoints for client. Fields filled
// %d - will be filled with member grpc port
clientURLPattern string
// Pattern used to validate authority received by server. Fields filled:
// %d - will be filled with first member grpc port
expectAuthorityPattern string
}{
{
name: "http://domain[:port]",
clientURLPattern: "http://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "http://address[:port]",
clientURLPattern: "http://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://domain[:port] insecure",
useTLS: true,
useInsecureTLS: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://address[:port] insecure",
useTLS: true,
useInsecureTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://domain[:port]",
useTLS: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://address[:port]",
useTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
}
for _, tc := range tcs {
for _, clusterSize := range []int{1, 3} {
t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) {
BeforeTest(t)
cfg := newConfigNoTLS()
cfg.clusterSize = clusterSize
if tc.useTLS {
cfg.clientTLS = clientTLS
}
cfg.isClientAutoTLS = tc.useInsecureTLS
// Enable debug mode to get logs with http2 headers (including authority)
cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}
epc, err := newEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
defer epc.Close()
endpoints := templateEndpoints(t, tc.clientURLPattern, epc)
client := clusterEtcdctlV3(cfg, endpoints)
err = client.Put("foo", "bar")
if err != nil {
t.Fatal(err)
}
executeWithTimeout(t, 5*time.Second, func() {
assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc)
})
})
}
}
}
func templateEndpoints(t *testing.T, pattern string, clus *etcdProcessCluster) []string {
t.Helper()
endpoints := []string{}
for i := 0; i < clus.cfg.clusterSize; i++ {
ent := pattern
if strings.Contains(ent, "%d") {
ent = fmt.Sprintf(ent, etcdProcessBasePort+i*5)
}
if strings.Contains(ent, "%") {
t.Fatalf("Failed to template pattern, %% symbol left %q", ent)
}
endpoints = append(endpoints, ent)
}
return endpoints
}
func assertAuthority(t *testing.T, expectAurhority string, clus *etcdProcessCluster) {
logs := []logsExpect{}
for _, proc := range clus.procs {
logs = append(logs, proc.Logs())
}
line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...)
line = strings.TrimSuffix(line, "\n")
line = strings.TrimSuffix(line, "\r")
expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority)
assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine))
}
func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string {
t.Helper()
match := make(chan string, len(logs))
for i := range logs {
go func(l logsExpect) {
line, _ := l.Expect(expectLine)
match <- line
}(logs[i])
}
return <-match
}
func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) {
donec := make(chan struct{})
go func() {
defer close(donec)
f()
}()
select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
}
type etcdctlV3 struct {
cfg *etcdProcessClusterConfig
endpoints []string
}
func clusterEtcdctlV3(cfg *etcdProcessClusterConfig, endpoints []string) *etcdctlV3 {
return &etcdctlV3{
cfg: cfg,
endpoints: endpoints,
}
}
func (ctl *etcdctlV3) Put(key, value string) error {
return ctl.runCmd("put", key, value)
}
func (ctl *etcdctlV3) runCmd(args ...string) error {
cmdArgs := []string{ctlBinPath + "3"}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
cmdArgs = append(cmdArgs, args...)
return spawnWithExpect(cmdArgs, "OK")
}
func (ctl *etcdctlV3) flags() map[string]string {
fmap := make(map[string]string)
if ctl.cfg.clientTLS == clientTLS {
if ctl.cfg.isClientAutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
} else if ctl.cfg.isClientCRL {
fmap["cacert"] = caPath
fmap["cert"] = revokedCertPath
fmap["key"] = revokedPrivateKeyPath
} else {
fmap["cacert"] = caPath
fmap["cert"] = certPath
fmap["key"] = privateKeyPath
}
}
fmap["endpoints"] = strings.Join(ctl.endpoints, ",")
return fmap
}

View File

@ -190,7 +190,7 @@ func getFormatTest(cx ctlCtx) {
cmdArgs = append(cmdArgs, "--print-value-only") cmdArgs = append(cmdArgs, "--print-value-only")
} }
cmdArgs = append(cmdArgs, "abc") cmdArgs = append(cmdArgs, "abc")
if err := spawnWithExpect(cmdArgs, tt.wstr); err != nil { if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, tt.wstr); err != nil {
cx.t.Errorf("#%d: error (%v), wanted %v", i, err, tt.wstr) cx.t.Errorf("#%d: error (%v), wanted %v", i, err, tt.wstr)
} }
} }
@ -228,24 +228,24 @@ func getKeysOnlyTest(cx ctlCtx) {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
cmdArgs := append(cx.PrefixArgs(), []string{"get", "--keys-only", "key"}...) cmdArgs := append(cx.PrefixArgs(), []string{"get", "--keys-only", "key"}...)
if err := spawnWithExpect(cmdArgs, "key"); err != nil { if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "key"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := spawnWithExpects(cmdArgs, "val"); err == nil { if err := spawnWithExpects(cmdArgs, cx.envMap, "val"); err == nil {
cx.t.Fatalf("got value but passed --keys-only") cx.t.Fatalf("got value but passed --keys-only")
} }
} }
func getCountOnlyTest(cx ctlCtx) { func getCountOnlyTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...) cmdArgs := append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 0"); err != nil { if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 0"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := ctlV3Put(cx, "key", "val", ""); err != nil { if err := ctlV3Put(cx, "key", "val", ""); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...) cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 1"); err != nil { if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 1"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := ctlV3Put(cx, "key1", "val", ""); err != nil { if err := ctlV3Put(cx, "key1", "val", ""); err != nil {
@ -255,21 +255,21 @@ func getCountOnlyTest(cx ctlCtx) {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...) cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 2"); err != nil { if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 2"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
if err := ctlV3Put(cx, "key2", "val", ""); err != nil { if err := ctlV3Put(cx, "key2", "val", ""); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...) cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, "\"Count\" : 3"); err != nil { if err := spawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 3"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
expected := []string{ expected := []string{
"\"Count\" : 3", "\"Count\" : 3",
} }
cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key3", "--prefix", "--write-out=fields"}...) cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key3", "--prefix", "--write-out=fields"}...)
if err := spawnWithExpects(cmdArgs, expected...); err == nil { if err := spawnWithExpects(cmdArgs, cx.envMap, expected...); err == nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
@ -348,7 +348,7 @@ func ctlV3Put(cx ctlCtx, key, value, leaseID string, flags ...string) error {
if len(flags) != 0 { if len(flags) != 0 {
cmdArgs = append(cmdArgs, flags...) cmdArgs = append(cmdArgs, flags...)
} }
return spawnWithExpect(cmdArgs, "OK") return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "OK")
} }
type kv struct { type kv struct {
@ -365,7 +365,7 @@ func ctlV3Get(cx ctlCtx, args []string, kvs ...kv) error {
for _, elem := range kvs { for _, elem := range kvs {
lines = append(lines, elem.key, elem.val) lines = append(lines, elem.key, elem.val)
} }
return spawnWithExpects(cmdArgs, lines...) return spawnWithExpects(cmdArgs, cx.envMap, lines...)
} }
// ctlV3GetWithErr runs "get" command expecting no output but error // ctlV3GetWithErr runs "get" command expecting no output but error
@ -375,11 +375,11 @@ func ctlV3GetWithErr(cx ctlCtx, args []string, errs []string) error {
if !cx.quorum { if !cx.quorum {
cmdArgs = append(cmdArgs, "--consistency", "s") cmdArgs = append(cmdArgs, "--consistency", "s")
} }
return spawnWithExpects(cmdArgs, errs...) return spawnWithExpects(cmdArgs, cx.envMap, errs...)
} }
func ctlV3Del(cx ctlCtx, args []string, num int) error { func ctlV3Del(cx ctlCtx, args []string, num int) error {
cmdArgs := append(cx.PrefixArgs(), "del") cmdArgs := append(cx.PrefixArgs(), "del")
cmdArgs = append(cmdArgs, args...) cmdArgs = append(cmdArgs, args...)
return spawnWithExpects(cmdArgs, fmt.Sprintf("%d", num)) return spawnWithExpects(cmdArgs, cx.envMap, fmt.Sprintf("%d", num))
} }

View File

@ -113,7 +113,7 @@ func leaseTestGrantTimeToLive(cx ctlCtx) {
} }
cmdArgs := append(cx.PrefixArgs(), "lease", "timetolive", id, "--keys") cmdArgs := append(cx.PrefixArgs(), "lease", "timetolive", id, "--keys")
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
cx.t.Fatalf("leaseTestGrantTimeToLive: error (%v)", err) cx.t.Fatalf("leaseTestGrantTimeToLive: error (%v)", err)
} }
@ -146,7 +146,7 @@ func leaseTestGrantLeasesList(cx ctlCtx) error {
} }
cmdArgs := append(cx.PrefixArgs(), "lease", "list") cmdArgs := append(cx.PrefixArgs(), "lease", "list")
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
return fmt.Errorf("lease list failed (%v)", err) return fmt.Errorf("lease list failed (%v)", err)
} }
@ -177,7 +177,7 @@ func leaseTestTimeToLiveExpire(cx ctlCtx, ttl int) error {
time.Sleep(time.Duration(ttl+1) * time.Second) time.Sleep(time.Duration(ttl+1) * time.Second)
cmdArgs := append(cx.PrefixArgs(), "lease", "timetolive", leaseID) cmdArgs := append(cx.PrefixArgs(), "lease", "timetolive", leaseID)
exp := fmt.Sprintf("lease %s already expired", leaseID) exp := fmt.Sprintf("lease %s already expired", leaseID)
if err = spawnWithExpect(cmdArgs, exp); err != nil { if err = spawnWithExpectWithEnv(cmdArgs, cx.envMap, exp); err != nil {
return fmt.Errorf("lease not properly expired: (%v)", err) return fmt.Errorf("lease not properly expired: (%v)", err)
} }
if err := ctlV3Get(cx, []string{"key"}); err != nil { if err := ctlV3Get(cx, []string{"key"}); err != nil {
@ -247,7 +247,7 @@ func leaseTestRevoke(cx ctlCtx) error {
func ctlV3LeaseGrant(cx ctlCtx, ttl int) (string, error) { func ctlV3LeaseGrant(cx ctlCtx, ttl int) (string, error) {
cmdArgs := append(cx.PrefixArgs(), "lease", "grant", strconv.Itoa(ttl)) cmdArgs := append(cx.PrefixArgs(), "lease", "grant", strconv.Itoa(ttl))
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -271,7 +271,7 @@ func ctlV3LeaseGrant(cx ctlCtx, ttl int) (string, error) {
func ctlV3LeaseKeepAlive(cx ctlCtx, leaseID string) error { func ctlV3LeaseKeepAlive(cx ctlCtx, leaseID string) error {
cmdArgs := append(cx.PrefixArgs(), "lease", "keep-alive", leaseID) cmdArgs := append(cx.PrefixArgs(), "lease", "keep-alive", leaseID)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, nil)
if err != nil { if err != nil {
return err return err
} }
@ -285,7 +285,7 @@ func ctlV3LeaseKeepAlive(cx ctlCtx, leaseID string) error {
func ctlV3LeaseKeepAliveOnce(cx ctlCtx, leaseID string) error { func ctlV3LeaseKeepAliveOnce(cx ctlCtx, leaseID string) error {
cmdArgs := append(cx.PrefixArgs(), "lease", "keep-alive", "--once", leaseID) cmdArgs := append(cx.PrefixArgs(), "lease", "keep-alive", "--once", leaseID)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, nil)
if err != nil { if err != nil {
return err return err
} }
@ -298,5 +298,5 @@ func ctlV3LeaseKeepAliveOnce(cx ctlCtx, leaseID string) error {
func ctlV3LeaseRevoke(cx ctlCtx, leaseID string) error { func ctlV3LeaseRevoke(cx ctlCtx, leaseID string) error {
cmdArgs := append(cx.PrefixArgs(), "lease", "revoke", leaseID) cmdArgs := append(cx.PrefixArgs(), "lease", "revoke", leaseID)
return spawnWithExpect(cmdArgs, fmt.Sprintf("lease %s revoked", leaseID)) return spawnWithExpectWithEnv(cmdArgs, cx.envMap, fmt.Sprintf("lease %s revoked", leaseID))
} }

View File

@ -119,7 +119,7 @@ func testLockWithCmd(cx ctlCtx) {
// ctlV3Lock creates a lock process with a channel listening for when it acquires the lock. // ctlV3Lock creates a lock process with a channel listening for when it acquires the lock.
func ctlV3Lock(cx ctlCtx, name string) (*expect.ExpectProcess, <-chan string, error) { func ctlV3Lock(cx ctlCtx, name string) (*expect.ExpectProcess, <-chan string, error) {
cmdArgs := append(cx.PrefixArgs(), "lock", name) cmdArgs := append(cx.PrefixArgs(), "lock", name)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
outc := make(chan string, 1) outc := make(chan string, 1)
if err != nil { if err != nil {
close(outc) close(outc)
@ -140,5 +140,5 @@ func ctlV3LockWithCmd(cx ctlCtx, execCmd []string, as ...string) error {
// use command as lock name // use command as lock name
cmdArgs := append(cx.PrefixArgs(), "lock", execCmd[0]) cmdArgs := append(cx.PrefixArgs(), "lock", execCmd[0])
cmdArgs = append(cmdArgs, execCmd...) cmdArgs = append(cmdArgs, execCmd...)
return spawnWithExpects(cmdArgs, as...) return spawnWithExpects(cmdArgs, cx.envMap, as...)
} }

View File

@ -83,7 +83,7 @@ func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvEx
cmdArgs := append(cx.PrefixArgs(), "make-mirror") cmdArgs := append(cx.PrefixArgs(), "make-mirror")
cmdArgs = append(cmdArgs, flags...) cmdArgs = append(cmdArgs, flags...)
cmdArgs = append(cmdArgs, fmt.Sprintf("localhost:%d", mirrorcfg.basePort)) cmdArgs = append(cmdArgs, fmt.Sprintf("localhost:%d", mirrorcfg.basePort))
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }

View File

@ -95,13 +95,13 @@ func ctlV3MemberList(cx ctlCtx) error {
for i := range lines { for i := range lines {
lines[i] = "started" lines[i] = "started"
} }
return spawnWithExpects(cmdArgs, lines...) return spawnWithExpects(cmdArgs, cx.envMap, lines...)
} }
func getMemberList(cx ctlCtx) (etcdserverpb.MemberListResponse, error) { func getMemberList(cx ctlCtx) (etcdserverpb.MemberListResponse, error) {
cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "member", "list") cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "member", "list")
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
return etcdserverpb.MemberListResponse{}, err return etcdserverpb.MemberListResponse{}, err
} }
@ -130,7 +130,7 @@ func memberListWithHexTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "--hex", "member", "list") cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "--hex", "member", "list")
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
cx.t.Fatalf("memberListWithHexTest error (%v)", err) cx.t.Fatalf("memberListWithHexTest error (%v)", err)
} }
@ -177,7 +177,7 @@ func memberRemoveTest(cx ctlCtx) {
func ctlV3MemberRemove(cx ctlCtx, ep, memberID, clusterID string) error { func ctlV3MemberRemove(cx ctlCtx, ep, memberID, clusterID string) error {
cmdArgs := append(cx.prefixArgs([]string{ep}), "member", "remove", memberID) cmdArgs := append(cx.prefixArgs([]string{ep}), "member", "remove", memberID)
return spawnWithExpect(cmdArgs, fmt.Sprintf("%s removed from cluster %s", memberID, clusterID)) return spawnWithExpectWithEnv(cmdArgs, cx.envMap, fmt.Sprintf("%s removed from cluster %s", memberID, clusterID))
} }
func memberAddTest(cx ctlCtx) { func memberAddTest(cx ctlCtx) {
@ -197,7 +197,7 @@ func ctlV3MemberAdd(cx ctlCtx, peerURL string, isLearner bool) error {
if isLearner { if isLearner {
cmdArgs = append(cmdArgs, "--learner") cmdArgs = append(cmdArgs, "--learner")
} }
return spawnWithExpect(cmdArgs, " added to cluster ") return spawnWithExpectWithEnv(cmdArgs, cx.envMap, " added to cluster ")
} }
func memberUpdateTest(cx ctlCtx) { func memberUpdateTest(cx ctlCtx) {
@ -215,5 +215,5 @@ func memberUpdateTest(cx ctlCtx) {
func ctlV3MemberUpdate(cx ctlCtx, memberID, peerURL string) error { func ctlV3MemberUpdate(cx ctlCtx, memberID, peerURL string) error {
cmdArgs := append(cx.PrefixArgs(), "member", "update", memberID, fmt.Sprintf("--peer-urls=%s", peerURL)) cmdArgs := append(cx.PrefixArgs(), "member", "update", memberID, fmt.Sprintf("--peer-urls=%s", peerURL))
return spawnWithExpect(cmdArgs, " updated in cluster ") return spawnWithExpectWithEnv(cmdArgs, cx.envMap, " updated in cluster ")
} }

View File

@ -97,21 +97,22 @@ func testCtlV3MoveLeader(t *testing.T, cfg etcdProcessClusterConfig) {
} }
tests := []struct { tests := []struct {
prefixes []string eps []string
expect string expect string
}{ }{
{ // request to non-leader { // request to non-leader
cx.prefixArgs([]string{cx.epc.EndpointsV3()[(leadIdx+1)%3]}), []string{cx.epc.EndpointsV3()[(leadIdx+1)%3]},
"no leader endpoint given at ", "no leader endpoint given at ",
}, },
{ // request to leader { // request to leader
cx.prefixArgs([]string{cx.epc.EndpointsV3()[leadIdx]}), []string{cx.epc.EndpointsV3()[leadIdx]},
fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)), fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)),
}, },
} }
for i, tc := range tests { for i, tc := range tests {
cmdArgs := append(tc.prefixes, "move-leader", types.ID(transferee).String()) prefix := cx.prefixArgs(tc.eps)
if err := spawnWithExpect(cmdArgs, tc.expect); err != nil { cmdArgs := append(prefix, "move-leader", types.ID(transferee).String())
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, tc.expect); err != nil {
t.Fatalf("#%d: %v", i, err) t.Fatalf("#%d: %v", i, err)
} }
} }

View File

@ -96,7 +96,7 @@ func ctlV3Role(cx ctlCtx, args []string, expStr string) error {
cmdArgs := append(cx.PrefixArgs(), "role") cmdArgs := append(cx.PrefixArgs(), "role")
cmdArgs = append(cmdArgs, args...) cmdArgs = append(cmdArgs, args...)
return spawnWithExpect(cmdArgs, expStr) return spawnWithExpectWithEnv(cmdArgs, cx.envMap, expStr)
} }
func ctlV3RoleGrantPermission(cx ctlCtx, rolename string, perm grantingPerm) error { func ctlV3RoleGrantPermission(cx ctlCtx, rolename string, perm grantingPerm) error {
@ -110,7 +110,7 @@ func ctlV3RoleGrantPermission(cx ctlCtx, rolename string, perm grantingPerm) err
cmdArgs = append(cmdArgs, rolename) cmdArgs = append(cmdArgs, rolename)
cmdArgs = append(cmdArgs, grantingPermToArgs(perm)...) cmdArgs = append(cmdArgs, grantingPermToArgs(perm)...)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
return err return err
} }
@ -136,7 +136,7 @@ func ctlV3RoleRevokePermission(cx ctlCtx, rolename string, key, rangeEnd string,
expStr = fmt.Sprintf("Permission of key %s is revoked from role %s", key, rolename) expStr = fmt.Sprintf("Permission of key %s is revoked from role %s", key, rolename)
} }
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
return err return err
} }

View File

@ -84,10 +84,11 @@ func snapshotCorruptTest(cx ctlCtx) {
datadir := cx.t.TempDir() datadir := cx.t.TempDir()
serr := spawnWithExpect( serr := spawnWithExpectWithEnv(
append(cx.PrefixArgsUtl(), "snapshot", "restore", append(cx.PrefixArgsUtl(), "snapshot", "restore",
"--data-dir", datadir, "--data-dir", datadir,
fpath), fpath),
cx.envMap,
"expected sha256") "expected sha256")
if serr != nil { if serr != nil {
@ -117,10 +118,11 @@ func snapshotStatusBeforeRestoreTest(cx ctlCtx) {
dataDir := cx.t.TempDir() dataDir := cx.t.TempDir()
defer os.RemoveAll(dataDir) defer os.RemoveAll(dataDir)
serr := spawnWithExpect( serr := spawnWithExpectWithEnv(
append(cx.PrefixArgsUtl(), "snapshot", "restore", append(cx.PrefixArgsUtl(), "snapshot", "restore",
"--data-dir", dataDir, "--data-dir", dataDir,
fpath), fpath),
cx.envMap,
"added member") "added member")
if serr != nil { if serr != nil {
cx.t.Fatal(serr) cx.t.Fatal(serr)
@ -129,13 +131,13 @@ func snapshotStatusBeforeRestoreTest(cx ctlCtx) {
func ctlV3SnapshotSave(cx ctlCtx, fpath string) error { func ctlV3SnapshotSave(cx ctlCtx, fpath string) error {
cmdArgs := append(cx.PrefixArgs(), "snapshot", "save", fpath) cmdArgs := append(cx.PrefixArgs(), "snapshot", "save", fpath)
return spawnWithExpect(cmdArgs, fmt.Sprintf("Snapshot saved at %s", fpath)) return spawnWithExpectWithEnv(cmdArgs, cx.envMap, fmt.Sprintf("Snapshot saved at %s", fpath))
} }
func getSnapshotStatus(cx ctlCtx, fpath string) (snapshot.Status, error) { func getSnapshotStatus(cx ctlCtx, fpath string) (snapshot.Status, error) {
cmdArgs := append(cx.PrefixArgsUtl(), "--write-out", "json", "snapshot", "status", fpath) cmdArgs := append(cx.PrefixArgsUtl(), "--write-out", "json", "snapshot", "status", fpath)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, nil)
if err != nil { if err != nil {
return snapshot.Status{}, err return snapshot.Status{}, err
} }
@ -202,7 +204,10 @@ func testIssue6361(t *testing.T, etcdutl bool) {
fpath := filepath.Join(t.TempDir(), "test.snapshot") fpath := filepath.Join(t.TempDir(), "test.snapshot")
t.Log("etcdctl saving snapshot...") t.Log("etcdctl saving snapshot...")
if err = spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)); err != nil { if err = spawnWithExpects(append(prefixArgs, "snapshot", "save", fpath),
nil,
fmt.Sprintf("Snapshot saved at %s", fpath),
); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -262,7 +267,7 @@ func testIssue6361(t *testing.T, etcdutl bool) {
nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2, nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2,
"--listen-client-urls", clientURL, "--advertise-client-urls", clientURL, "--listen-client-urls", clientURL, "--advertise-client-urls", clientURL,
"--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL, "--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL,
"--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2}) "--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2}, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -104,7 +104,7 @@ func clusterVersionTest(cx ctlCtx, expected string) {
func ctlV3Version(cx ctlCtx) error { func ctlV3Version(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "version") cmdArgs := append(cx.PrefixArgs(), "version")
return spawnWithExpect(cmdArgs, version.Version) return spawnWithExpectWithEnv(cmdArgs, cx.envMap, version.Version)
} }
// TestCtlV3DialWithHTTPScheme ensures that client handles endpoints with HTTPS scheme. // TestCtlV3DialWithHTTPScheme ensures that client handles endpoints with HTTPS scheme.
@ -114,7 +114,7 @@ func TestCtlV3DialWithHTTPScheme(t *testing.T) {
func dialWithSchemeTest(cx ctlCtx) { func dialWithSchemeTest(cx ctlCtx) {
cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar") cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar")
if err := spawnWithExpect(cmdArgs, "OK"); err != nil { if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "OK"); err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }
} }
@ -129,7 +129,7 @@ type ctlCtx struct {
epc *etcdProcessCluster epc *etcdProcessCluster
envMap map[string]struct{} envMap map[string]string
dialTimeout time.Duration dialTimeout time.Duration
@ -201,7 +201,7 @@ func withApiPrefix(p string) ctlOption {
} }
func withFlagByEnv() ctlOption { func withFlagByEnv() ctlOption {
return func(cx *ctlCtx) { cx.envMap = make(map[string]struct{}) } return func(cx *ctlCtx) { cx.envMap = make(map[string]string) }
} }
func withEtcdutl() ctlOption { func withEtcdutl() ctlOption {
@ -248,6 +248,7 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
for k := range ret.envMap { for k := range ret.envMap {
os.Unsetenv(k) os.Unsetenv(k)
} }
ret.envMap = make(map[string]string)
} }
if ret.epc != nil { if ret.epc != nil {
if errC := ret.epc.Close(); errC != nil { if errC := ret.epc.Close(); errC != nil {
@ -311,8 +312,7 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string {
for k, v := range fmap { for k, v := range fmap {
if useEnv { if useEnv {
ek := flags.FlagToEnv("ETCDCTL", k) ek := flags.FlagToEnv("ETCDCTL", k)
os.Setenv(ek, v) cx.envMap[ek] = v
cx.envMap[ek] = struct{}{}
} else { } else {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v)) cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
} }

View File

@ -102,7 +102,7 @@ func ctlV3Txn(cx ctlCtx, rqs txnRequests) error {
if cx.interactive { if cx.interactive {
cmdArgs = append(cmdArgs, "--interactive") cmdArgs = append(cmdArgs, "--interactive")
} }
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
return err return err
} }

View File

@ -179,7 +179,7 @@ func ctlV3User(cx ctlCtx, args []string, expStr string, stdIn []string) error {
cmdArgs := append(cx.PrefixArgs(), "user") cmdArgs := append(cx.PrefixArgs(), "user")
cmdArgs = append(cmdArgs, args...) cmdArgs = append(cmdArgs, args...)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
if err != nil { if err != nil {
return err return err
} }

View File

@ -35,7 +35,7 @@ func setupWatchArgs(cx ctlCtx, args []string) []string {
func ctlV3Watch(cx ctlCtx, args []string, kvs ...kvExec) error { func ctlV3Watch(cx ctlCtx, args []string, kvs ...kvExec) error {
cmdArgs := setupWatchArgs(cx, args) cmdArgs := setupWatchArgs(cx, args)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, nil)
if err != nil { if err != nil {
return err return err
} }
@ -66,7 +66,7 @@ func ctlV3Watch(cx ctlCtx, args []string, kvs ...kvExec) error {
func ctlV3WatchFailPerm(cx ctlCtx, args []string) error { func ctlV3WatchFailPerm(cx ctlCtx, args []string) error {
cmdArgs := setupWatchArgs(cx, args) cmdArgs := setupWatchArgs(cx, args)
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -29,7 +29,7 @@ const exampleConfigFile = "../../etcd.conf.yml.sample"
func TestEtcdExampleConfig(t *testing.T) { func TestEtcdExampleConfig(t *testing.T) {
skipInShortMode(t) skipInShortMode(t)
proc, err := spawnCmd([]string{binDir + "/etcd", "--config-file", exampleConfigFile}) proc, err := spawnCmd([]string{binDir + "/etcd", "--config-file", exampleConfigFile}, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -75,7 +75,7 @@ func TestEtcdMultiPeer(t *testing.T) {
"--initial-advertise-peer-urls", fmt.Sprintf("http://127.0.0.1:%d", etcdProcessBasePort+i), "--initial-advertise-peer-urls", fmt.Sprintf("http://127.0.0.1:%d", etcdProcessBasePort+i),
"--initial-cluster", ic, "--initial-cluster", ic,
} }
p, err := spawnCmd(args) p, err := spawnCmd(args, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -106,7 +106,7 @@ func TestEtcdUnixPeers(t *testing.T) {
"--listen-peer-urls", "unix://etcd.unix:1", "--listen-peer-urls", "unix://etcd.unix:1",
"--initial-advertise-peer-urls", "unix://etcd.unix:1", "--initial-advertise-peer-urls", "unix://etcd.unix:1",
"--initial-cluster", "e1=unix://etcd.unix:1", "--initial-cluster", "e1=unix://etcd.unix:1",
}, }, nil,
) )
defer os.Remove("etcd.unix:1") defer os.Remove("etcd.unix:1")
if err != nil { if err != nil {
@ -183,7 +183,7 @@ func TestEtcdPeerCNAuth(t *testing.T) {
commonArgs = append(commonArgs, args...) commonArgs = append(commonArgs, args...)
p, err := spawnCmd(commonArgs) p, err := spawnCmd(commonArgs, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -262,7 +262,7 @@ func TestEtcdPeerNameAuth(t *testing.T) {
commonArgs = append(commonArgs, args...) commonArgs = append(commonArgs, args...)
p, err := spawnCmd(commonArgs) p, err := spawnCmd(commonArgs, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -308,7 +308,7 @@ func TestGrpcproxyAndCommonName(t *testing.T) {
t.Errorf("Unexpected error: %s", err) t.Errorf("Unexpected error: %s", err)
} }
p, err := spawnCmd(argsWithEmptyCN) p, err := spawnCmd(argsWithEmptyCN, nil)
defer func() { defer func() {
if p != nil { if p != nil {
p.Stop() p.Stop()
@ -323,7 +323,7 @@ func TestGrpcproxyAndCommonName(t *testing.T) {
func TestBootstrapDefragFlag(t *testing.T) { func TestBootstrapDefragFlag(t *testing.T) {
skipInShortMode(t) skipInShortMode(t)
proc, err := spawnCmd([]string{binDir + "/etcd", "--experimental-bootstrap-defrag-threshold-megabytes", "1000"}) proc, err := spawnCmd([]string{binDir + "/etcd", "--experimental-bootstrap-defrag-threshold-megabytes", "1000"}, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -87,7 +87,7 @@ func corruptTest(cx ctlCtx) {
cx.t.Log("restarting etcd[0]") cx.t.Log("restarting etcd[0]")
ep := cx.epc.procs[0] ep := cx.epc.procs[0]
proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...)) proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...), cx.envMap)
if err != nil { if err != nil {
cx.t.Fatal(err) cx.t.Fatal(err)
} }

View File

@ -43,6 +43,11 @@ type etcdProcess interface {
Close() error Close() error
WithStopSignal(sig os.Signal) os.Signal WithStopSignal(sig os.Signal) os.Signal
Config() *etcdServerProcessConfig Config() *etcdServerProcessConfig
Logs() logsExpect
}
type logsExpect interface {
Expect(string) (string, error)
} }
type etcdServerProcess struct { type etcdServerProcess struct {
@ -56,6 +61,7 @@ type etcdServerProcessConfig struct {
execPath string execPath string
args []string args []string
tlsArgs []string tlsArgs []string
envVars map[string]string
dataDirPath string dataDirPath string
keepDataDir bool keepDataDir bool
@ -92,7 +98,7 @@ func (ep *etcdServerProcess) Start() error {
panic("already started") panic("already started")
} }
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.name)) ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.name))
proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...)) proc, err := spawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars)
if err != nil { if err != nil {
return err return err
} }
@ -163,3 +169,10 @@ func (ep *etcdServerProcess) waitReady() error {
} }
func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg } func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg }
func (ep *etcdServerProcess) Logs() logsExpect {
if ep.proc == nil {
ep.cfg.lg.Panic("Please grap logs before process is stopped")
}
return ep.proc
}

View File

@ -18,6 +18,7 @@
package e2e package e2e
import ( import (
"fmt"
"os" "os"
"strings" "strings"
@ -27,20 +28,41 @@ import (
const noOutputLineCount = 0 // regular binaries emit no extra lines const noOutputLineCount = 0 // regular binaries emit no extra lines
func spawnCmd(args []string) (*expect.ExpectProcess, error) { func spawnCmd(args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
return spawnCmdWithLogger(zap.NewNop(), args) return spawnCmdWithLogger(zap.NewNop(), args, envVars)
} }
func spawnCmdWithLogger(lg *zap.Logger, args []string) (*expect.ExpectProcess, error) { func spawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
wd, err := os.Getwd() wd, err := os.Getwd()
if err != nil { if err != nil {
return nil, err return nil, err
} }
env := mergeEnvVariables(envVars)
if strings.HasSuffix(args[0], "/etcdctl3") { if strings.HasSuffix(args[0], "/etcdctl3") {
env := append(os.Environ(), "ETCDCTL_API=3") env = append(env, "ETCDCTL_API=3")
lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd)) lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(ctlBinPath, args[1:], env) return expect.NewExpectWithEnv(ctlBinPath, args[1:], env)
} }
lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd)) lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("environment-variables", env))
return expect.NewExpect(args[0], args[1:]...) return expect.NewExpectWithEnv(args[0], args[1:], env)
}
func mergeEnvVariables(envVars map[string]string) []string {
var env []string
// Environment variables are passed as parameter have higher priority
// than os environment variables.
for k, v := range envVars {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
// Now, we can set os environment variables not passed as parameter.
currVars := os.Environ()
for _, v := range currVars {
p := strings.Split(v, "=")
if _, ok := envVars[p[0]]; !ok {
env = append(env, fmt.Sprintf("%s=%s", p[0], p[1]))
}
}
return env
} }

View File

@ -40,16 +40,20 @@ func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error
} }
func spawnWithExpect(args []string, expected string) error { func spawnWithExpect(args []string, expected string) error {
return spawnWithExpects(args, []string{expected}...) return spawnWithExpects(args, nil, []string{expected}...)
} }
func spawnWithExpects(args []string, xs ...string) error { func spawnWithExpectWithEnv(args []string, envVars map[string]string, expected string) error {
_, err := spawnWithExpectLines(args, xs...) return spawnWithExpects(args, envVars, []string{expected}...)
}
func spawnWithExpects(args []string, envVars map[string]string, xs ...string) error {
_, err := spawnWithExpectLines(args, envVars, xs...)
return err return err
} }
func spawnWithExpectLines(args []string, xs ...string) ([]string, error) { func spawnWithExpectLines(args []string, envVars map[string]string, xs ...string) ([]string, error) {
proc, err := spawnCmd(args) proc, err := spawnCmd(args, envVars)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -63,7 +63,7 @@ func assertVerifyCanStartV2deprecationNotYet(t testing.TB, dataDirPath string) {
func assertVerifyCannotStartV2deprecationWriteOnly(t testing.TB, dataDirPath string) { func assertVerifyCannotStartV2deprecationWriteOnly(t testing.TB, dataDirPath string) {
t.Log("Verify its infeasible to start etcd with --v2-deprecation=write-only mode") t.Log("Verify its infeasible to start etcd with --v2-deprecation=write-only mode")
proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--data-dir=" + dataDirPath}) proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--data-dir=" + dataDirPath}, nil)
assert.NoError(t, err) assert.NoError(t, err)
_, err = proc.Expect("detected disallowed custom content in v2store for stage --v2-deprecation=write-only") _, err = proc.Expect("detected disallowed custom content in v2store for stage --v2-deprecation=write-only")
@ -90,7 +90,7 @@ func TestV2Deprecation(t *testing.T) {
func TestV2DeprecationWriteOnlyNoV2Api(t *testing.T) { func TestV2DeprecationWriteOnlyNoV2Api(t *testing.T) {
BeforeTest(t) BeforeTest(t)
proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--enable-v2"}) proc, err := spawnCmd([]string{binDir + "/etcd", "--v2-deprecation=write-only", "--enable-v2"}, nil)
assert.NoError(t, err) assert.NoError(t, err)
_, err = proc.Expect("--enable-v2 and --v2-deprecation=write-only are mutually exclusive") _, err = proc.Expect("--enable-v2 and --v2-deprecation=write-only are mutually exclusive")

View File

@ -244,7 +244,7 @@ func testV3CurlAuth(cx ctlCtx) {
) )
cmdArgs = cURLPrefixArgs(cx.epc, "POST", cURLReq{endpoint: path.Join(p, "/auth/authenticate"), value: string(authreq)}) cmdArgs = cURLPrefixArgs(cx.epc, "POST", cURLReq{endpoint: path.Join(p, "/auth/authenticate"), value: string(authreq)})
proc, err := spawnCmd(cmdArgs) proc, err := spawnCmd(cmdArgs, cx.envMap)
testutil.AssertNil(cx.t, err) testutil.AssertNil(cx.t, err)
defer proc.Close() defer proc.Close()
@ -286,7 +286,7 @@ func testV3CurlCampaign(cx ctlCtx) {
endpoint: path.Join(cx.apiPrefix, "/election/campaign"), endpoint: path.Join(cx.apiPrefix, "/election/campaign"),
value: string(cdata), value: string(cdata),
}) })
lines, err := spawnWithExpectLines(cargs, `"leader":{"name":"`) lines, err := spawnWithExpectLines(cargs, cx.envMap, `"leader":{"name":"`)
if err != nil { if err != nil {
cx.t.Fatalf("failed post campaign request (%s) (%v)", cx.apiPrefix, err) cx.t.Fatalf("failed post campaign request (%s) (%v)", cx.apiPrefix, err)
} }

View File

@ -28,14 +28,14 @@ require (
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
go.etcd.io/bbolt v1.3.6 go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.0 go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.0 go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.0 go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.0 go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.0 go.etcd.io/etcd/server/v3 v3.5.2
go.uber.org/zap v1.17.0 go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

View File

@ -15,22 +15,22 @@
package integration package integration
import ( import (
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net" "net"
"sync" "sync"
"go.etcd.io/etcd/client/pkg/v3/transport"
) )
// bridge creates a unix socket bridge to another unix socket, making it possible type Dialer interface {
Dial() (net.Conn, error)
}
// bridge proxies connections between listener and dialer, making it possible
// to disconnect grpc network connections without closing the logical grpc connection. // to disconnect grpc network connections without closing the logical grpc connection.
type bridge struct { type bridge struct {
inaddr string dialer Dialer
outaddr string l net.Listener
l net.Listener conns map[*bridgeConn]struct{}
conns map[*bridgeConn]struct{}
stopc chan struct{} stopc chan struct{}
pausec chan struct{} pausec chan struct{}
@ -40,30 +40,22 @@ type bridge struct {
mu sync.Mutex mu sync.Mutex
} }
func newBridge(addr string) (*bridge, error) { func newBridge(dialer Dialer, listener net.Listener) (*bridge, error) {
b := &bridge{ b := &bridge{
// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
inaddr: addr + "0", dialer: dialer,
outaddr: addr, l: listener,
conns: make(map[*bridgeConn]struct{}), conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
pausec: make(chan struct{}), pausec: make(chan struct{}),
blackholec: make(chan struct{}), blackholec: make(chan struct{}),
} }
close(b.pausec) close(b.pausec)
l, err := transport.NewUnixListener(b.inaddr)
if err != nil {
return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
}
b.l = l
b.wg.Add(1) b.wg.Add(1)
go b.serveListen() go b.serveListen()
return b, nil return b, nil
} }
func (b *bridge) URL() string { return "unix://" + b.inaddr }
func (b *bridge) Close() { func (b *bridge) Close() {
b.l.Close() b.l.Close()
b.mu.Lock() b.mu.Lock()
@ -76,7 +68,7 @@ func (b *bridge) Close() {
b.wg.Wait() b.wg.Wait()
} }
func (b *bridge) Reset() { func (b *bridge) DropConnections() {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
for bc := range b.conns { for bc := range b.conns {
@ -85,13 +77,13 @@ func (b *bridge) Reset() {
b.conns = make(map[*bridgeConn]struct{}) b.conns = make(map[*bridgeConn]struct{})
} }
func (b *bridge) Pause() { func (b *bridge) PauseConnections() {
b.mu.Lock() b.mu.Lock()
b.pausec = make(chan struct{}) b.pausec = make(chan struct{})
b.mu.Unlock() b.mu.Unlock()
} }
func (b *bridge) Unpause() { func (b *bridge) UnpauseConnections() {
b.mu.Lock() b.mu.Lock()
select { select {
case <-b.pausec: case <-b.pausec:
@ -127,7 +119,7 @@ func (b *bridge) serveListen() {
case <-pausec: case <-pausec:
} }
outc, oerr := net.Dial("unix", b.outaddr) outc, oerr := b.dialer.Dial()
if oerr != nil { if oerr != nil {
inc.Close() inc.Close()
return return

View File

@ -38,10 +38,11 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{ clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 2, Size: 2,
GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings
UseBridge: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()}
ccfg := clientv3.Config{ ccfg := clientv3.Config{
Endpoints: []string{eps[0]}, Endpoints: []string{eps[0]},
@ -76,7 +77,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
// give enough time for balancer resolution // give enough time for balancer resolution
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
clus.Members[0].Blackhole() clus.Members[0].Bridge().Blackhole()
if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil { if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err) t.Fatal(err)
@ -87,12 +88,12 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
t.Error("took too long to receive watch events") t.Error("took too long to receive watch events")
} }
clus.Members[0].Unblackhole() clus.Members[0].Bridge().Unblackhole()
// waiting for moving eps[0] out of unhealthy, so that it can be re-pined. // waiting for moving eps[0] out of unhealthy, so that it can be re-pined.
time.Sleep(ccfg.DialTimeout) time.Sleep(ccfg.DialTimeout)
clus.Members[1].Blackhole() clus.Members[1].Bridge().Blackhole()
// make sure client[0] can connect to eps[0] after remove the blackhole. // make sure client[0] can connect to eps[0] after remove the blackhole.
if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil { if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil {
@ -170,10 +171,11 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
clus := integration.NewClusterV3(t, &integration.ClusterConfig{ clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 2, Size: 2,
SkipCreatingClient: true, SkipCreatingClient: true,
UseBridge: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()}
ccfg := clientv3.Config{ ccfg := clientv3.Config{
Endpoints: []string{eps[0]}, Endpoints: []string{eps[0]},
@ -194,7 +196,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
cli.SetEndpoints(eps...) cli.SetEndpoints(eps...)
// blackhole eps[0] // blackhole eps[0]
clus.Members[0].Blackhole() clus.Members[0].Bridge().Blackhole()
// With round robin balancer, client will make a request to a healthy endpoint // With round robin balancer, client will make a request to a healthy endpoint
// within a few requests. // within a few requests.

View File

@ -57,7 +57,7 @@ func TestDialTLSExpired(t *testing.T) {
} }
// expect remote errors "tls: bad certificate" // expect remote errors "tls: bad certificate"
_, err = integration.NewClient(t, clientv3.Config{ _, err = integration.NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCURL()},
DialTimeout: 3 * time.Second, DialTimeout: 3 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
TLS: tls, TLS: tls,
@ -75,7 +75,7 @@ func TestDialTLSNoConfig(t *testing.T) {
defer clus.Terminate(t) defer clus.Terminate(t)
// expect "signed by unknown authority" // expect "signed by unknown authority"
c, err := integration.NewClient(t, clientv3.Config{ c, err := integration.NewClient(t, clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCURL()},
DialTimeout: time.Second, DialTimeout: time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
}) })
@ -108,7 +108,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
// get endpoint list // get endpoint list
eps := make([]string, 3) eps := make([]string, 3)
for i := range eps { for i := range eps {
eps[i] = clus.Members[i].GRPCAddr() eps[i] = clus.Members[i].GRPCURL()
} }
toKill := rand.Intn(len(eps)) toKill := rand.Intn(len(eps))
@ -149,7 +149,7 @@ func TestSwitchSetEndpoints(t *testing.T) {
defer clus.Terminate(t) defer clus.Terminate(t)
// get non partitioned members endpoints // get non partitioned members endpoints
eps := []string{clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
cli := clus.Client(0) cli := clus.Client(0)
clus.Members[0].InjectPartition(t, clus.Members[1:]...) clus.Members[0].InjectPartition(t, clus.Members[1:]...)
@ -170,7 +170,7 @@ func TestRejectOldCluster(t *testing.T) {
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
RejectOldCluster: true, RejectOldCluster: true,
@ -212,7 +212,7 @@ func TestSetEndpointAndPut(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t) defer clus.Terminate(t)
clus.Client(1).SetEndpoints(clus.Members[0].GRPCAddr()) clus.Client(1).SetEndpoints(clus.Members[0].GRPCURL())
_, err := clus.Client(1).Put(context.TODO(), "foo", "bar") _, err := clus.Client(1).Put(context.TODO(), "foo", "bar")
if err != nil && !strings.Contains(err.Error(), "closing") { if err != nil && !strings.Contains(err.Error(), "closing") {
t.Fatal(err) t.Fatal(err)

View File

@ -112,7 +112,7 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
// expect pin eps[0] // expect pin eps[0]
ccfg := clientv3.Config{ ccfg := clientv3.Config{
@ -168,7 +168,7 @@ func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T
SkipCreatingClient: true, SkipCreatingClient: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t) lead := clus.WaitLeader(t)
@ -224,7 +224,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
target := clus.WaitLeader(t) target := clus.WaitLeader(t)
if !isolateLeader { if !isolateLeader {
@ -285,7 +285,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
defer clus.Terminate(t) defer clus.Terminate(t)
leaderIndex := clus.WaitLeader(t) leaderIndex := clus.WaitLeader(t)
// get a follower endpoint // get a follower endpoint
eps := []string{clus.Members[(leaderIndex+1)%3].GRPCAddr()} eps := []string{clus.Members[(leaderIndex+1)%3].GRPCURL()}
ccfg := clientv3.Config{ ccfg := clientv3.Config{
Endpoints: eps, Endpoints: eps,
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
@ -303,7 +303,7 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
// add other endpoints for later endpoint switch // add other endpoints for later endpoint switch
cli.SetEndpoints(eps...) cli.SetEndpoints(eps...)
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCAddr()) conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCURL())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -35,10 +35,11 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{ clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 3, Size: 3,
SkipCreatingClient: true, SkipCreatingClient: true,
UseBridge: true,
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t) lead := clus.WaitLeader(t)
@ -150,7 +151,7 @@ func testBalancerUnderServerShutdownMutable(t *testing.T, op func(*clientv3.Clie
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
// pin eps[0] // pin eps[0]
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}})
@ -208,7 +209,7 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
}) })
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
// pin eps[0] // pin eps[0]
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{eps[0]}})
@ -278,6 +279,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
cfg := &integration.ClusterConfig{ cfg := &integration.ClusterConfig{
Size: 2, Size: 2,
SkipCreatingClient: true, SkipCreatingClient: true,
UseBridge: true,
} }
if linearizable { if linearizable {
cfg.Size = 3 cfg.Size = 3
@ -285,9 +287,9 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
clus := integration.NewClusterV3(t, cfg) clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL()}
if linearizable { if linearizable {
eps = append(eps, clus.Members[2].GRPCAddr()) eps = append(eps, clus.Members[2].GRPCURL())
} }
lead := clus.WaitLeader(t) lead := clus.WaitLeader(t)

View File

@ -712,7 +712,7 @@ func TestKVGetRetry(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clusterSize := 3 clusterSize := 3
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
// because killing leader and following election // because killing leader and following election
@ -765,7 +765,7 @@ func TestKVGetRetry(t *testing.T) {
func TestKVPutFailGetRetry(t *testing.T) { func TestKVPutFailGetRetry(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
kv := clus.Client(0) kv := clus.Client(0)
@ -876,7 +876,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// in the presence of network errors. // in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) { func TestKVPutAtMostOnce(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil { if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil {
@ -884,12 +884,12 @@ func TestKVPutAtMostOnce(t *testing.T) {
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
donec := make(chan struct{}) donec := make(chan struct{})
go func() { go func() {
defer close(donec) defer close(donec)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
} }
}() }()
@ -1027,7 +1027,7 @@ func TestKVForLearner(t *testing.T) {
// 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members // 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members
// 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config, // 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config,
// because the implementation of integration test has diverged from embed/etcd.go. // because the implementation of integration test has diverged from embed/etcd.go.
learnerEp := clus.Members[3].GRPCAddr() learnerEp := clus.Members[3].GRPCURL()
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{learnerEp}, Endpoints: []string{learnerEp},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
@ -1100,7 +1100,7 @@ func TestBalancerSupportLearner(t *testing.T) {
} }
// clus.Members[3] is the newly added learner member, which was appended to clus.Members // clus.Members[3] is the newly added learner member, which was appended to clus.Members
learnerEp := clus.Members[3].GRPCAddr() learnerEp := clus.Members[3].GRPCURL()
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{learnerEp}, Endpoints: []string{learnerEp},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
@ -1119,7 +1119,7 @@ func TestBalancerSupportLearner(t *testing.T) {
t.Fatalf("expect Get request to learner to fail, got no error") t.Fatalf("expect Get request to learner to fail, got no error")
} }
eps := []string{learnerEp, clus.Members[0].GRPCAddr()} eps := []string{learnerEp, clus.Members[0].GRPCURL()}
cli.SetEndpoints(eps...) cli.SetEndpoints(eps...)
if _, err := cli.Get(context.Background(), "foo"); err != nil { if _, err := cli.Get(context.Background(), "foo"); err != nil {
t.Errorf("expect no error (balancer should retry when request to learner fails), got error: %v", err) t.Errorf("expect no error (balancer should retry when request to learner fails), got error: %v", err)

View File

@ -190,7 +190,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
// TODO: change this line to get a cluster client // TODO: change this line to get a cluster client
@ -416,7 +416,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) { func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cli := clus.Client(0) cli := clus.Client(0)
@ -462,7 +462,7 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
func TestLeaseKeepAliveInitTimeout(t *testing.T) { func TestLeaseKeepAliveInitTimeout(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cli := clus.Client(0) cli := clus.Client(0)
@ -495,7 +495,7 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) {
func TestLeaseKeepAliveTTLTimeout(t *testing.T) { func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cli := clus.Client(0) cli := clus.Client(0)
@ -530,7 +530,7 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
func TestLeaseTimeToLive(t *testing.T) { func TestLeaseTimeToLive(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
c := clus.RandClient() c := clus.RandClient()
@ -656,7 +656,7 @@ func TestLeaseLeases(t *testing.T) {
func TestLeaseRenewLostQuorum(t *testing.T) { func TestLeaseRenewLostQuorum(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cli := clus.Client(0) cli := clus.Client(0)
@ -728,7 +728,7 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
// transient cluster failure. // transient cluster failure.
func TestV3LeaseFailureOverlap(t *testing.T) { func TestV3LeaseFailureOverlap(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
numReqs := 5 numReqs := 5
@ -782,7 +782,7 @@ func TestV3LeaseFailureOverlap(t *testing.T) {
func TestLeaseWithRequireLeader(t *testing.T) { func TestLeaseWithRequireLeader(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
c := clus.Client(0) c := clus.Client(0)

View File

@ -194,7 +194,7 @@ func TestLeasingPutInvalidateExisting(t *testing.T) {
// TestLeasingGetNoLeaseTTL checks a key with a TTL is not leased. // TestLeasingGetNoLeaseTTL checks a key with a TTL is not leased.
func TestLeasingGetNoLeaseTTL(t *testing.T) { func TestLeasingGetNoLeaseTTL(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -223,7 +223,7 @@ func TestLeasingGetNoLeaseTTL(t *testing.T) {
// when the etcd cluster is partitioned. // when the etcd cluster is partitioned.
func TestLeasingGetSerializable(t *testing.T) { func TestLeasingGetSerializable(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -325,7 +325,7 @@ func TestLeasingRevGet(t *testing.T) {
// TestLeasingGetWithOpts checks options that can be served through the cache do not depend on the server. // TestLeasingGetWithOpts checks options that can be served through the cache do not depend on the server.
func TestLeasingGetWithOpts(t *testing.T) { func TestLeasingGetWithOpts(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -417,7 +417,7 @@ func TestLeasingConcurrentPut(t *testing.T) {
func TestLeasingDisconnectedGet(t *testing.T) { func TestLeasingDisconnectedGet(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -549,7 +549,7 @@ func TestLeasingOverwriteResponse(t *testing.T) {
func TestLeasingOwnerPutResponse(t *testing.T) { func TestLeasingOwnerPutResponse(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -616,7 +616,7 @@ func TestLeasingTxnOwnerGetRange(t *testing.T) {
func TestLeasingTxnOwnerGet(t *testing.T) { func TestLeasingTxnOwnerGet(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
client := clus.Client(0) client := clus.Client(0)
@ -772,7 +772,7 @@ func TestLeasingTxnOwnerDelete(t *testing.T) {
func TestLeasingTxnOwnerIf(t *testing.T) { func TestLeasingTxnOwnerIf(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -866,7 +866,7 @@ func TestLeasingTxnOwnerIf(t *testing.T) {
func TestLeasingTxnCancel(t *testing.T) { func TestLeasingTxnCancel(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/") lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1084,7 +1084,7 @@ func TestLeasingTxnRandIfThenOrElse(t *testing.T) {
func TestLeasingOwnerPutError(t *testing.T) { func TestLeasingOwnerPutError(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1105,7 +1105,7 @@ func TestLeasingOwnerPutError(t *testing.T) {
func TestLeasingOwnerDeleteError(t *testing.T) { func TestLeasingOwnerDeleteError(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1126,7 +1126,7 @@ func TestLeasingOwnerDeleteError(t *testing.T) {
func TestLeasingNonOwnerPutError(t *testing.T) { func TestLeasingNonOwnerPutError(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@ -1200,7 +1200,7 @@ func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) {
func TestLeasingDeleteRangeBounds(t *testing.T) { func TestLeasingDeleteRangeBounds(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/") delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/")
@ -1375,7 +1375,7 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) {
// disconnected when trying to submit revoke txn. // disconnected when trying to submit revoke txn.
func TestLeasingReconnectOwnerRevoke(t *testing.T) { func TestLeasingReconnectOwnerRevoke(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
@ -1436,7 +1436,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
// disconnected and the watch is compacted. // disconnected and the watch is compacted.
func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) { func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
@ -1489,7 +1489,7 @@ func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
// not cause inconsistency between the server and the client. // not cause inconsistency between the server and the client.
func TestLeasingReconnectOwnerConsistency(t *testing.T) { func TestLeasingReconnectOwnerConsistency(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1509,11 +1509,11 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
v := fmt.Sprintf("%d", i) v := fmt.Sprintf("%d", i)
donec := make(chan struct{}) donec := make(chan struct{})
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
go func() { go func() {
defer close(donec) defer close(donec)
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
}() }()
@ -1649,7 +1649,7 @@ func TestLeasingTxnAtomicCache(t *testing.T) {
// TestLeasingReconnectTxn checks that Txn is resilient to disconnects. // TestLeasingReconnectTxn checks that Txn is resilient to disconnects.
func TestLeasingReconnectTxn(t *testing.T) { func TestLeasingReconnectTxn(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1663,9 +1663,9 @@ func TestLeasingReconnectTxn(t *testing.T) {
donec := make(chan struct{}) donec := make(chan struct{})
go func() { go func() {
defer close(donec) defer close(donec)
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -1685,7 +1685,7 @@ func TestLeasingReconnectTxn(t *testing.T) {
// not cause inconsistency between the server and the client. // not cause inconsistency between the server and the client.
func TestLeasingReconnectNonOwnerGet(t *testing.T) { func TestLeasingReconnectNonOwnerGet(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1703,11 +1703,11 @@ func TestLeasingReconnectNonOwnerGet(t *testing.T) {
n := 0 n := 0
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
donec := make(chan struct{}) donec := make(chan struct{})
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
go func() { go func() {
defer close(donec) defer close(donec)
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
}() }()
@ -1813,7 +1813,7 @@ func TestLeasingDo(t *testing.T) {
func TestLeasingTxnOwnerPutBranch(t *testing.T) { func TestLeasingTxnOwnerPutBranch(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@ -1907,7 +1907,7 @@ func randCmps(pfx string, dat []*clientv3.PutResponse) (cmps []clientv3.Cmp, the
func TestLeasingSessionExpire(t *testing.T) { func TestLeasingSessionExpire(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))
@ -1983,7 +1983,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) {
for i := range tests { for i := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) { t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))

View File

@ -55,7 +55,7 @@ func TestMaintenanceHashKV(t *testing.T) {
if _, err := cli.Get(context.TODO(), "foo"); err != nil { if _, err := cli.Get(context.TODO(), "foo"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCAddr(), 0) hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCURL(), 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -142,7 +142,7 @@ func TestMaintenanceSnapshotError(t *testing.T) {
func TestMaintenanceSnapshotErrorInflight(t *testing.T) { func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
// take about 1-second to read snapshot // take about 1-second to read snapshot
@ -206,7 +206,7 @@ func TestMaintenanceStatus(t *testing.T) {
eps := make([]string, 3) eps := make([]string, 3)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
eps[i] = clus.Members[i].GRPCAddr() eps[i] = clus.Members[i].GRPCURL()
} }
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}}) cli, err := integration.NewClient(t, clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}})

View File

@ -75,7 +75,7 @@ func TestV3ClientMetrics(t *testing.T) {
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()}, Endpoints: []string{clus.Members[0].GRPCURL()},
DialOptions: []grpc.DialOption{ DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),

View File

@ -30,14 +30,14 @@ func TestDetectKvOrderViolation(t *testing.T) {
var errOrderViolation = errors.New("DetectedOrderViolation") var errOrderViolation = errors.New("DetectedOrderViolation")
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{ Endpoints: []string{
clus.Members[0].GRPCAddr(), clus.Members[0].GRPCURL(),
clus.Members[1].GRPCAddr(), clus.Members[1].GRPCURL(),
clus.Members[2].GRPCAddr(), clus.Members[2].GRPCURL(),
}, },
} }
cli, err := integration.NewClient(t, cfg) cli, err := integration.NewClient(t, cfg)
@ -82,7 +82,7 @@ func TestDetectKvOrderViolation(t *testing.T) {
clus.Members[1].Stop(t) clus.Members[1].Stop(t)
assert.NoError(t, clus.Members[2].Restart(t)) assert.NoError(t, clus.Members[2].Restart(t))
// force OrderingKv to query the third member // force OrderingKv to query the third member
cli.SetEndpoints(clus.Members[2].GRPCAddr()) cli.SetEndpoints(clus.Members[2].GRPCURL())
time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
t.Logf("Quering m2 after restart") t.Logf("Quering m2 after restart")
@ -97,14 +97,14 @@ func TestDetectTxnOrderViolation(t *testing.T) {
var errOrderViolation = errors.New("DetectedOrderViolation") var errOrderViolation = errors.New("DetectedOrderViolation")
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{ Endpoints: []string{
clus.Members[0].GRPCAddr(), clus.Members[0].GRPCURL(),
clus.Members[1].GRPCAddr(), clus.Members[1].GRPCURL(),
clus.Members[2].GRPCAddr(), clus.Members[2].GRPCURL(),
}, },
} }
cli, err := integration.NewClient(t, cfg) cli, err := integration.NewClient(t, cfg)
@ -151,7 +151,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
clus.Members[1].Stop(t) clus.Members[1].Stop(t)
assert.NoError(t, clus.Members[2].Restart(t)) assert.NoError(t, clus.Members[2].Restart(t))
// force OrderingKv to query the third member // force OrderingKv to query the third member
cli.SetEndpoints(clus.Members[2].GRPCAddr()) cli.SetEndpoints(clus.Members[2].GRPCURL())
time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != errOrderViolation { if err != errOrderViolation {

View File

@ -29,11 +29,11 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t) defer clus.Terminate(t)
eps := []string{ eps := []string{
clus.Members[0].GRPCAddr(), clus.Members[0].GRPCURL(),
clus.Members[1].GRPCAddr(), clus.Members[1].GRPCURL(),
clus.Members[2].GRPCAddr(), clus.Members[2].GRPCURL(),
} }
cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCAddr()}} cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}}
cli, err := integration.NewClient(t, cfg) cli, err := integration.NewClient(t, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -71,7 +71,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
} }
t.Logf("Reconfigure client to speak only to the 'partitioned' member") t.Logf("Reconfigure client to speak only to the 'partitioned' member")
cli.SetEndpoints(clus.Members[2].GRPCAddr()) cli.SetEndpoints(clus.Members[2].GRPCURL())
_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != ordering.ErrNoGreaterRev { if err != ordering.ErrNoGreaterRev {
t.Fatal("While speaking to partitioned leader, we should get ErrNoGreaterRev error") t.Fatal("While speaking to partitioned leader, we should get ErrNoGreaterRev error")
@ -80,15 +80,15 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
func TestUnresolvableOrderViolation(t *testing.T) { func TestUnresolvableOrderViolation(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{ Endpoints: []string{
clus.Members[0].GRPCAddr(), clus.Members[0].GRPCURL(),
clus.Members[1].GRPCAddr(), clus.Members[1].GRPCURL(),
clus.Members[2].GRPCAddr(), clus.Members[2].GRPCURL(),
clus.Members[3].GRPCAddr(), clus.Members[3].GRPCURL(),
clus.Members[4].GRPCAddr(), clus.Members[4].GRPCURL(),
}, },
} }
cli, err := integration.NewClient(t, cfg) cli, err := integration.NewClient(t, cfg)
@ -99,7 +99,7 @@ func TestUnresolvableOrderViolation(t *testing.T) {
eps := cli.Endpoints() eps := cli.Endpoints()
ctx := context.TODO() ctx := context.TODO()
cli.SetEndpoints(clus.Members[0].GRPCAddr()) cli.SetEndpoints(clus.Members[0].GRPCURL())
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
_, err = cli.Put(ctx, "foo", "bar") _, err = cli.Put(ctx, "foo", "bar")
if err != nil { if err != nil {
@ -139,7 +139,7 @@ func TestUnresolvableOrderViolation(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
clus.Members[3].WaitStarted(t) clus.Members[3].WaitStarted(t)
cli.SetEndpoints(clus.Members[3].GRPCAddr()) cli.SetEndpoints(clus.Members[3].GRPCURL())
_, err = OrderingKv.Get(ctx, "foo", clientv3.WithSerializable()) _, err = OrderingKv.Get(ctx, "foo", clientv3.WithSerializable())
if err != ordering.ErrNoGreaterRev { if err != ordering.ErrNoGreaterRev {

View File

@ -53,7 +53,7 @@ func TestTxnError(t *testing.T) {
func TestTxnWriteFail(t *testing.T) { func TestTxnWriteFail(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
kv := clus.Client(0) kv := clus.Client(0)
@ -103,7 +103,7 @@ func TestTxnReadRetry(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
kv := clus.Client(0) kv := clus.Client(0)

View File

@ -47,7 +47,7 @@ type watchctx struct {
func runWatchTest(t *testing.T, f watcherTest) { func runWatchTest(t *testing.T, f watcherTest) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
wclientMember := rand.Intn(3) wclientMember := rand.Intn(3)
@ -188,7 +188,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
defer close(donec) defer close(donec)
// take down watcher connection // take down watcher connection
for { for {
wctx.clus.Members[wctx.wclientMember].DropConnections() wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
select { select {
case <-timer: case <-timer:
// spinning on close may live lock reconnection // spinning on close may live lock reconnection
@ -230,7 +230,7 @@ func testWatchReconnInit(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel") t.Fatalf("expected non-nil channel")
} }
wctx.clus.Members[wctx.wclientMember].DropConnections() wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
// watcher should recover // watcher should recover
putAndWatch(t, wctx, "a", "a") putAndWatch(t, wctx, "a", "a")
} }
@ -247,7 +247,7 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
} }
putAndWatch(t, wctx, "a", "a") putAndWatch(t, wctx, "a", "a")
// take down watcher connection // take down watcher connection
wctx.clus.Members[wctx.wclientMember].DropConnections() wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
// watcher should recover // watcher should recover
putAndWatch(t, wctx, "a", "b") putAndWatch(t, wctx, "a", "b")
} }
@ -348,7 +348,7 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
func TestWatchResumeInitRev(t *testing.T) { func TestWatchResumeInitRev(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cli := clus.Client(0) cli := clus.Client(0)
@ -368,8 +368,8 @@ func TestWatchResumeInitRev(t *testing.T) {
t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok) t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
} }
// pause wch // pause wch
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
clus.Members[0].PauseConnections() clus.Members[0].Bridge().PauseConnections()
select { select {
case resp, ok := <-wch: case resp, ok := <-wch:
@ -378,7 +378,7 @@ func TestWatchResumeInitRev(t *testing.T) {
} }
// resume wch // resume wch
clus.Members[0].UnpauseConnections() clus.Members[0].Bridge().UnpauseConnections()
select { select {
case resp, ok := <-wch: case resp, ok := <-wch:
@ -404,7 +404,7 @@ func TestWatchResumeInitRev(t *testing.T) {
func TestWatchResumeCompacted(t *testing.T) { func TestWatchResumeCompacted(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
// create a waiting watcher at rev 1 // create a waiting watcher at rev 1
@ -955,7 +955,7 @@ func TestWatchWithCreatedNotification(t *testing.T) {
func TestWatchWithCreatedNotificationDropConn(t *testing.T) { func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer cluster.Terminate(t) defer cluster.Terminate(t)
client := cluster.RandClient() client := cluster.RandClient()
@ -968,7 +968,7 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
t.Fatalf("expected created event, got %v", resp) t.Fatalf("expected created event, got %v", resp)
} }
cluster.Members[0].DropConnections() cluster.Members[0].Bridge().DropConnections()
// check watch channel doesn't post another watch response. // check watch channel doesn't post another watch response.
select { select {
@ -1056,14 +1056,14 @@ func TestWatchOverlapContextCancel(t *testing.T) {
func TestWatchOverlapDropConnContextCancel(t *testing.T) { func TestWatchOverlapDropConnContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) { f := func(clus *integration.ClusterV3) {
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
} }
testWatchOverlapContextCancel(t, f) testWatchOverlapContextCancel(t, f)
} }
func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) { func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
n := 100 n := 100
@ -1154,7 +1154,7 @@ func TestWatchCancelAndCloseClient(t *testing.T) {
// then closes the watcher interface to ensure correct clean up. // then closes the watcher interface to ensure correct clean up.
func TestWatchStressResumeClose(t *testing.T) { func TestWatchStressResumeClose(t *testing.T) {
integration.BeforeTest(t) integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t) defer clus.Terminate(t)
cli := clus.Client(0) cli := clus.Client(0)
@ -1164,7 +1164,7 @@ func TestWatchStressResumeClose(t *testing.T) {
for i := range wchs { for i := range wchs {
wchs[i] = cli.Watch(ctx, "abc") wchs[i] = cli.Watch(ctx, "abc")
} }
clus.Members[0].DropConnections() clus.Members[0].Bridge().DropConnections()
cancel() cancel()
if err := cli.Close(); err != nil { if err := cli.Close(); err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -39,6 +39,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v2" "go.etcd.io/etcd/client/v2"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/grpc_testing"
"go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/embed"
@ -73,6 +74,7 @@ const (
basePort = 21000 basePort = 21000
URLScheme = "unix" URLScheme = "unix"
URLSchemeTLS = "unixs" URLSchemeTLS = "unixs"
baseGRPCPort = 30000
) )
var ( var (
@ -121,6 +123,10 @@ var (
defaultTokenJWT = fmt.Sprintf("jwt,pub-key=%s,priv-key=%s,sign-method=RS256,ttl=1s", defaultTokenJWT = fmt.Sprintf("jwt,pub-key=%s,priv-key=%s,sign-method=RS256,ttl=1s",
MustAbsPath("../fixtures/server.crt"), MustAbsPath("../fixtures/server.key.insecure")) MustAbsPath("../fixtures/server.crt"), MustAbsPath("../fixtures/server.key.insecure"))
// uniqueNumber is used to generate unique port numbers
// Should only be accessed via atomic package methods.
uniqueNumber int32
) )
type ClusterConfig struct { type ClusterConfig struct {
@ -153,9 +159,15 @@ type ClusterConfig struct {
// UseIP is true to use only IP for gRPC requests. // UseIP is true to use only IP for gRPC requests.
UseIP bool UseIP bool
// UseBridge adds bridge between client and grpc server. Should be used in tests that
// want to manipulate connection or require connection not breaking despite server stop/restart.
UseBridge bool
// UseTCP configures server listen on tcp socket. If disabled unix socket is used.
UseTCP bool
EnableLeaseCheckpoint bool EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration LeaseCheckpointInterval time.Duration
LeaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration WatchProgressNotifyInterval time.Duration
} }
@ -208,7 +220,7 @@ func newCluster(t testutil.TB, cfg *ClusterConfig) *cluster {
c := &cluster{cfg: cfg} c := &cluster{cfg: cfg}
ms := make([]*member, cfg.Size) ms := make([]*member, cfg.Size)
for i := 0; i < cfg.Size; i++ { for i := 0; i < cfg.Size; i++ {
ms[i] = c.mustNewMember(t) ms[i] = c.mustNewMember(t, int64(i))
} }
c.Members = ms c.Members = ms
if err := c.fillClusterForMembers(); err != nil { if err := c.fillClusterForMembers(); err != nil {
@ -249,7 +261,7 @@ func (c *cluster) Launch(t testutil.TB) {
c.waitMembersMatch(t, c.HTTPMembers()) c.waitMembersMatch(t, c.HTTPMembers())
c.waitVersion() c.waitVersion()
for _, m := range c.Members { for _, m := range c.Members {
t.Logf(" - %v -> %v (%v)", m.Name, m.ID(), m.GRPCAddr()) t.Logf(" - %v -> %v (%v)", m.Name, m.ID(), m.GRPCURL())
} }
} }
@ -295,10 +307,11 @@ func (c *cluster) HTTPMembers() []client.Member {
return ms return ms
} }
func (c *cluster) mustNewMember(t testutil.TB) *member { func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
m := mustNewMember(t, m := mustNewMember(t,
memberConfig{ memberConfig{
name: c.generateMemberName(), name: c.generateMemberName(),
memberNumber: memberNumber,
authToken: c.cfg.AuthToken, authToken: c.cfg.AuthToken,
peerTLS: c.cfg.PeerTLS, peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS, clientTLS: c.cfg.ClientTLS,
@ -313,7 +326,10 @@ func (c *cluster) mustNewMember(t testutil.TB) *member {
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP, useIP: c.cfg.UseIP,
useBridge: c.cfg.UseBridge,
useTCP: c.cfg.UseTCP,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
}) })
@ -328,7 +344,7 @@ func (c *cluster) mustNewMember(t testutil.TB) *member {
// addMember return PeerURLs of the added member. // addMember return PeerURLs of the added member.
func (c *cluster) addMember(t testutil.TB) types.URLs { func (c *cluster) addMember(t testutil.TB) types.URLs {
m := c.mustNewMember(t) m := c.mustNewMember(t, 0)
scheme := schemeFromTLSInfo(c.cfg.PeerTLS) scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
@ -557,6 +573,8 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
type member struct { type member struct {
config.ServerConfig config.ServerConfig
UniqNumber int64
MemberNumber int64
PeerListeners, ClientListeners []net.Listener PeerListeners, ClientListeners []net.Listener
grpcListener net.Listener grpcListener net.Listener
// PeerTLSInfo enables peer TLS when set // PeerTLSInfo enables peer TLS when set
@ -572,7 +590,7 @@ type member struct {
grpcServerOpts []grpc.ServerOption grpcServerOpts []grpc.ServerOption
grpcServer *grpc.Server grpcServer *grpc.Server
grpcServerPeer *grpc.Server grpcServerPeer *grpc.Server
grpcAddr string grpcURL string
grpcBridge *bridge grpcBridge *bridge
// serverClient is a clientv3 that directly calls the etcdserver. // serverClient is a clientv3 that directly calls the etcdserver.
@ -582,15 +600,21 @@ type member struct {
clientMaxCallSendMsgSize int clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int clientMaxCallRecvMsgSize int
useIP bool useIP bool
useBridge bool
useTCP bool
isLearner bool isLearner bool
closed bool closed bool
grpcServerRecorder *grpc_testing.GrpcRecorder
} }
func (m *member) GRPCAddr() string { return m.grpcAddr } func (m *member) GRPCURL() string { return m.grpcURL }
type memberConfig struct { type memberConfig struct {
name string name string
uniqNumber int64
memberNumber int64
peerTLS *transport.TLSInfo peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo clientTLS *transport.TLSInfo
authToken string authToken string
@ -605,8 +629,11 @@ type memberConfig struct {
clientMaxCallSendMsgSize int clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int clientMaxCallRecvMsgSize int
useIP bool useIP bool
useBridge bool
useTCP bool
enableLeaseCheckpoint bool enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration leaseCheckpointInterval time.Duration
leaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration WatchProgressNotifyInterval time.Duration
} }
@ -614,7 +641,10 @@ type memberConfig struct {
// set, it will use https scheme to communicate between peers. // set, it will use https scheme to communicate between peers.
func mustNewMember(t testutil.TB, mcfg memberConfig) *member { func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
var err error var err error
m := &member{} m := &member{
MemberNumber: mcfg.memberNumber,
UniqNumber: atomic.AddInt64(&localListenCount, 1),
}
peerScheme := schemeFromTLSInfo(mcfg.peerTLS) peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
clientScheme := schemeFromTLSInfo(mcfg.clientTLS) clientScheme := schemeFromTLSInfo(mcfg.clientTLS)
@ -698,8 +728,11 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
m.useIP = mcfg.useIP m.useIP = mcfg.useIP
m.useBridge = mcfg.useBridge
m.useTCP = mcfg.useTCP
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
@ -707,7 +740,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.WarningApplyDuration = embed.DefaultWarningApplyDuration m.WarningApplyDuration = embed.DefaultWarningApplyDuration
m.V2Deprecation = config.V2_DEPR_DEFAULT m.V2Deprecation = config.V2_DEPR_DEFAULT
m.grpcServerRecorder = &grpc_testing.GrpcRecorder{}
m.Logger = memberLogger(t, mcfg.name) m.Logger = memberLogger(t, mcfg.name)
t.Cleanup(func() { t.Cleanup(func() {
// if we didn't cleanup the logger, the consecutive test // if we didn't cleanup the logger, the consecutive test
@ -730,45 +763,109 @@ func memberLogger(t testutil.TB, name string) *zap.Logger {
// listenGRPC starts a grpc server over a unix domain socket on the member // listenGRPC starts a grpc server over a unix domain socket on the member
func (m *member) listenGRPC() error { func (m *member) listenGRPC() error {
// prefix with localhost so cert has right domain // prefix with localhost so cert has right domain
m.grpcAddr = "localhost:" + m.Name network, host, port := m.grpcAddr()
m.Logger.Info("LISTEN GRPC", zap.String("m.grpcAddr", m.grpcAddr), zap.String("m.Name", m.Name)) grpcAddr := host + ":" + port
if m.useIP { // for IP-only TLS certs m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name))
m.grpcAddr = "127.0.0.1:" + m.Name grpcListener, err := net.Listen(network, grpcAddr)
}
l, err := transport.NewUnixListener(m.grpcAddr)
if err != nil { if err != nil {
return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err)
} }
m.grpcBridge, err = newBridge(m.grpcAddr) m.grpcURL = fmt.Sprintf("%s://%s", m.clientScheme(), grpcAddr)
if err != nil { if m.useBridge {
l.Close() _, err = m.addBridge()
return err if err != nil {
grpcListener.Close()
return err
}
} }
m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr m.grpcListener = grpcListener
m.grpcListener = l
return nil return nil
} }
func (m *member) clientScheme() string {
switch {
case m.useTCP && m.ClientTLSInfo != nil:
return "https"
case m.useTCP && m.ClientTLSInfo == nil:
return "http"
case !m.useTCP && m.ClientTLSInfo != nil:
return "unixs"
case !m.useTCP && m.ClientTLSInfo == nil:
return "unix"
}
m.Logger.Panic("Failed to determine client schema")
return ""
}
func (m *member) addBridge() (*bridge, error) {
network, host, port := m.grpcAddr()
grpcAddr := host + ":" + port
bridgeAddr := grpcAddr + "0"
m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name))
bridgeListener, err := transport.NewUnixListener(bridgeAddr)
if err != nil {
return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", bridgeAddr, err)
}
m.grpcBridge, err = newBridge(dialer{network: network, addr: grpcAddr}, bridgeListener)
if err != nil {
bridgeListener.Close()
return nil, err
}
m.grpcURL = m.clientScheme() + "://" + bridgeAddr
return m.grpcBridge, nil
}
func (m *member) Bridge() *bridge {
if !m.useBridge {
m.Logger.Panic("Bridge not available. Please configure using bridge before creating cluster.")
}
return m.grpcBridge
}
func (m *member) grpcAddr() (network, host, port string) {
// prefix with localhost so cert has right domain
host = "localhost"
if m.useIP { // for IP-only TLS certs
host = "127.0.0.1"
}
network = "unix"
if m.useTCP {
network = "tcp"
}
port = m.Name
if m.useTCP {
port = fmt.Sprintf("%d", GrpcPortNumber(m.UniqNumber, m.MemberNumber))
}
return network, host, port
}
func GrpcPortNumber(uniqNumber, memberNumber int64) int64 {
return baseGRPCPort + uniqNumber*10 + memberNumber
}
type dialer struct {
network string
addr string
}
func (d dialer) Dial() (net.Conn, error) {
return net.Dial(d.network, d.addr)
}
func (m *member) ElectionTimeout() time.Duration { func (m *member) ElectionTimeout() time.Duration {
return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
} }
func (m *member) ID() types.ID { return m.s.ID() } func (m *member) ID() types.ID { return m.s.ID() }
func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
// NewClientV3 creates a new grpc client connection to the member // NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) { func NewClientV3(m *member) (*clientv3.Client, error) {
if m.grpcAddr == "" { if m.grpcURL == "" {
return nil, fmt.Errorf("member not configured for grpc") return nil, fmt.Errorf("member not configured for grpc")
} }
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: []string{m.grpcAddr}, Endpoints: []string{m.grpcURL},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, DialOptions: []grpc.DialOption{grpc.WithBlock()},
MaxCallSendMsgSize: m.clientMaxCallSendMsgSize, MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
@ -829,7 +926,7 @@ func (m *member) Launch() error {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
) )
var err error var err error
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil { if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
@ -855,8 +952,8 @@ func (m *member) Launch() error {
return err return err
} }
} }
m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...) m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerRecorder.UnaryInterceptor(), m.grpcServerOpts...)
m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg) m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg, m.grpcServerRecorder.UnaryInterceptor())
m.serverClient = v3client.New(m.s) m.serverClient = v3client.New(m.s)
lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient)) lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient)) epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
@ -986,11 +1083,15 @@ func (m *member) Launch() error {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
) )
return nil return nil
} }
func (m *member) RecordedRequests() []grpc_testing.RequestInfo {
return m.grpcServerRecorder.RecordedRequests()
}
func (m *member) WaitOK(t testutil.TB) { func (m *member) WaitOK(t testutil.TB) {
m.WaitStarted(t) m.WaitStarted(t)
for m.s.Leader() == 0 { for m.s.Leader() == 0 {
@ -1099,7 +1200,7 @@ func (m *member) Stop(_ testutil.TB) {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
) )
m.Close() m.Close()
m.serverClosers = nil m.serverClosers = nil
@ -1108,7 +1209,7 @@ func (m *member) Stop(_ testutil.TB) {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
) )
} }
@ -1133,7 +1234,7 @@ func (m *member) Restart(t testutil.TB) error {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
) )
newPeerListeners := make([]net.Listener, 0) newPeerListeners := make([]net.Listener, 0)
for _, ln := range m.PeerListeners { for _, ln := range m.PeerListeners {
@ -1158,7 +1259,7 @@ func (m *member) Restart(t testutil.TB) error {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
zap.Error(err), zap.Error(err),
) )
return err return err
@ -1171,7 +1272,7 @@ func (m *member) Terminate(t testutil.TB) {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
) )
m.Close() m.Close()
if !m.keepDataDirTerminate { if !m.keepDataDirTerminate {
@ -1184,7 +1285,7 @@ func (m *member) Terminate(t testutil.TB) {
zap.String("name", m.Name), zap.String("name", m.Name),
zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
zap.String("grpc-address", m.grpcAddr), zap.String("grpc-url", m.grpcURL),
) )
} }
@ -1280,8 +1381,9 @@ func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i]
type ClusterV3 struct { type ClusterV3 struct {
*cluster *cluster
mu sync.Mutex mu sync.Mutex
clients []*clientv3.Client clients []*clientv3.Client
clusterClient *clientv3.Client
} }
// NewClusterV3 returns a launched cluster with a grpc client connection // NewClusterV3 returns a launched cluster with a grpc client connection
@ -1327,6 +1429,11 @@ func (c *ClusterV3) Terminate(t testutil.TB) {
t.Error(err) t.Error(err)
} }
} }
if c.clusterClient != nil {
if err := c.clusterClient.Close(); err != nil {
t.Error(err)
}
}
c.mu.Unlock() c.mu.Unlock()
c.cluster.Terminate(t) c.cluster.Terminate(t)
} }
@ -1339,6 +1446,25 @@ func (c *ClusterV3) Client(i int) *clientv3.Client {
return c.clients[i] return c.clients[i]
} }
func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) {
if c.clusterClient == nil {
endpoints := []string{}
for _, m := range c.Members {
endpoints = append(endpoints, m.grpcURL)
}
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
c.clusterClient, err = newClientV3(cfg, cfg.Logger)
if err != nil {
return nil, err
}
}
return c.clusterClient, nil
}
// NewClientV3 creates a new grpc client connection to the member // NewClientV3 creates a new grpc client connection to the member
func (c *ClusterV3) NewClientV3(memberIndex int) (*clientv3.Client, error) { func (c *ClusterV3) NewClientV3(memberIndex int) (*clientv3.Client, error) {
return NewClientV3(c.Members[memberIndex]) return NewClientV3(c.Members[memberIndex])
@ -1418,7 +1544,7 @@ func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster // AddAndLaunchLearnerMember creates a leaner member, adds it to cluster
// via v3 MemberAdd API, and then launches the new member. // via v3 MemberAdd API, and then launches the new member.
func (c *ClusterV3) AddAndLaunchLearnerMember(t testutil.TB) { func (c *ClusterV3) AddAndLaunchLearnerMember(t testutil.TB) {
m := c.mustNewMember(t) m := c.mustNewMember(t, 0)
m.isLearner = true m.isLearner = true
scheme := schemeFromTLSInfo(c.cfg.PeerTLS) scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
@ -1519,7 +1645,7 @@ func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j],
// MustNewMember creates a new member instance based on the response of V3 Member Add API. // MustNewMember creates a new member instance based on the response of V3 Member Add API.
func (c *ClusterV3) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) *member { func (c *ClusterV3) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) *member {
m := c.mustNewMember(t) m := c.mustNewMember(t, 0)
m.isLearner = resp.Member.IsLearner m.isLearner = resp.Member.IsLearner
m.NewCluster = false m.NewCluster = false

View File

@ -173,7 +173,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
} }
func TestForceNewCluster(t *testing.T) { func TestForceNewCluster(t *testing.T) {
c := NewCluster(t, 3) c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t) c.Launch(t)
cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
@ -283,7 +283,7 @@ func testIssue2746(t *testing.T, members int) {
func TestIssue2904(t *testing.T) { func TestIssue2904(t *testing.T) {
BeforeTest(t) BeforeTest(t)
// start 1-member cluster to ensure member 0 is the leader of the cluster. // start 1-member cluster to ensure member 0 is the leader of the cluster.
c := NewCluster(t, 1) c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true})
c.Launch(t) c.Launch(t)
defer c.Terminate(t) defer c.Terminate(t)
@ -319,7 +319,7 @@ func TestIssue2904(t *testing.T) {
func TestIssue3699(t *testing.T) { func TestIssue3699(t *testing.T) {
// start a cluster of 3 nodes a, b, c // start a cluster of 3 nodes a, b, c
BeforeTest(t) BeforeTest(t)
c := NewCluster(t, 3) c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t) c.Launch(t)
defer c.Terminate(t) defer c.Terminate(t)
@ -371,7 +371,7 @@ func TestIssue3699(t *testing.T) {
// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members. // TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
func TestRejectUnhealthyAdd(t *testing.T) { func TestRejectUnhealthyAdd(t *testing.T) {
BeforeTest(t) BeforeTest(t)
c := NewCluster(t, 3) c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
for _, m := range c.Members { for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true m.ServerConfig.StrictReconfigCheck = true
} }
@ -415,7 +415,7 @@ func TestRejectUnhealthyAdd(t *testing.T) {
// if quorum will be lost. // if quorum will be lost.
func TestRejectUnhealthyRemove(t *testing.T) { func TestRejectUnhealthyRemove(t *testing.T) {
BeforeTest(t) BeforeTest(t)
c := NewCluster(t, 5) c := newCluster(t, &ClusterConfig{Size: 5, UseBridge: true})
for _, m := range c.Members { for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true m.ServerConfig.StrictReconfigCheck = true
} }
@ -464,7 +464,7 @@ func TestRestartRemoved(t *testing.T) {
BeforeTest(t) BeforeTest(t)
// 1. start single-member cluster // 1. start single-member cluster
c := NewCluster(t, 1) c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true})
for _, m := range c.Members { for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true m.ServerConfig.StrictReconfigCheck = true
} }
@ -540,7 +540,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
func TestSpeedyTerminate(t *testing.T) { func TestSpeedyTerminate(t *testing.T) {
BeforeTest(t) BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3}) clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true})
// Stop/Restart so requests will time out on lost leaders // Stop/Restart so requests will time out on lost leaders
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
clus.Members[i].Stop(t) clus.Members[i].Stop(t)

View File

@ -0,0 +1,197 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
tls "crypto/tls"
"fmt"
"strings"
"testing"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
func TestAuthority(t *testing.T) {
tcs := []struct {
name string
useTCP bool
useTLS bool
// Pattern used to generate endpoints for client. Fields filled
// %d - will be filled with member grpc port
// %s - will be filled with member name
clientURLPattern string
// Pattern used to validate authority received by server. Fields filled:
// %d - will be filled with first member grpc port
// %s - will be filled with first member name
expectAuthorityPattern string
}{
{
name: "unix:path",
clientURLPattern: "unix:localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
{
name: "unix://absolute_path",
clientURLPattern: "unix://localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
// "unixs" is not standard schema supported by etcd
{
name: "unixs:absolute_path",
useTLS: true,
clientURLPattern: "unixs:localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
{
name: "unixs://absolute_path",
useTLS: true,
clientURLPattern: "unixs://localhost:%s",
expectAuthorityPattern: "localhost:%s",
},
{
name: "http://domain[:port]",
useTCP: true,
clientURLPattern: "http://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://domain[:port]",
useTLS: true,
useTCP: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "http://address[:port]",
useTCP: true,
clientURLPattern: "http://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://address[:port]",
useTCP: true,
useTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
}
for _, tc := range tcs {
for _, clusterSize := range []int{1, 3} {
t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) {
BeforeTest(t)
cfg := ClusterConfig{
Size: clusterSize,
UseTCP: tc.useTCP,
UseIP: tc.useTCP,
}
cfg, tlsConfig := setupTLS(t, tc.useTLS, cfg)
clus := NewClusterV3(t, &cfg)
defer clus.Terminate(t)
kv := setupClient(t, tc.clientURLPattern, clus, tlsConfig)
defer kv.Close()
_, err := kv.Put(context.TODO(), "foo", "bar")
if err != nil {
t.Fatal(err)
}
assertAuthority(t, templateAuthority(t, tc.expectAuthorityPattern, clus.Members[0]), clus)
})
}
}
}
func setupTLS(t *testing.T, useTLS bool, cfg ClusterConfig) (ClusterConfig, *tls.Config) {
t.Helper()
if useTLS {
cfg.ClientTLS = &testTLSInfo
tlsConfig, err := testTLSInfo.ClientConfig()
if err != nil {
t.Fatal(err)
}
return cfg, tlsConfig
}
return cfg, nil
}
func setupClient(t *testing.T, endpointPattern string, clus *ClusterV3, tlsConfig *tls.Config) *clientv3.Client {
t.Helper()
endpoints := templateEndpoints(t, endpointPattern, clus)
kv, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
TLS: tlsConfig,
})
if err != nil {
t.Fatal(err)
}
return kv
}
func templateEndpoints(t *testing.T, pattern string, clus *ClusterV3) []string {
t.Helper()
endpoints := []string{}
for _, m := range clus.Members {
ent := pattern
if strings.Contains(ent, "%d") {
ent = fmt.Sprintf(ent, GrpcPortNumber(m.UniqNumber, m.MemberNumber))
}
if strings.Contains(ent, "%s") {
ent = fmt.Sprintf(ent, m.Name)
}
if strings.Contains(ent, "%") {
t.Fatalf("Failed to template pattern, %% symbol left %q", ent)
}
endpoints = append(endpoints, ent)
}
return endpoints
}
func templateAuthority(t *testing.T, pattern string, m *member) string {
t.Helper()
authority := pattern
if strings.Contains(authority, "%d") {
authority = fmt.Sprintf(authority, GrpcPortNumber(m.UniqNumber, m.MemberNumber))
}
if strings.Contains(authority, "%s") {
authority = fmt.Sprintf(authority, m.Name)
}
if strings.Contains(authority, "%") {
t.Fatalf("Failed to template pattern, %% symbol left %q", authority)
}
return authority
}
func assertAuthority(t *testing.T, expectedAuthority string, clus *ClusterV3) {
t.Helper()
requestsFound := 0
for _, m := range clus.Members {
for _, r := range m.RecordedRequests() {
requestsFound++
if r.Authority != expectedAuthority {
t.Errorf("Got unexpected authority header, member: %q, request: %q, got authority: %q, expected %q", m.Name, r.FullMethod, r.Authority, expectedAuthority)
}
}
}
if requestsFound == 0 {
t.Errorf("Expected at least one request")
}
}

View File

@ -46,7 +46,7 @@ func TestPauseMember(t *testing.T) {
func TestRestartMember(t *testing.T) { func TestRestartMember(t *testing.T) {
BeforeTest(t) BeforeTest(t)
c := NewCluster(t, 3) c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t) c.Launch(t)
defer c.Terminate(t) defer c.Terminate(t)
@ -88,7 +88,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
func TestSnapshotAndRestartMember(t *testing.T) { func TestSnapshotAndRestartMember(t *testing.T) {
BeforeTest(t) BeforeTest(t)
m := mustNewMember(t, memberConfig{name: "snapAndRestartTest"}) m := mustNewMember(t, memberConfig{name: "snapAndRestartTest", useBridge: true})
m.SnapshotCount = 100 m.SnapshotCount = 100
m.Launch() m.Launch()
defer m.Terminate(t) defer m.Terminate(t)

View File

@ -36,7 +36,7 @@ func TestClusterProxyMemberList(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t) defer clus.Terminate(t)
cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCAddr()}, t) cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCURL()}, t)
defer cts.close(t) defer cts.close(t)
cfg := clientv3.Config{ cfg := clientv3.Config{

View File

@ -34,7 +34,7 @@ func TestKVProxyRange(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t) defer clus.Terminate(t)
kvts := newKVProxyServer([]string{clus.Members[0].GRPCAddr()}, t) kvts := newKVProxyServer([]string{clus.Members[0].GRPCURL()}, t)
defer kvts.close() defer kvts.close()
// create a client and try to get key from proxy. // create a client and try to get key from proxy.

Some files were not shown because too many files have changed in this diff Show More