Compare commits: 62 commits (v3.3.0)
Commit SHA1s:

c23606781f
afa01aaef0
d20e5a6bb5
6931dd8442
f320348682
d7e6dd77bb
50d2a00f01
c5bba152ee
dbde4e986b
f9b7fccf1b
9deb838ddb
baf7320e10
ea6360f550
2aa3d91759
7973612c6e
1c91ddc6f4
8a18cc96d0
a90f301ba8
374dc5743f
55505617df
a9317d3d77
02d362ccde
d292337d14
7974f008f3
4a3f99415e
6340564c84
6735028ec0
906f098053
8a66237693
d37afffb98
7e2759da8d
ad4df985fc
2df89c8bf6
6178c45066
9ccae0f81a
a5079cc381
9e079d8f02
bd57c9ca5b
58c402a47b
3ce73b70bc
ee3c81d8d3
2dfabfbef6
bf83d5269f
a609b1eb47
1ae0c0b47d
ec43197344
70ba0518f1
e330f5004f
0ec5023b7b
0f69520622
d3c2acf090
5e35f79087
6dff1a9398
325913d6fb
24c9fb0527
8511db5e2b
3193f3c9ab
bdc508cadf
d5a0609412
67af1a2138
66d68a8fdb
ebaa83c985
@@ -2,7 +2,7 @@
 TEST_SUFFIX=$(date +%s | base64 | head -c 15)

-TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.2.11"
+TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.3.0-rc.0"
 if [ "$TEST_ARCH" == "386" ]; then
   TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
 fi
@@ -10,7 +10,7 @@ fi
 docker run \
   --rm \
   --volume=`pwd`:/go/src/github.com/coreos/etcd \
-  gcr.io/etcd-development/etcd-test:go1.9.2 \
+  gcr.io/etcd-development/etcd-test:go1.9.3 \
   /bin/bash -c "${TEST_OPTS} ./test 2>&1 | tee test-${TEST_SUFFIX}.log"

 ! egrep "(--- FAIL:|panic: test timed out|appears to have leaked)" -B50 -A10 test-${TEST_SUFFIX}.log
.travis.yml (16 changed lines)
@@ -6,7 +6,7 @@ sudo: required
 services: docker

 go:
-- 1.9.2
+- 1.9.3
 - tip

 notifications:
@@ -30,7 +30,7 @@ matrix:
   - go: tip
     env: TARGET=amd64-go-tip
   exclude:
-  - go: 1.9.2
+  - go: 1.9.3
     env: TARGET=amd64-go-tip
   - go: tip
     env: TARGET=amd64
@@ -48,7 +48,7 @@ matrix:
     env: TARGET=ppc64le

 before_install:
-- docker pull gcr.io/etcd-development/etcd-test:go1.9.2
+- docker pull gcr.io/etcd-development/etcd-test:go1.9.3

 install:
 - pushd cmd/etcd && go get -t -v ./... && popd
@@ -58,7 +58,7 @@ script:
 case "${TARGET}" in
   amd64)
     docker run --rm \
-      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.2 \
+      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.3 \
       /bin/bash -c "GOARCH=amd64 ./test"
     ;;
   amd64-go-tip)
@@ -66,23 +66,23 @@ script:
     ;;
   darwin-amd64)
     docker run --rm \
-      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.2 \
+      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.3 \
      /bin/bash -c "GO_BUILD_FLAGS='-a -v' GOOS=darwin GOARCH=amd64 ./build"
     ;;
   windows-amd64)
     docker run --rm \
-      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.2 \
+      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.3 \
      /bin/bash -c "GO_BUILD_FLAGS='-a -v' GOOS=windows GOARCH=amd64 ./build"
     ;;
   386)
     docker run --rm \
-      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.2 \
+      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.3 \
      /bin/bash -c "GOARCH=386 PASSES='build unit' ./test"
     ;;
   *)
     # test building out of gopath
     docker run --rm \
-      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.2 \
+      --volume=`pwd`:/go/src/github.com/coreos/etcd gcr.io/etcd-development/etcd-test:go1.9.3 \
      /bin/bash -c "GO_BUILD_FLAGS='-a -v' GOARCH='${TARGET}' ./build"
     ;;
 esac
.words (1 changed line)
@@ -33,6 +33,7 @@ mutex
 prefetching
 protobuf
 prometheus
 rafthttp
 repin
 serializable
 teardown
@ -1,8 +1,4 @@
|
||||
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0) (2018-01-??)
|
||||
|
||||
**v3.3.0 is not yet released; expected to be released in January 2018.**
|
||||
|
||||
## [v3.3.0-rc.0](https://github.com/coreos/etcd/releases/tag/v3.3.0-rc.0) (2017-12-20)
|
||||
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0)
|
||||
|
||||
See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0) and [v3.3 upgrade guide](https://github.com/coreos/etcd/blob/master/Documentation/upgrades/upgrade_3_3.md) for any breaking changes.
|
||||
|
||||
|
@@ -102,6 +102,12 @@ To recover from the low space quota alarm:
 2. [Defragment][maintenance-defragment] every etcd endpoint.
 3. [Disarm][maintenance-disarm] the alarm.

+### What does the etcd warning "etcdserver/api/v3rpc: transport: http2Server.HandleStreams failed to read frame: read tcp 127.0.0.1:2379->127.0.0.1:43020: read: connection reset by peer" mean?
+
+This is a gRPC-side warning logged when a server receives a TCP RST flag because client-side streams were closed prematurely. For example, a client may close its connection while the gRPC server has not yet processed all HTTP/2 frames in the TCP queue. Some data may have been lost on the server side, but that is fine as long as the client connection has already been closed.
+
+Only [old versions of gRPC](https://github.com/grpc/grpc-go/issues/1362) log this. etcd [>=v3.2.13 logs this at DEBUG level by default](https://github.com/coreos/etcd/pull/9080), so it is only visible with the `--debug` flag enabled.
+
 ## Performance

 ### How should I benchmark etcd?
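For illustration, since >=v3.2.13 logs the message only at DEBUG level, one way to surface it is to run a member with `--debug` and filter the output; a minimal sketch (the data directory is a hypothetical example):

```bash
etcd --debug --data-dir /tmp/etcd-debug 2>&1 | grep "HandleStreams failed to read frame"
```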
@@ -152,7 +152,6 @@
 - [mattn/etcdenv](https://github.com/mattn/etcdenv) - "env" shebang with etcd integration
 - [kelseyhightower/confd](https://github.com/kelseyhightower/confd) - Manage local app config files using templates and data from etcd
 - [configdb](https://git.autistici.org/ai/configdb/tree/master) - A REST relational abstraction on top of arbitrary database backends, aimed at storing configs and inventories.
-- [fleet](https://github.com/coreos/fleet) - Distributed init system
 - [kubernetes/kubernetes](https://github.com/kubernetes/kubernetes) - Container cluster manager introduced by Google.
 - [mailgun/vulcand](https://github.com/mailgun/vulcand) - HTTP proxy that uses etcd as a configuration backend.
 - [duedil-ltd/discodns](https://github.com/duedil-ltd/discodns) - Simple DNS nameserver using etcd as a database for names and records.
@@ -1,6 +1,11 @@
 # Configuration flags

-etcd is configurable through command-line flags and environment variables. Options set on the command line take precedence over those from the environment.
+etcd is configurable through a configuration file, various command-line flags, and environment variables.
+
+A reusable configuration file is a YAML file whose entries are the names and values of the command-line flags described below. To use this file, specify the file path as a value to the `--config-file` flag. The [sample configuration file][sample-config-file] can be used as a starting point to create a new configuration file as needed.
+
+Options set on the command line take precedence over those from the environment. If a configuration file is provided, other command-line flags and environment variables will be ignored. For example, `etcd --config-file etcd.conf.yml.sample --data-dir /tmp` will ignore the `--data-dir` flag.
+
+The format of the environment variable for flag `--my-flag` is `ETCD_MY_FLAG`. It applies to all flags.
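For illustration, a minimal `--config-file` target might look like the sketch below. The keys mirror the command-line flags; the name, path, and URLs are hypothetical examples, not defaults:

```yaml
# minimal etcd configuration file sketch (values are examples only)
name: 'infra1'                                 # --name
data-dir: '/var/lib/etcd'                      # --data-dir
listen-client-urls: 'http://10.0.0.10:2379'    # --listen-client-urls
advertise-client-urls: 'http://10.0.0.10:2379' # --advertise-client-urls
```

It would then be loaded with `etcd --config-file /path/to/etcd.conf.yml`; per the precedence rule above, any other flags on that command line are ignored.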
@@ -266,12 +271,12 @@ The security flags help to [build a secure etcd cluster][security].
 + env variable: ETCD_PEER_CA_FILE

 ### --peer-cert-file
-+ Path to the peer server TLS cert file.
++ Path to the peer server TLS cert file. This is the cert for peer-to-peer traffic, used both for server and client.
 + default: ""
 + env variable: ETCD_PEER_CERT_FILE

 ### --peer-key-file
-+ Path to the peer server TLS key file.
++ Path to the peer server TLS key file. This is the key for peer-to-peer traffic, used both for server and client.
 + default: ""
 + env variable: ETCD_PEER_KEY_FILE

@@ -332,6 +337,7 @@ Follow the instructions when using these flags.
 ### --config-file
 + Load server configuration from a file.
 + default: ""
++ example: [sample configuration file][sample-config-file]

 ## Profiling flags

@@ -369,3 +375,4 @@ Follow the instructions when using these flags.
 [security]: security.md
 [systemd-intro]: http://freedesktop.org/wiki/Software/systemd/
 [tuning]: ../tuning.md#time-parameters
+[sample-config-file]: ../../etcd.conf.yml.sample
@@ -17,14 +17,14 @@ export NODE1=192.168.1.21
 Trust the CoreOS [App Signing Key](https://coreos.com/security/app-signing-key/).

 ```
-sudo rkt trust --prefix coreos.com/etcd
+sudo rkt trust --prefix quay.io/coreos/etcd
 # gpg key fingerprint is: 18AD 5014 C99E F7E3 BA5F 6CE9 50BD D3E0 FC8A 365E
 ```

-Run the `v3.1.2` version of etcd or specify another release version.
+Run the `v3.2` version of etcd or specify another release version.

 ```
-sudo rkt run --net=default:IP=${NODE1} coreos.com/etcd:v3.1.2 -- -name=node1 -advertise-client-urls=http://${NODE1}:2379 -initial-advertise-peer-urls=http://${NODE1}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE1}:2380 -initial-cluster=node1=http://${NODE1}:2380
+sudo rkt run --net=default:IP=${NODE1} quay.io/coreos/etcd:v3.2 -- -name=node1 -advertise-client-urls=http://${NODE1}:2379 -initial-advertise-peer-urls=http://${NODE1}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE1}:2380 -initial-cluster=node1=http://${NODE1}:2380
 ```

 List the cluster member.
@@ -45,13 +45,13 @@ export NODE3=172.16.28.23

 ```
 # node 1
-sudo rkt run --net=default:IP=${NODE1} coreos.com/etcd:v3.1.2 -- -name=node1 -advertise-client-urls=http://${NODE1}:2379 -initial-advertise-peer-urls=http://${NODE1}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE1}:2380 -initial-cluster=node1=http://${NODE1}:2380,node2=http://${NODE2}:2380,node3=http://${NODE3}:2380
+sudo rkt run --net=default:IP=${NODE1} quay.io/coreos/etcd:v3.2 -- -name=node1 -advertise-client-urls=http://${NODE1}:2379 -initial-advertise-peer-urls=http://${NODE1}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE1}:2380 -initial-cluster=node1=http://${NODE1}:2380,node2=http://${NODE2}:2380,node3=http://${NODE3}:2380

 # node 2
-sudo rkt run --net=default:IP=${NODE2} coreos.com/etcd:v3.1.2 -- -name=node2 -advertise-client-urls=http://${NODE2}:2379 -initial-advertise-peer-urls=http://${NODE2}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE2}:2380 -initial-cluster=node1=http://${NODE1}:2380,node2=http://${NODE2}:2380,node3=http://${NODE3}:2380
+sudo rkt run --net=default:IP=${NODE2} quay.io/coreos/etcd:v3.2 -- -name=node2 -advertise-client-urls=http://${NODE2}:2379 -initial-advertise-peer-urls=http://${NODE2}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE2}:2380 -initial-cluster=node1=http://${NODE1}:2380,node2=http://${NODE2}:2380,node3=http://${NODE3}:2380

 # node 3
-sudo rkt run --net=default:IP=${NODE3} coreos.com/etcd:v3.1.2 -- -name=node3 -advertise-client-urls=http://${NODE3}:2379 -initial-advertise-peer-urls=http://${NODE3}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE3}:2380 -initial-cluster=node1=http://${NODE1}:2380,node2=http://${NODE2}:2380,node3=http://${NODE3}:2380
+sudo rkt run --net=default:IP=${NODE3} quay.io/coreos/etcd:v3.2 -- -name=node3 -advertise-client-urls=http://${NODE3}:2379 -initial-advertise-peer-urls=http://${NODE3}:2380 -listen-client-urls=http://0.0.0.0:2379 -listen-peer-urls=http://${NODE3}:2380 -initial-cluster=node1=http://${NODE1}:2380,node2=http://${NODE2}:2380,node3=http://${NODE3}:2380
 ```

 Verify the cluster is healthy and can be reached.
@@ -43,8 +43,8 @@ ANNOTATIONS {

 # alert if more than 1% of gRPC method calls have failed within the last 5 minutes
 ALERT HighNumberOfFailedGRPCRequests
-  IF sum by(grpc_method) (rate(etcd_grpc_requests_failed_total{job="etcd"}[5m]))
-    / sum by(grpc_method) (rate(etcd_grpc_total{job="etcd"}[5m])) > 0.01
+  IF 100 * (sum by(grpc_method) (rate(etcd_grpc_requests_failed_total{job="etcd"}[5m]))
+    / sum by(grpc_method) (rate(etcd_grpc_total{job="etcd"}[5m]))) > 1
   FOR 10m
   LABELS {
     severity = "warning"
@@ -56,8 +56,8 @@ ANNOTATIONS {

 # alert if more than 5% of gRPC method calls have failed within the last 5 minutes
 ALERT HighNumberOfFailedGRPCRequests
-  IF sum by(grpc_method) (rate(etcd_grpc_requests_failed_total{job="etcd"}[5m]))
-    / sum by(grpc_method) (rate(etcd_grpc_total{job="etcd"}[5m])) > 0.05
+  IF 100 * (sum by(grpc_method) (rate(etcd_grpc_requests_failed_total{job="etcd"}[5m]))
+    / sum by(grpc_method) (rate(etcd_grpc_total{job="etcd"}[5m]))) > 5
   FOR 5m
   LABELS {
     severity = "critical"
@@ -84,8 +84,8 @@ ANNOTATIONS {

 # alert if more than 1% of requests to an HTTP endpoint have failed within the last 5 minutes
 ALERT HighNumberOfFailedHTTPRequests
-  IF sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
-    / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method) > 0.01
+  IF 100 * (sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
+    / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method)) > 1
   FOR 10m
   LABELS {
     severity = "warning"
@@ -97,8 +97,8 @@ ANNOTATIONS {

 # alert if more than 5% of requests to an HTTP endpoint have failed within the last 5 minutes
 ALERT HighNumberOfFailedHTTPRequests
-  IF sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
-    / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method) > 0.05
+  IF 100 * (sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
+    / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method)) > 5
   FOR 5m
   LABELS {
     severity = "critical"
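Note that the rewritten expressions fire at exactly the same failure rates as before: `failed/total > 0.01` is equivalent to `100 * (failed/total) > 1`, and likewise `> 0.05` to `> 5`. Scaling by 100 only makes the alert value render as a percentage in dashboards and notifications.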
@@ -26,8 +26,8 @@ groups:
       changes within the last hour
       summary: a high number of leader changes within the etcd cluster are happening
   - alert: HighNumberOfFailedGRPCRequests
-    expr: sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
-      / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method) > 0.01
+    expr: 100 * (sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
+      / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method)) > 1
     for: 10m
     labels:
       severity: warning
@@ -36,8 +36,8 @@ groups:
       on etcd instance {{ $labels.instance }}'
       summary: a high number of gRPC requests are failing
   - alert: HighNumberOfFailedGRPCRequests
-    expr: sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
-      / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method) > 0.05
+    expr: 100 * (sum(rate(grpc_server_handled_total{grpc_code!="OK",job="etcd"}[5m])) BY (grpc_service, grpc_method)
+      / sum(rate(grpc_server_handled_total{job="etcd"}[5m])) BY (grpc_service, grpc_method)) > 5
     for: 5m
     labels:
       severity: critical
@@ -56,8 +56,8 @@ groups:
       }} are slow
       summary: slow gRPC requests
   - alert: HighNumberOfFailedHTTPRequests
-    expr: sum(rate(etcd_http_failed_total{job="etcd"}[5m])) BY (method) / sum(rate(etcd_http_received_total{job="etcd"}[5m]))
-      BY (method) > 0.01
+    expr: 100 * (sum(rate(etcd_http_failed_total{job="etcd"}[5m])) BY (method) / sum(rate(etcd_http_received_total{job="etcd"}[5m]))
+      BY (method)) > 1
     for: 10m
     labels:
       severity: warning
@@ -66,8 +66,8 @@ groups:
       instance {{ $labels.instance }}'
      summary: a high number of HTTP requests are failing
   - alert: HighNumberOfFailedHTTPRequests
-    expr: sum(rate(etcd_http_failed_total{job="etcd"}[5m])) BY (method) / sum(rate(etcd_http_received_total{job="etcd"}[5m]))
-      BY (method) > 0.05
+    expr: 100 * (sum(rate(etcd_http_failed_total{job="etcd"}[5m])) BY (method) / sum(rate(etcd_http_received_total{job="etcd"}[5m]))
+      BY (method)) > 5
     for: 5m
     labels:
       severity: critical
@@ -48,7 +48,7 @@ Example application workload: A 50-node Kubernetes cluster
 | Provider | Type | vCPUs | Memory (GB) | Max concurrent IOPS | Disk bandwidth (MB/s) |
 |----------|------|-------|-------------|---------------------|-----------------------|
 | AWS | m4.large | 2 | 8 | 3600 | 56.25 |
-| GCE | n1-standard-1 + 50GB PD SSD | 2 | 7.5 | 1500 | 25 |
+| GCE | n1-standard-2 + 50GB PD SSD | 2 | 7.5 | 1500 | 25 |

 ### Medium cluster
@@ -36,9 +36,9 @@ Error: rpc error: code = 11 desc = etcdserver: mvcc: required revision has been

 ## Defragmentation

-After compacting the keyspace, the backend database may exhibit internal fragmentation. Any internal fragmentation is space that is free to use by the backend but still consumes storage space. The process of defragmentation releases this storage space back to the file system. Defragmentation is issued on a per-member so that cluster-wide latency spikes may be avoided.
+After compacting the keyspace, the backend database may exhibit internal fragmentation. Any internal fragmentation is space that is free to use by the backend but still consumes storage space. Compacting old revisions internally fragments `etcd` by leaving gaps in the backend database. Fragmented space is available for use by `etcd` but unavailable to the host filesystem. In other words, deleting application data does not reclaim the space on disk.

-Compacting old revisions internally fragments `etcd` by leaving gaps in backend database. Fragmented space is available for use by `etcd` but unavailable to the host filesystem.
+The process of defragmentation releases this storage space back to the file system. Defragmentation is issued on a per-member basis so that cluster-wide latency spikes may be avoided.

 To defragment an etcd member, use the `etcdctl defrag` command:

@@ -47,6 +47,8 @@ $ etcdctl defrag
 Finished defragmenting etcd member[127.0.0.1:2379]
 ```

+Note that defragmentation of a live member blocks the system from reading and writing data while it rebuilds its state.
+
 To defragment an etcd data directory directly, while etcd is not running, use the command:

 ``` sh
@@ -80,14 +82,14 @@ $ ETCDCTL_API=3 etcdctl --write-out=table endpoint status
 +----------------+------------------+-----------+---------+-----------+-----------+------------+
 # confirm alarm is raised
 $ ETCDCTL_API=3 etcdctl alarm list
 memberID:13803658152347727308 alarm:NOSPACE
 ```

 Removing excessive keyspace data and defragmenting the backend database will put the cluster back within the quota limits:

 ```sh
 # get current revision
-$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9]*')
+$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9].*')
 # compact away all old revisions
 $ ETCDCTL_API=3 etcdctl compact $rev
 compacted revision 1516
@@ -96,7 +98,7 @@ $ ETCDCTL_API=3 etcdctl defrag
 Finished defragmenting etcd member[127.0.0.1:2379]
 # disarm alarm
 $ ETCDCTL_API=3 etcdctl alarm disarm
 memberID:13803658152347727308 alarm:NOSPACE
 # test puts are allowed again
 $ ETCDCTL_API=3 etcdctl put newkey 123
 OK
@@ -195,9 +195,9 @@ When client authentication is enabled for an etcd member, the administrator must

 ## Notes for TLS authentication

-Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [TLS certificates get reloaded on every client connection](https://github.com/coreos/etcd/pull/7829). This is useful when replacing expiry certs without stopping etcd servers; it can be done by overwriting old certs with new ones. Refreshing certs for every connection should not have too much overhead, but can be improved in the future, with caching layer. Example tests can be found [here](https://github.com/coreos/etcd/blob/b041ce5d514a4b4aaeefbffb008f0c7570a18986/integration/v3_grpc_test.go#L1601-L1757).
+Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG-3.2.md#v320-2017-06-09), [TLS certificates get reloaded on every client connection](https://github.com/coreos/etcd/pull/7829). This is useful when replacing expired certs without stopping etcd servers; it can be done by overwriting the old certs with new ones. Refreshing certs on every connection should not add much overhead, and could be improved in the future with a caching layer. Example tests can be found [here](https://github.com/coreos/etcd/blob/b041ce5d514a4b4aaeefbffb008f0c7570a18986/integration/v3_grpc_test.go#L1601-L1757).

-Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server denies incoming peer certs with wrong IP `SAN`](https://github.com/coreos/etcd/pull/7687). For instance, if peer cert contains IP addresses in Subject Alternative Name (SAN) field, server authenticates a peer only when the remote IP address matches one of those IP addresses. This is to prevent unauthorized endpoints from joining the cluster. For example, peer B's CSR (with `cfssl`) is:
+Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG-3.2.md#v320-2017-06-09), [server denies incoming peer certs with wrong IP `SAN`](https://github.com/coreos/etcd/pull/7687). For instance, if the peer cert contains any IP addresses in the Subject Alternative Name (SAN) field, the server authenticates a peer only when the remote IP address matches one of those IP addresses. This is to prevent unauthorized endpoints from joining the cluster. For example, peer B's CSR (with `cfssl`) is:

 ```json
 {
@@ -223,50 +223,104 @@ Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017

 when peer B's actual IP address is `10.138.0.2`, not `10.138.0.27`. When peer B tries to join the cluster, peer A will reject B with the error `x509: certificate is valid for 10.138.0.27, not 10.138.0.2`, because B's remote IP address does not match the one in the Subject Alternative Name (SAN) field.

-Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server resolves TLS `DNSNames` when checking `SAN`](https://github.com/coreos/etcd/pull/7767). For instance, if peer cert contains any DNS names in Subject Alternative Name (SAN) field, server authenticates a peer only when forward-lookups on those DNS names have matching IP with the remote IP address. For example, peer B's CSR (with `cfssl`) is:
+Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG-3.2.md#v320-2017-06-09), [server resolves TLS `DNSNames` when checking `SAN`](https://github.com/coreos/etcd/pull/7767). For instance, if the peer cert contains only DNS names (no IP addresses) in the Subject Alternative Name (SAN) field, the server authenticates a peer only when forward-lookups (`dig b.com`) on those DNS names have a matching IP with the remote IP address. For example, peer B's CSR (with `cfssl`) is:

 ```json
 {
   ...
   "CN": "etcd peer",
   "hosts": [
     "b.com"
   ],
   ...
 }
 ```

 when peer B's remote IP address is `10.138.0.2`. When peer B tries to join the cluster, peer A looks up the incoming host `b.com` to get the list of IP addresses (e.g. `dig b.com`), and rejects B if the list does not contain the IP `10.138.0.2`, with the error `tls: 10.138.0.2 does not match any of DNSNames ["b.com"]`.

-Since [v3.2.2](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v322-2017-07-07), [server accepts connections if IP matches, without checking DNS entries](https://github.com/coreos/etcd/pull/8223). For instance, if peer cert contains IP addresses and DNS names in Subject Alternative Name (SAN) field, and the remote IP address matches one of those IP addresses, server just accepts connection without further checking the DNS names. For example, peer B's CSR (with `cfssl`) is:
+Since [v3.2.2](https://github.com/coreos/etcd/blob/master/CHANGELOG-3.2.md#v322-2017-07-07), [server accepts connections if IP matches, without checking DNS entries](https://github.com/coreos/etcd/pull/8223). For instance, if the peer cert contains both IP addresses and DNS names in the Subject Alternative Name (SAN) field, and the remote IP address matches one of those IP addresses, the server accepts the connection without further checking the DNS names. For example, peer B's CSR (with `cfssl`) is:

 ```json
 {
   ...
   "CN": "etcd peer",
   "hosts": [
     "invalid.domain",
     "10.138.0.2"
   ],
   ...
 }
 ```

 when peer B's remote IP address is `10.138.0.2` and `invalid.domain` is an invalid host. When peer B tries to join the cluster, peer A successfully authenticates B, since the Subject Alternative Name (SAN) field has a valid matching IP address. See [issue#8206](https://github.com/coreos/etcd/issues/8206) for more detail.

-Since [v3.2.5](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v325-2017-08-04), [server supports reverse-lookup on wildcard DNS `SAN`](https://github.com/coreos/etcd/pull/8281). For instance, if peer cert contains only DNS names (no IP addresses) in Subject Alternative Name (SAN) field, server first reverse-lookups the remote IP address to get a list of names mapping to that address (e.g. `nslookup IPADDR`). Then accepts the connection if those names have a matching name with peer cert's DNS names (either by exact or wildcard match). If none is matched, server forward-lookups each DNS entry in peer cert (e.g. look up `example.default.svc` when the entry is `*.example.default.svc`), and accepts connection only when the host's resolved addresses have the matching IP address with the peer's remote IP address. For example, peer B's CSR (with `cfssl`) is:
+Since [v3.2.5](https://github.com/coreos/etcd/blob/master/CHANGELOG-3.2.md#v325-2017-08-04), [server supports reverse-lookup on wildcard DNS `SAN`](https://github.com/coreos/etcd/pull/8281). For instance, if the peer cert contains only DNS names (no IP addresses) in the Subject Alternative Name (SAN) field, the server first reverse-lookups the remote IP address to get a list of names mapping to that address (e.g. `nslookup IPADDR`). It then accepts the connection if those names have a matching name with the peer cert's DNS names (either by exact or wildcard match). If none is matched, the server forward-lookups each DNS entry in the peer cert (e.g. looks up `example.default.svc` when the entry is `*.example.default.svc`), and accepts the connection only when the host's resolved addresses include the peer's remote IP address. For example, peer B's CSR (with `cfssl`) is:

 ```json
 {
   ...
   "CN": "etcd peer",
   "hosts": [
     "*.example.default.svc",
     "*.example.default.svc.cluster.local"
   ],
   ...
 }
 ```

 when peer B's remote IP address is `10.138.0.2`. When peer B tries to join the cluster, peer A reverse-lookups the IP `10.138.0.2` to get a list of host names, and matches them, either exactly or by wildcard, against the DNS names in peer B's cert's Subject Alternative Name (SAN) field. If none of the reverse/forward lookups work, it returns the error `"tls: "10.138.0.2" does not match any of DNSNames ["*.example.default.svc","*.example.default.svc.cluster.local"]`. See [issue#8268](https://github.com/coreos/etcd/issues/8268) for more detail.

+[v3.3.0](https://github.com/coreos/etcd/blob/master/CHANGELOG-3.3.md) adds the [`etcd --peer-cert-allowed-cn`](https://github.com/coreos/etcd/pull/8616) flag to support [CN(Common Name)-based auth for inter-peer connections](https://github.com/coreos/etcd/issues/8262). Kubernetes TLS bootstrapping involves generating dynamic certificates for etcd members and other system components (e.g. API server, kubelet, etc.). Maintaining different CAs for each component provides tighter access control to the etcd cluster but is often tedious. When the `--peer-cert-allowed-cn` flag is specified, a node can join only with a matching common name, even with shared CAs. For example, each member in a 3-node cluster is set up with CSRs (with `cfssl`) as below:
+
+```json
+{
+  "CN": "etcd.local",
+  "hosts": [
+    "m1.etcd.local",
+    "127.0.0.1",
+    "localhost"
+  ],
+```
+
+```json
+{
+  "CN": "etcd.local",
+  "hosts": [
+    "m2.etcd.local",
+    "127.0.0.1",
+    "localhost"
+  ],
+```
+
+```json
+{
+  "CN": "etcd.local",
+  "hosts": [
+    "m3.etcd.local",
+    "127.0.0.1",
+    "localhost"
+  ],
+```
+
+Then only peers with matching common names will be authenticated if `--peer-cert-allowed-cn etcd.local` is given. Nodes with different CNs in their CSRs, or with a different `--peer-cert-allowed-cn`, will be rejected:
+
+```bash
+$ etcd --peer-cert-allowed-cn m1.etcd.local
+
+I | embed: rejected connection from "127.0.0.1:48044" (error "CommonName authentication failed", ServerName "m1.etcd.local")
+I | embed: rejected connection from "127.0.0.1:55702" (error "remote error: tls: bad certificate", ServerName "m3.etcd.local")
+```
+
+Each process should be started with:
+
+```bash
+etcd --peer-cert-allowed-cn etcd.local
+
+I | pkg/netutil: resolving m3.etcd.local:32380 to 127.0.0.1:32380
+I | pkg/netutil: resolving m2.etcd.local:22380 to 127.0.0.1:22380
+I | pkg/netutil: resolving m1.etcd.local:2380 to 127.0.0.1:2380
+I | etcdserver: published {Name:m3 ClientURLs:[https://m3.etcd.local:32379]} to cluster 9db03f09b20de32b
+I | embed: ready to serve client requests
+I | etcdserver: published {Name:m1 ClientURLs:[https://m1.etcd.local:2379]} to cluster 9db03f09b20de32b
+I | embed: ready to serve client requests
+I | etcdserver: published {Name:m2 ClientURLs:[https://m2.etcd.local:22379]} to cluster 9db03f09b20de32b
+I | embed: ready to serve client requests
+I | embed: serving client requests on 127.0.0.1:32379
+I | embed: serving client requests on 127.0.0.1:22379
+I | embed: serving client requests on 127.0.0.1:2379
+```
+
 ## Frequently asked questions

 ### I'm seeing an SSLv3 alert handshake failure when using TLS client authentication?
@@ -66,7 +66,7 @@ if err == context.DeadlineExceeded {

 #### Change in maximum request size limits (>=3.2.10)

-3.2.10 and 3.2.11 allow custom request size limits in server side. >=3.2.12 allows custom request size limits for both server and **client side**.
+3.2.10 and 3.2.11 allow custom request size limits on the server side. >=3.2.12 allows custom request size limits for both server and **client side**. In previous versions (v3.2.10, v3.2.11), the client response size was limited to only 4 MiB.

 Server-side request limits can be configured with the `--max-request-bytes` flag:
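For illustration, a minimal sketch of the two knobs: the server is started with `etcd --max-request-bytes 10485760` (the byte value is an arbitrary example), and a >=3.2.12 client sets its own caps via `clientv3.Config` (endpoint and sizes are hypothetical):

```go
package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// client-side limits (>=3.2.12); zero values fall back to the defaults
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:          []string{"127.0.0.1:2379"},
		MaxCallSendMsgSize: 2 * 1024 * 1024, // max bytes for an outgoing request
		MaxCallRecvMsgSize: 4 * 1024 * 1024, // max bytes for an incoming response
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
}
```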
@@ -160,12 +160,6 @@ Before and after
 +func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
 ```

-#### Change in `--listen-peer-urls` and `--listen-client-urls`
-
-3.2 now rejects domains names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since domain name is invalid for network interface binding. Make sure that those URLs are properly formated as `scheme://IP:port`.
-
-See [issue #6336](https://github.com/coreos/etcd/issues/6336) for more contexts.
-
 #### Change in `clientv3.Lease.TimeToLive` API

 Previously, the `clientv3.Lease.TimeToLive` API returned `lease.ErrLeaseNotFound` on a non-existent lease ID. 3.2 instead returns TTL=-1 in its response and no error (see [#7305](https://github.com/coreos/etcd/pull/7305)).
@@ -206,6 +200,12 @@ import clientv3yaml "github.com/coreos/etcd/clientv3/yaml"
 clientv3yaml.NewConfig
 ```

+#### Change in `--listen-peer-urls` and `--listen-client-urls`
+
+3.2 now rejects domain names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since a domain name is invalid for network interface binding. Make sure that those URLs are properly formatted as `scheme://IP:port`.
+
+See [issue #6336](https://github.com/coreos/etcd/issues/6336) for more context.
+
 ### Server upgrade checklists

 #### Upgrade requirements
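For illustration, a minimal sketch of the new `TimeToLive` behavior (the endpoint and lease ID are hypothetical):

```go
package main

import (
	"context"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 3.2+: a non-existent lease yields no error; the response carries TTL == -1
	resp, err := cli.TimeToLive(context.Background(), clientv3.LeaseID(12345))
	if err != nil {
		log.Fatal(err) // transport or server error, not "lease not found"
	}
	if resp.TTL == -1 {
		log.Println("lease does not exist (expired or never granted)")
	}
}
```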
@@ -72,25 +72,15 @@ cfg.SetupLogging()

 Set the `embed.Config.Debug` field to `true` to enable gRPC server logs.

-#### Change in `/health` endpoint response value
+#### Change in `/health` endpoint response

-Previously, `[endpoint]:[client-port]/health` returned manually marshaled JSON value. 3.3 instead defines [`etcdhttp.Health`](https://godoc.org/github.com/coreos/etcd/etcdserver/api/etcdhttp#Health) struct and returns properly encoded JSON value with errors, if any.
+Previously, `[endpoint]:[client-port]/health` returned a manually marshaled JSON value. 3.3 now defines the [`etcdhttp.Health`](https://godoc.org/github.com/coreos/etcd/etcdserver/api/etcdhttp#Health) struct.

-Before
+Note that in v3.3.0-rc.0, v3.3.0-rc.1, and v3.3.0-rc.2, `etcdhttp.Health` had a boolean `"health"` field and an `"errors"` field. For backward compatibility, the `"health"` field was reverted to `string` type and the `"errors"` field was removed. Further health information will be provided in separate APIs.

 ```bash
 $ curl http://localhost:2379/health
-{"health": "true"}
-```
-
-After
-
-```bash
-$ curl http://localhost:2379/health
-{"health":true}
-
-# Or
-{"health":false,"errors":["NOSPACE"]}
+{"health":"true"}
 ```

 #### Change in gRPC gateway HTTP endpoints (replaced `/v3alpha` with `/v3beta`)
@@ -113,7 +103,7 @@ Requests to `/v3alpha` endpoints will redirect to `/v3beta`, and `/v3alpha` will

 #### Change in maximum request size limits

-3.3 now allows custom request size limits for both server and **client side**.
+3.3 now allows custom request size limits for both server and **client side**. In previous versions (v3.2.10, v3.2.11), the client response size was limited to only 4 MiB.

 Server-side request limits can be configured with the `--max-request-bytes` flag:
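For illustration, a gateway request against the renamed prefix might look like this sketch (the host is hypothetical; `"Zm9v"` is base64 for `foo`, since the JSON gateway expects base64-encoded bytes):

```bash
curl -L http://localhost:2379/v3beta/kv/range \
  -X POST -d '{"key": "Zm9v"}'
```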
@@ -45,12 +45,12 @@ It is important to monitor your production etcd cluster for healthy information

 #### Health Monitoring

-At lowest level, etcd exposes health information via HTTP at `/health` in JSON format. If it returns `{"health":true}`, then the cluster is healthy.
+At the lowest level, etcd exposes health information via HTTP at `/health` in JSON format. If it returns `{"health":"true"}`, then the cluster is healthy.

 ```
 $ curl -L http://127.0.0.1:2379/health

-{"health":true}
+{"health":"true"}
 ```

 You can also use etcdctl to check the cluster-wide health information. It will contact all the members of the cluster and collect the health information for you.
@@ -29,5 +29,5 @@ curl http://10.0.0.10:2379/health
 ```

 ```json
-{"health":true}
+{"health":"true"}
 ```
@@ -18,6 +18,7 @@ import (
 	"context"

 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/types"

 	"google.golang.org/grpc"
 )
@@ -66,6 +67,11 @@ func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
 }

 func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
+	// fail-fast before panic in rafthttp
+	if _, err := types.NewURLs(peerAddrs); err != nil {
+		return nil, err
+	}
+
 	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
 	resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
 	if err != nil {
@@ -84,6 +90,11 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
 }

 func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
+	// fail-fast before panic in rafthttp
+	if _, err := types.NewURLs(peerAddrs); err != nil {
+		return nil, err
+	}
+
 	// it is safe to retry on update.
 	r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
 	resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)
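For illustration, with this fail-fast validation a malformed peer URL is now rejected inside the client, before any RPC is sent; a minimal sketch (the endpoint is hypothetical):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// "localhost:1234" has no URL scheme, so types.NewURLs rejects it on the
	// client side and rafthttp never sees the bad peer URL.
	_, err = cli.MemberAdd(context.Background(), []string{"localhost:1234"})
	fmt.Println("MemberAdd error:", err)
}
```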
@@ -54,7 +54,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
 	// TODO: only send healthy endpoint to gRPC so gRPC wont waste time to
 	// dial for unhealthy endpoint.
 	// then we can reduce 3s to 1s.
-	timeout := pingInterval + 3*time.Second
+	timeout := pingInterval + integration.RequestWaitTimeout

 	cli, err := clientv3.New(ccfg)
 	if err != nil {
@@ -126,3 +126,36 @@ func TestMemberUpdate(t *testing.T) {
 		t.Errorf("urls = %v, want %v", urls, resp.Members[0].PeerURLs)
 	}
 }
+
+func TestMemberAddUpdateWrongURLs(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	capi := clus.RandClient()
+	tt := [][]string{
+		// missing protocol scheme
+		{"://127.0.0.1:2379"},
+		// unsupported scheme
+		{"mailto://127.0.0.1:2379"},
+		// does not conform to host:port
+		{"http://127.0.0.1"},
+		// contains a path
+		{"http://127.0.0.1:2379/path"},
+		// first path segment in URL cannot contain colon
+		{"127.0.0.1:1234"},
+		// URL scheme must be http, https, unix, or unixs
+		{"localhost:1234"},
+	}
+	for i := range tt {
+		_, err := capi.MemberAdd(context.Background(), tt[i])
+		if err == nil {
+			t.Errorf("#%d: MemberAdd err = nil, but error", i)
+		}
+		_, err = capi.MemberUpdate(context.Background(), 0, tt[i])
+		if err == nil {
+			t.Errorf("#%d: MemberUpdate err = nil, but error", i)
+		}
+	}
+}
@@ -121,7 +121,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
 	if !setBefore {
 		cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
 	if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
 		t.Fatal(err)
 	}
@@ -453,7 +453,7 @@ func TestKVGetErrConnClosed(t *testing.T) {
 	clus.TakeClient(0)

 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("kv.Get took too long")
 	case <-donec:
 	}
@@ -480,7 +480,7 @@ func TestKVNewAfterClose(t *testing.T) {
 		close(donec)
 	}()
 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("kv.Get took too long")
 	case <-donec:
 	}
@@ -906,7 +906,7 @@ func TestKVLargeRequests(t *testing.T) {
 			maxCallSendBytesClient: 10 * 1024 * 1024,
 			maxCallRecvBytesClient: 0,
 			valueSize:              10 * 1024 * 1024,
-			expectError:            grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485770, 10485760),
+			expectError:            grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
 		},
 		{
 			maxRequestBytesServer: 10 * 1024 * 1024,
@@ -920,7 +920,7 @@ func TestKVLargeRequests(t *testing.T) {
 			maxCallSendBytesClient: 10 * 1024 * 1024,
 			maxCallRecvBytesClient: 0,
 			valueSize:              10*1024*1024 + 5,
-			expectError:            grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485775, 10485760),
+			expectError:            grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
 		},
 	}
 	for i, test := range tests {
@@ -939,7 +939,7 @@ func TestKVLargeRequests(t *testing.T) {
 			if err != test.expectError {
 				t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
 			}
-		} else if err != nil && err.Error() != test.expectError.Error() {
+		} else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) {
 			t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
 		}
@@ -299,7 +299,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
 	}

 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("le.Grant took too long")
 	case <-donec:
 	}
@@ -325,7 +325,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
 		close(donec)
 	}()
 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("le.Grant took too long")
 	case <-donec:
 	}
@@ -357,7 +357,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
 		close(donec)
 	}()
 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("le.Revoke took too long")
 	case <-donec:
 	}
@@ -234,7 +234,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
 	wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
 	select {
 	case <-wch:
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("took too long to create watch")
 	}

@@ -252,7 +252,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
 		if err = ev.Err(); err != rpctypes.ErrNoLeader {
 			t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
 		}
-	case <-time.After(3 * time.Second): // enough time to detect leader lost
+	case <-time.After(integration.RequestWaitTimeout): // enough time to detect leader lost
 		t.Fatal("took too long to detect leader lost")
 	}
 }
@@ -63,7 +63,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
 	wch := watchCli.Watch(context.Background(), key, clientv3.WithCreatedNotify())
 	select {
 	case <-wch:
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("took too long to create watch")
 	}

@@ -348,7 +348,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
 	clus.Members[target].Restart(t)

 	select {
-	case <-time.After(clientTimeout + 3*time.Second):
+	case <-time.After(clientTimeout + integration.RequestWaitTimeout):
 		t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
 	case <-donec:
 	}
@@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) {
 	clus.TakeClient(0)

 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("wc.Watch took too long")
 	case <-donec:
 	}
@@ -705,7 +705,7 @@ func TestWatchAfterClose(t *testing.T) {
 		close(donec)
 	}()
 	select {
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("wc.Watch took too long")
 	case <-donec:
 	}
@@ -751,7 +751,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
 		if resp.Err() != rpctypes.ErrNoLeader {
 			t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
 		}
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("watch without leader took too long to close")
 	}

@@ -760,7 +760,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
 		if ok {
 			t.Fatalf("expected closed channel, got response %v", resp)
 		}
-	case <-time.After(3 * time.Second):
+	case <-time.After(integration.RequestWaitTimeout):
 		t.Fatal("waited too long for channel to close")
 	}
|
@ -445,8 +445,11 @@ func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) waitSession(ctx context.Context) error {
|
||||
lkv.leases.mu.RLock()
|
||||
sessionc := lkv.sessionc
|
||||
lkv.leases.mu.RUnlock()
|
||||
select {
|
||||
case <-lkv.sessionc:
|
||||
case <-sessionc:
|
||||
return nil
|
||||
case <-lkv.ctx.Done():
|
||||
return lkv.ctx.Err()
|
||||
|
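This reads as a race fix (an inference from the surrounding code, not stated in the diff): the `sessionc` channel can be swapped when the leasing session is re-established, so the `select` now waits on a snapshot of the channel taken under `leases.mu` instead of re-reading the struct field without the lock.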
@@ -53,7 +53,7 @@ func alarmTest(cx ctlCtx) {
 	}

 	// '/health' handler should return 'false'
-	if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":false,"errors":["NOSPACE"]}`}); err != nil {
+	if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"false"}`}); err != nil {
 		cx.t.Fatalf("failed get with curl (%v)", err)
 	}
@@ -15,6 +15,7 @@
 package e2e

 import (
+	"os"
 	"strings"
 	"testing"
 )
@@ -45,55 +46,94 @@ type kvExec struct {

 func watchTest(cx ctlCtx) {
 	tests := []struct {
-		puts []kv
-		args []string
+		puts     []kv
+		envKey   string
+		envRange string
+		args     []string

 		wkv []kvExec
 	}{
 		{ // watch 1 key
-			[]kv{{"sample", "value"}},
-			[]string{"sample", "--rev", "1"},
-			[]kvExec{{key: "sample", val: "value"}},
+			puts: []kv{{"sample", "value"}},
+			args: []string{"sample", "--rev", "1"},
+			wkv:  []kvExec{{key: "sample", val: "value"}},
 		},
+		{ // watch 1 key with env
+			puts:   []kv{{"sample", "value"}},
+			envKey: "sample",
+			args:   []string{"--rev", "1"},
+			wkv:    []kvExec{{key: "sample", val: "value"}},
+		},
 		{ // watch 1 key with "echo watch event received"
-			[]kv{{"sample", "value"}},
-			[]string{"sample", "--rev", "1", "--", "echo", "watch event received"},
-			[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
+			puts: []kv{{"sample", "value"}},
+			args: []string{"sample", "--rev", "1", "--", "echo", "watch event received"},
+			wkv:  []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
 		},
+		{ // watch 1 key with ${ETCD_WATCH_VALUE}
+			puts: []kv{{"sample", "value"}},
+			args: []string{"sample", "--rev", "1", "--", "env"},
+			wkv:  []kvExec{{key: "sample", val: "value", execOutput: `ETCD_WATCH_VALUE="value"`}},
+		},
+		{ // watch 1 key with "echo watch event received", with env
+			puts:   []kv{{"sample", "value"}},
+			envKey: "sample",
+			args:   []string{"--rev", "1", "--", "echo", "watch event received"},
+			wkv:    []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
+		},
 		{ // watch 1 key with "echo watch event received"
-			[]kv{{"sample", "value"}},
-			[]string{"--rev", "1", "sample", "--", "echo", "watch event received"},
-			[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
+			puts: []kv{{"sample", "value"}},
+			args: []string{"--rev", "1", "sample", "--", "echo", "watch event received"},
+			wkv:  []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
 		},
 		{ // watch 1 key with "echo \"Hello World!\""
-			[]kv{{"sample", "value"}},
-			[]string{"--rev", "1", "sample", "--", "echo", "\"Hello World!\""},
-			[]kvExec{{key: "sample", val: "value", execOutput: "Hello World!"}},
+			puts: []kv{{"sample", "value"}},
+			args: []string{"--rev", "1", "sample", "--", "echo", "\"Hello World!\""},
+			wkv:  []kvExec{{key: "sample", val: "value", execOutput: "Hello World!"}},
 		},
 		{ // watch 1 key with "echo watch event received"
-			[]kv{{"sample", "value"}},
-			[]string{"sample", "samplx", "--rev", "1", "--", "echo", "watch event received"},
-			[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
+			puts: []kv{{"sample", "value"}},
+			args: []string{"sample", "samplx", "--rev", "1", "--", "echo", "watch event received"},
+			wkv:  []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
 		},
 		{ // watch 1 key with "echo watch event received"
-			[]kv{{"sample", "value"}},
-			[]string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
-			[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
+			puts:     []kv{{"sample", "value"}},
+			envKey:   "sample",
+			envRange: "samplx",
+			args:     []string{"--rev", "1", "--", "echo", "watch event received"},
+			wkv:      []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
+		},
+		{ // watch 1 key with "echo watch event received"
+			puts: []kv{{"sample", "value"}},
+			args: []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
+			wkv:  []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
 		},
 		{ // watch 3 keys by prefix
-			[]kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
-			[]string{"key", "--rev", "1", "--prefix"},
-			[]kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
+			puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
+			args: []string{"key", "--rev", "1", "--prefix"},
+			wkv:  []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
 		},
+		{ // watch 3 keys by prefix, with env
+			puts:   []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
+			envKey: "key",
+			args:   []string{"--rev", "1", "--prefix"},
+			wkv:    []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
+		},
 		{ // watch by revision
-			[]kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
-			[]string{"etcd", "--rev", "2"},
-			[]kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
+			puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
+			args: []string{"etcd", "--rev", "2"},
+			wkv:  []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
 		},
 		{ // watch 3 keys by range
-			[]kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
-			[]string{"key", "key3", "--rev", "1"},
-			[]kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
+			puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
+			args: []string{"key", "key3", "--rev", "1"},
+			wkv:  []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
 		},
+		{ // watch 3 keys by range, with env
+			puts:     []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
+			envKey:   "key",
+			envRange: "key3",
+			args:     []string{"--rev", "1"},
+			wkv:      []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
+		},
 	}
@@ -107,11 +147,30 @@ func watchTest(cx ctlCtx) {
 		}
 		close(donec)
 	}(i, tt.puts)

+	unsetEnv := func() {}
+	if tt.envKey != "" || tt.envRange != "" {
+		if tt.envKey != "" {
+			os.Setenv("ETCDCTL_WATCH_KEY", tt.envKey)
+			unsetEnv = func() { os.Unsetenv("ETCDCTL_WATCH_KEY") }
+		}
+		if tt.envRange != "" {
+			os.Setenv("ETCDCTL_WATCH_RANGE_END", tt.envRange)
+			unsetEnv = func() { os.Unsetenv("ETCDCTL_WATCH_RANGE_END") }
+		}
+		if tt.envKey != "" && tt.envRange != "" {
+			unsetEnv = func() {
+				os.Unsetenv("ETCDCTL_WATCH_KEY")
+				os.Unsetenv("ETCDCTL_WATCH_RANGE_END")
+			}
+		}
+	}
 	if err := ctlV3Watch(cx, tt.args, tt.wkv...); err != nil {
 		if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
 			cx.t.Errorf("watchTest #%d: ctlV3Watch error (%v)", i, err)
 		}
 	}
+	unsetEnv()
 	<-donec
 }
@@ -45,7 +45,7 @@ func metricsTest(cx ctlCtx) {
 	if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version), metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
 		cx.t.Fatalf("failed get with curl (%v)", err)
 	}
-	if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":true}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
+	if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"true"}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
 		cx.t.Fatalf("failed get with curl (%v)", err)
 	}
 }
@@ -268,8 +268,11 @@ func (cfg *Config) SetupLogging() {
 	if cfg.Debug {
 		capnslog.SetGlobalLogLevel(capnslog.DEBUG)
 		grpc.EnableTracing = true
+		// enable info, warning, error
+		grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
 	} else {
-		grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
+		// only discard info
+		grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
 	}
 	if cfg.LogPkgLevels != "" {
 		repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")
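For illustration, a minimal embedded-server sketch that exercises this logging path (the data directory is a hypothetical example):

```go
package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "/tmp/embedded-etcd" // hypothetical data directory
	cfg.Debug = true               // with SetupLogging, gRPC info/warning/error go to stderr
	cfg.SetupLogging()

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify() // block until the server is ready to serve
}
```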
@@ -41,6 +41,7 @@ import (
 	"github.com/coreos/etcd/rafthttp"

 	"github.com/coreos/pkg/capnslog"
+	"github.com/grpc-ecosystem/go-grpc-prometheus"
 	"github.com/soheilhy/cmux"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
@@ -179,6 +180,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		AuthToken:           cfg.AuthToken,
 		InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
 		CorruptCheckTime:    cfg.ExperimentalCorruptCheckTime,
+		Debug:               cfg.Debug,
 	}

 	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@@ -522,6 +524,10 @@ func (e *Etcd) serveClients() (err error) {
 }

 func (e *Etcd) serveMetrics() (err error) {
+	if e.cfg.Metrics == "extensive" {
+		grpc_prometheus.EnableHandlingTimeHistogram()
+	}
+
 	if len(e.cfg.ListenMetricsUrls) > 0 {
 		metricsMux := http.NewServeMux()
 		etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
@@ -378,6 +378,13 @@ watch [options] <key or prefix>\n
# bar
```

```bash
ETCDCTL_WATCH_KEY=foo ./etcdctl watch
# PUT
# foo
# bar
```

Receive events and execute `echo watch event received`:

```bash
@@ -388,6 +395,41 @@ Receive events and execute `echo watch event received`:
# watch event received
```

Watch response is set via `ETCD_WATCH_*` environmental variables:

```bash
./etcdctl watch foo -- sh -c "env | grep ETCD_WATCH_"

# PUT
# foo
# bar
# ETCD_WATCH_REVISION=11
# ETCD_WATCH_KEY="foo"
# ETCD_WATCH_EVENT_TYPE="PUT"
# ETCD_WATCH_VALUE="bar"
```

Watch with environmental variables and execute `echo watch event received`:

```bash
export ETCDCTL_WATCH_KEY=foo
./etcdctl watch -- echo watch event received
# PUT
# foo
# bar
# watch event received
```

```bash
export ETCDCTL_WATCH_KEY=foo
export ETCDCTL_WATCH_RANGE_END=foox
./etcdctl watch -- echo watch event received
# PUT
# fob
# bar
# watch event received
```

##### Interactive

```bash
@@ -413,6 +455,29 @@ watch foo -- echo watch event received
# watch event received
```

Watch with environmental variables and execute `echo watch event received`:

```bash
export ETCDCTL_WATCH_KEY=foo
./etcdctl watch -i
watch -- echo watch event received
# PUT
# foo
# bar
# watch event received
```

```bash
export ETCDCTL_WATCH_KEY=foo
export ETCDCTL_WATCH_RANGE_END=foox
./etcdctl watch -i
watch -- echo watch event received
# PUT
# fob
# bar
# watch event received
```

### LEASE \<subcommand\>

LEASE provides commands for key lease management.
@@ -874,6 +939,8 @@ The snapshot restore options closely resemble to those used in the `etcd` command

- data-dir -- Path to the data directory. Uses \<name\>.etcd if none given.

- wal-dir -- Path to the WAL directory. Uses data directory if none given.

- initial-cluster -- The initial cluster configuration for the restored etcd cluster.

- initial-cluster-token -- Initial cluster token for the restored etcd cluster.
@@ -202,7 +202,26 @@ func endpointsFromCluster(cmd *cobra.Command) []string {
		}
		return endpoints
	}
	c := mustClientFromCmd(cmd)

	sec := secureCfgFromCmd(cmd)
	dt := dialTimeoutFromCmd(cmd)
	ka := keepAliveTimeFromCmd(cmd)
	kat := keepAliveTimeoutFromCmd(cmd)
	eps, err := endpointsFromCmd(cmd)
	if err != nil {
		ExitWithError(ExitError, err)
	}
	// exclude auth for not asking needless password (MemberList() doesn't need authentication)

	cfg, err := newClientCfg(eps, dt, ka, kat, sec, nil)
	if err != nil {
		ExitWithError(ExitError, err)
	}
	c, err := v3.New(*cfg)
	if err != nil {
		ExitWithError(ExitError, err)
	}

	ctx, cancel := commandCtx(cmd)
	defer func() {
		c.Close()
@@ -101,8 +101,19 @@ type clientConfig struct {
	acfg *authCfg
}

type discardValue struct{}

func (*discardValue) String() string   { return "" }
func (*discardValue) Set(string) error { return nil }
func (*discardValue) Type() string     { return "" }

func clientConfigFromCmd(cmd *cobra.Command) *clientConfig {
	fs := cmd.InheritedFlags()

	// silence "pkg/flags: unrecognized environment variable ETCDCTL_WATCH_KEY=foo" warnings
	// silence "pkg/flags: unrecognized environment variable ETCDCTL_WATCH_RANGE_END=bar" warnings
	fs.AddFlag(&pflag.Flag{Name: "watch-key", Value: &discardValue{}})
	fs.AddFlag(&pflag.Flag{Name: "watch-range-end", Value: &discardValue{}})
	flags.SetPflagsFromEnv("ETCDCTL", fs)

	debug, err := cmd.Flags().GetBool("debug")
@@ -56,6 +56,7 @@ var (
	restoreCluster      string
	restoreClusterToken string
	restoreDataDir      string
	restoreWalDir       string
	restorePeerURLs     string
	restoreName         string
	skipHashCheck       bool
@@ -99,6 +100,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
		Run: snapshotRestoreCommandFunc,
	}
	cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
	cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)")
	cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap")
	cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap")
	cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
@@ -187,7 +189,10 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
		basedir = restoreName + ".etcd"
	}

	waldir := filepath.Join(basedir, "member", "wal")
	waldir := restoreWalDir
	if waldir == "" {
		waldir = filepath.Join(basedir, "member", "wal")
	}
	snapdir := filepath.Join(basedir, "member", "snap")

	if _, err := os.Stat(basedir); err == nil {
|
||||
|
||||
var (
|
||||
errBadArgsNum = errors.New("bad number of arguments")
|
||||
errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
|
||||
errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
|
||||
errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
|
||||
)
|
||||
@ -59,12 +60,17 @@ func NewWatchCommand() *cobra.Command {
|
||||
|
||||
// watchCommandFunc executes the "watch" command.
|
||||
func watchCommandFunc(cmd *cobra.Command, args []string) {
|
||||
envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
|
||||
if envKey == "" && envRange != "" {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
|
||||
}
|
||||
|
||||
if watchInteractive {
|
||||
watchInteractiveFunc(cmd, os.Args)
|
||||
watchInteractiveFunc(cmd, os.Args, envKey, envRange)
|
||||
return
|
||||
}
|
||||
|
||||
watchArgs, execArgs, err := parseWatchArgs(os.Args, args, false)
|
||||
watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
|
||||
if err != nil {
|
||||
ExitWithError(ExitBadArgs, err)
|
||||
}
|
||||
@ -82,7 +88,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||
}
|
||||
|
||||
func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
|
||||
func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
|
||||
c := mustClientFromCmd(cmd)
|
||||
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
@ -95,7 +101,7 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
|
||||
l = strings.TrimSuffix(l, "\n")
|
||||
|
||||
args := argify(l)
|
||||
if len(args) < 2 {
|
||||
if len(args) < 2 && envKey == "" {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
|
||||
continue
|
||||
}
|
||||
@ -105,7 +111,7 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
|
||||
continue
|
||||
}
|
||||
|
||||
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, true)
|
||||
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
|
||||
if perr != nil {
|
||||
ExitWithError(ExitBadArgs, perr)
|
||||
}
|
||||
@ -149,11 +155,18 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
|
||||
display.Watch(resp)
|
||||
|
||||
if len(execArgs) > 0 {
|
||||
cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
|
||||
cmd.Env = os.Environ()
|
||||
cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
|
||||
for _, ev := range resp.Events {
|
||||
cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
|
||||
cmd.Env = os.Environ()
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
|
||||
cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -165,7 +178,7 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
|
||||
// (e.g. ./bin/etcdctl watch foo --rev 1 bar).
|
||||
// "--" characters are invalid arguments for "spf13/cobra" library,
|
||||
// so no need to handle such cases.
|
||||
func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs []string, execArgs []string, err error) {
|
||||
func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
|
||||
watchArgs = commandArgs
|
||||
|
||||
// remove preceding commands (e.g. "watch foo bar" in interactive mode)
|
||||
@ -175,12 +188,54 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
break
|
||||
}
|
||||
}
|
||||
if idx < len(watchArgs)-1 {
|
||||
watchArgs = watchArgs[idx+1:]
|
||||
if idx < len(watchArgs)-1 || envKey != "" {
|
||||
if idx < len(watchArgs)-1 {
|
||||
watchArgs = watchArgs[idx+1:]
|
||||
}
|
||||
|
||||
execIdx, execExist := 0, false
|
||||
for execIdx = range osArgs {
|
||||
v := osArgs[execIdx]
|
||||
if v == "--" && execIdx != len(osArgs)-1 {
|
||||
execExist = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if idx == len(watchArgs)-1 && envKey != "" {
|
||||
if len(watchArgs) > 0 && !interactive {
|
||||
// "watch --rev 1 -- echo Hello World" has no conflict
|
||||
if !execExist {
|
||||
// "watch foo" with ETCDCTL_WATCH_KEY=foo
|
||||
// (watchArgs==["foo"])
|
||||
return nil, nil, errBadArgsNumConflictEnv
|
||||
}
|
||||
}
|
||||
// otherwise, watch with no argument and environment key is set
|
||||
// if interactive, first "watch" command string should be removed
|
||||
if interactive {
|
||||
watchArgs = []string{}
|
||||
}
|
||||
}
|
||||
|
||||
// "watch foo -- echo hello" with ETCDCTL_WATCH_KEY=foo
|
||||
// (watchArgs==["foo","echo","hello"])
|
||||
if envKey != "" && execExist {
|
||||
widx, oidx := 0, len(osArgs)-1
|
||||
for widx = len(watchArgs) - 1; widx >= 0; widx-- {
|
||||
if watchArgs[widx] == osArgs[oidx] {
|
||||
oidx--
|
||||
continue
|
||||
}
|
||||
if oidx == execIdx { // watchArgs has extra
|
||||
return nil, nil, errBadArgsNumConflictEnv
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if interactive { // "watch" not found
|
||||
return nil, nil, errBadArgsInteractiveWatch
|
||||
}
|
||||
if len(watchArgs) < 1 {
|
||||
if len(watchArgs) < 1 && envKey == "" {
|
||||
return nil, nil, errBadArgsNum
|
||||
}
|
||||
|
||||
@ -192,7 +247,7 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
}
|
||||
if idx < len(osArgs)-1 {
|
||||
osArgs = osArgs[idx+1:]
|
||||
} else {
|
||||
} else if envKey == "" {
|
||||
return nil, nil, errBadArgsNum
|
||||
}
|
||||
|
||||
@ -202,7 +257,7 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
}
|
||||
foundSep := false
|
||||
for idx = range argsWithSep {
|
||||
if argsWithSep[idx] == "--" && idx > 0 {
|
||||
if argsWithSep[idx] == "--" {
|
||||
foundSep = true
|
||||
break
|
||||
}
|
||||
@ -214,6 +269,18 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
}
|
||||
watchArgs = flagset.Args()
|
||||
}
|
||||
|
||||
// "watch -- echo hello" with ETCDCTL_WATCH_KEY=foo
|
||||
// should be translated to "watch foo -- echo hello"
|
||||
// (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
|
||||
if envKey != "" {
|
||||
tmp := []string{envKey}
|
||||
if envRange != "" {
|
||||
tmp = append(tmp, envRange)
|
||||
}
|
||||
watchArgs = append(tmp, watchArgs...)
|
||||
}
|
||||
|
||||
if !foundSep {
|
||||
return watchArgs, nil, nil
|
||||
}
|
||||
|
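The heart of the env-variable support is the final translation step: when `ETCDCTL_WATCH_KEY` (and optionally `ETCDCTL_WATCH_RANGE_END`) is set, the key/range pair is prepended to the parsed watch arguments. A minimal standalone sketch of just that step; `prependEnvKey` is a hypothetical extraction, not a function in the patch:

```go
package main

import "fmt"

// prependEnvKey mirrors the translation above: "watch -- echo hello"
// with ETCDCTL_WATCH_KEY=foo (and ETCDCTL_WATCH_RANGE_END=foox)
// behaves like "watch foo foox -- echo hello".
func prependEnvKey(watchArgs []string, envKey, envRange string) []string {
	if envKey == "" {
		return watchArgs
	}
	tmp := []string{envKey}
	if envRange != "" {
		tmp = append(tmp, envRange)
	}
	return append(tmp, watchArgs...)
}

func main() {
	fmt.Println(prependEnvKey([]string{"echo", "hello"}, "foo", "foox"))
	// Output: [foo foox echo hello]
}
```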
@@ -21,9 +21,10 @@ import (

func Test_parseWatchArgs(t *testing.T) {
	tt := []struct {
		osArgs      []string // raw arguments to "watch" command
		commandArgs []string // arguments after "spf13/cobra" preprocessing
		interactive bool
		osArgs           []string // raw arguments to "watch" command
		commandArgs      []string // arguments after "spf13/cobra" preprocessing
		envKey, envRange string
		interactive      bool

		watchArgs []string
		execArgs  []string
@@ -45,9 +46,66 @@ func Test_parseWatchArgs(t *testing.T) {
			execArgs:    nil,
			err:         errBadArgsNumSeparator,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch"},
			commandArgs: nil,
			envKey:      "foo",
			envRange:    "bar",
			interactive: false,
			watchArgs:   []string{"foo", "bar"},
			execArgs:    nil,
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "foo"},
			commandArgs: []string{"foo"},
			envKey:      "foo",
			envRange:    "",
			interactive: false,
			watchArgs:   nil,
			execArgs:    nil,
			err:         errBadArgsNumConflictEnv,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "foo", "bar"},
			commandArgs: []string{"foo", "bar"},
			envKey:      "foo",
			envRange:    "",
			interactive: false,
			watchArgs:   nil,
			execArgs:    nil,
			err:         errBadArgsNumConflictEnv,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "foo", "bar"},
			commandArgs: []string{"foo", "bar"},
			envKey:      "foo",
			envRange:    "bar",
			interactive: false,
			watchArgs:   nil,
			execArgs:    nil,
			err:         errBadArgsNumConflictEnv,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "foo"},
			commandArgs: []string{"foo"},
			interactive: false,
			watchArgs:   []string{"foo"},
			execArgs:    nil,
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch"},
			commandArgs: nil,
			envKey:      "foo",
			interactive: false,
			watchArgs:   []string{"foo"},
			execArgs:    nil,
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "--rev", "1", "foo"},
			commandArgs: []string{"foo"},
			interactive: false,
			watchArgs:   []string{"foo"},
			execArgs:    nil,
@@ -56,6 +114,16 @@ func Test_parseWatchArgs(t *testing.T) {
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "--rev", "1", "foo"},
			commandArgs: []string{"foo"},
			envKey:      "foo",
			interactive: false,
			watchArgs:   nil,
			execArgs:    nil,
			err:         errBadArgsNumConflictEnv,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "--rev", "1"},
			commandArgs: nil,
			envKey:      "foo",
			interactive: false,
			watchArgs:   []string{"foo"},
			execArgs:    nil,
@@ -117,6 +185,35 @@ func Test_parseWatchArgs(t *testing.T) {
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "--rev", "1", "--", "echo", "Hello", "World"},
			commandArgs: []string{"echo", "Hello", "World"},
			envKey:      "foo",
			envRange:    "",
			interactive: false,
			watchArgs:   []string{"foo"},
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "--rev", "1", "--", "echo", "Hello", "World"},
			commandArgs: []string{"echo", "Hello", "World"},
			envKey:      "foo",
			envRange:    "bar",
			interactive: false,
			watchArgs:   []string{"foo", "bar"},
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "foo", "bar", "--rev", "1", "--", "echo", "Hello", "World"},
			commandArgs: []string{"foo", "bar", "echo", "Hello", "World"},
			envKey:      "foo",
			interactive: false,
			watchArgs:   nil,
			execArgs:    nil,
			err:         errBadArgsNumConflictEnv,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"foo", "bar", "--", "echo", "Hello", "World"},
@@ -141,6 +238,26 @@ func Test_parseWatchArgs(t *testing.T) {
			execArgs:    nil,
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch"},
			envKey:      "foo",
			envRange:    "bar",
			interactive: true,
			watchArgs:   []string{"foo", "bar"},
			execArgs:    nil,
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch"},
			envKey:      "hello world!",
			envRange:    "bar",
			interactive: true,
			watchArgs:   []string{"hello world!", "bar"},
			execArgs:    nil,
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch", "foo", "--rev", "1"},
@@ -165,6 +282,25 @@ func Test_parseWatchArgs(t *testing.T) {
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch", "--", "echo", "Hello", "World"},
			envKey:      "foo",
			interactive: true,
			watchArgs:   []string{"foo"},
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch", "--", "echo", "Hello", "World"},
			envKey:      "foo",
			envRange:    "bar",
			interactive: true,
			watchArgs:   []string{"foo", "bar"},
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch", "foo", "bar", "--", "echo", "Hello", "World"},
@@ -181,6 +317,16 @@ func Test_parseWatchArgs(t *testing.T) {
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch", "--rev", "1", "--", "echo", "Hello", "World"},
			envKey:      "foo",
			envRange:    "bar",
			interactive: true,
			watchArgs:   []string{"foo", "bar"},
			execArgs:    []string{"echo", "Hello", "World"},
			err:         nil,
		},
		{
			osArgs:      []string{"./bin/etcdctl", "watch", "-i"},
			commandArgs: []string{"watch", "foo", "--rev", "1", "bar", "--", "echo", "Hello", "World"},
@@ -199,7 +345,7 @@ func Test_parseWatchArgs(t *testing.T) {
		},
	}
	for i, ts := range tt {
		watchArgs, execArgs, err := parseWatchArgs(ts.osArgs, ts.commandArgs, ts.interactive)
		watchArgs, execArgs, err := parseWatchArgs(ts.osArgs, ts.commandArgs, ts.envKey, ts.envRange, ts.interactive)
		if err != ts.err {
			t.Fatalf("#%d: error expected %v, got %v", i, ts.err, err)
		}
@@ -40,7 +40,6 @@ import (
	"github.com/coreos/etcd/version"

	"github.com/coreos/pkg/capnslog"
	"github.com/grpc-ecosystem/go-grpc-prometheus"
	"google.golang.org/grpc"
)

@@ -179,10 +178,6 @@ func startEtcdOrProxyV2() {

// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
	if cfg.Metrics == "extensive" {
		grpc_prometheus.EnableHandlingTimeHistogram()
	}

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, nil, err
@@ -392,6 +387,9 @@ func checkSupportArch() {
	if runtime.GOARCH == "amd64" || runtime.GOARCH == "ppc64le" {
		return
	}
	// unsupported arch only configured via environment variable
	// so unset here to not parse through flag
	defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
	if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
		plog.Warningf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set", env)
		return
@@ -17,6 +17,7 @@ package etcdmain

import (
	"context"
	"fmt"
	"io/ioutil"
	"math"
	"net"
	"net/http"
@@ -37,10 +38,12 @@ import (
	"github.com/coreos/etcd/pkg/transport"
	"github.com/coreos/etcd/proxy/grpcproxy"

	"github.com/coreos/pkg/capnslog"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/soheilhy/cmux"
	"github.com/spf13/cobra"
	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
)

var (
@@ -50,6 +53,8 @@ var (
	grpcProxyDNSCluster        string
	grpcProxyInsecureDiscovery bool
	grpcProxyDataDir           string
	grpcMaxCallSendMsgSize     int
	grpcMaxCallRecvMsgSize     int

	// tls for connecting to etcd

@@ -75,8 +80,12 @@ var (

	grpcProxyEnablePprof    bool
	grpcProxyEnableOrdering bool

	grpcProxyDebug bool
)

const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024

func init() {
	rootCmd.AddCommand(newGRPCProxyCommand())
}
@@ -110,6 +119,8 @@ func newGRPCProxyStartCommand() *cobra.Command {
	cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
	cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
	cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
	cmd.Flags().IntVar(&grpcMaxCallSendMsgSize, "max-send-bytes", defaultGRPCMaxCallSendMsgSize, "message send limits in bytes (default value is 1.5 MiB)")
	cmd.Flags().IntVar(&grpcMaxCallRecvMsgSize, "max-recv-bytes", math.MaxInt32, "message receive limits in bytes (default value is math.MaxInt32)")

	// client TLS for connecting to server
	cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
@@ -127,12 +138,26 @@ func newGRPCProxyStartCommand() *cobra.Command {
	// experimental flags
	cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
	cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")

	cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")

	return &cmd
}

func startGRPCProxy(cmd *cobra.Command, args []string) {
	checkArgs()

	capnslog.SetGlobalLogLevel(capnslog.INFO)
	if grpcProxyDebug {
		capnslog.SetGlobalLogLevel(capnslog.DEBUG)
		grpc.EnableTracing = true
		// enable info, warning, error
		grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
	} else {
		// only discard info
		grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
	}

	tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
	if tlsinfo == nil && grpcProxyListenAutoTLS {
		host := []string{"https://" + grpcProxyListenAddr}
@@ -222,6 +247,14 @@ func newClientCfg(eps []string) (*clientv3.Config, error) {
		Endpoints:   eps,
		DialTimeout: 5 * time.Second,
	}

	if grpcMaxCallSendMsgSize > 0 {
		cfg.MaxCallSendMsgSize = grpcMaxCallSendMsgSize
	}
	if grpcMaxCallRecvMsgSize > 0 {
		cfg.MaxCallRecvMsgSize = grpcMaxCallRecvMsgSize
	}

	tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey)
	if tls == nil && grpcProxyInsecureSkipTLSVerify {
		tls = &transport.TLSInfo{}
@@ -58,7 +58,7 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
		}
		h := hfunc()
		d, _ := json.Marshal(h)
		if !h.Health {
		if h.Health != "true" {
			http.Error(w, string(d), http.StatusServiceUnavailable)
			return
		}
@@ -70,33 +70,32 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
// Health defines etcd server health status.
// TODO: remove manual parsing in etcdctl cluster-health
type Health struct {
	Health bool `json:"health"`
	Errors []string `json:"errors,omitempty"`
	Health string `json:"health"`
}

// TODO: server NOSPACE, etcdserver.ErrNoLeader in health API

func checkHealth(srv etcdserver.ServerV2) Health {
	h := Health{Health: false}
	h := Health{Health: "true"}

	as := srv.Alarms()
	if len(as) > 0 {
		for _, v := range as {
			h.Errors = append(h.Errors, v.Alarm.String())
		h.Health = "false"
	}

	if h.Health == "true" {
		if uint64(srv.Leader()) == raft.None {
			h.Health = "false"
		}
		return h
	}

	if uint64(srv.Leader()) == raft.None {
		h.Errors = append(h.Errors, etcdserver.ErrNoLeader.Error())
		return h
	if h.Health == "true" {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
		cancel()
		if err != nil {
			h.Health = "false"
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
	cancel()
	if err != nil {
		h.Errors = append(h.Errors, err.Error())
	}

	h.Health = err == nil
	return h
}
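After this change, `/health` reports `{"health":"true"}` with a JSON *string* rather than the old boolean, and the `errors` array is gone. A minimal sketch of how a client would decode the new shape, assuming only the standard library; the local `health` type is illustrative, not etcd's exported struct:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// health mirrors the new wire shape: "health" is the JSON string
// "true" or "false" rather than a boolean.
type health struct {
	Health string `json:"health"`
}

func main() {
	var h health
	if err := json.Unmarshal([]byte(`{"health":"true"}`), &h); err != nil {
		panic(err)
	}
	fmt.Println(h.Health == "true") // true
}
```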
@@ -16,8 +16,10 @@ package v3rpc

import (
	"crypto/tls"
	"io/ioutil"
	"math"
	"os"
	"sync"

	"github.com/coreos/etcd/etcdserver"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -36,9 +38,8 @@ const (
	maxSendBytes = math.MaxInt32
)

func init() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
}
// integration tests call this multiple times, which is racey in gRPC side
var grpclogOnce sync.Once

func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
	var opts []grpc.ServerOption
@@ -70,5 +71,16 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
	// set zero values for metrics registered for this grpc server
	grpc_prometheus.Register(grpcServer)

	grpclogOnce.Do(func() {
		if s.Cfg.Debug {
			grpc.EnableTracing = true
			// enable info, warning, error
			grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
		} else {
			// only discard info
			grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
		}
	})

	return grpcServer
}
@@ -107,7 +107,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
			return nil
		}
		if err != nil {
			plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
			if isClientCtxErr(stream.Context().Err(), err) {
				plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
			} else {
				plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
			}
			return err
		}

@@ -133,7 +137,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
		resp.TTL = ttl
		err = stream.Send(resp)
		if err != nil {
			plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
			if isClientCtxErr(stream.Context().Err(), err) {
				plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
			} else {
				plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
			}
			return err
		}
	}
@@ -16,6 +16,7 @@ package v3rpc

import (
	"context"
	"strings"

	"github.com/coreos/etcd/auth"
	"github.com/coreos/etcd/etcdserver"
@@ -81,3 +82,35 @@ func togRPCError(err error) error {
	}
	return grpcErr
}

func isClientCtxErr(ctxErr error, err error) bool {
	if ctxErr != nil {
		return true
	}

	ev, ok := status.FromError(err)
	if !ok {
		return false
	}

	switch ev.Code() {
	case codes.Canceled, codes.DeadlineExceeded:
		// client-side context cancel or deadline exceeded
		// "rpc error: code = Canceled desc = context canceled"
		// "rpc error: code = DeadlineExceeded desc = context deadline exceeded"
		return true
	case codes.Unavailable:
		msg := ev.Message()
		// client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
		// "rpc error: code = Unavailable desc = client disconnected"
		if msg == "client disconnected" {
			return true
		}
		// "grpc/transport.ClientTransport.CloseStream" on canceled streams
		// "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
		if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
			return true
		}
	}
	return false
}
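The classifier lets the stream handlers demote client-initiated cancellations to debug level while keeping real failures at warning. A trimmed-down, self-contained sketch of the same idea; `clientCaused` is a hypothetical simplification that only checks the status codes, not the `Unavailable` message strings:

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// clientCaused reports whether a stream failure originated on the
// client side: a non-nil stream context error, or a Canceled /
// DeadlineExceeded gRPC status code.
func clientCaused(ctxErr, err error) bool {
	if ctxErr != nil {
		return true
	}
	s, ok := status.FromError(err)
	if !ok {
		return false
	}
	return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
}

func main() {
	err := status.Error(codes.Canceled, "context canceled")
	fmt.Println(clientCaused(nil, err))                          // true
	fmt.Println(clientCaused(context.Canceled, fmt.Errorf("x"))) // true
}
```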
@@ -140,7 +140,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
	// deadlock when calling sws.close().
	go func() {
		if rerr := sws.recvLoop(); rerr != nil {
			plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
			if isClientCtxErr(stream.Context().Err(), rerr) {
				plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
			} else {
				plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
			}
			errc <- rerr
		}
	}()
@@ -339,7 +343,11 @@ func (sws *serverWatchStream) sendLoop() {

			mvcc.ReportEventReceived(len(evs))
			if err := sws.gRPCStream.Send(wr); err != nil {
				plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
					plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
				} else {
					plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
				}
				return
			}

@@ -356,7 +364,11 @@ func (sws *serverWatchStream) sendLoop() {
			}

			if err := sws.gRPCStream.Send(c); err != nil {
				plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
					plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
				} else {
					plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
				}
				return
			}

@@ -372,7 +384,11 @@ func (sws *serverWatchStream) sendLoop() {
			for _, v := range pending[wid] {
				mvcc.ReportEventReceived(len(v.Events))
				if err := sws.gRPCStream.Send(v); err != nil {
					plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
					if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
						plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
					} else {
						plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
					}
					return
				}
			}
@@ -58,8 +58,8 @@ func openBackend(cfg ServerConfig) backend.Backend {
	select {
	case be := <-beOpened:
		return be
	case <-time.After(time.Second):
		plog.Warningf("another etcd process is using %q and holds the file lock.", fn)
	case <-time.After(10 * time.Second):
		plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
		plog.Warningf("waiting for it to exit before starting...")
	}
	return <-beOpened
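The pattern here is "warn after a timeout, then keep waiting" rather than failing: the open runs in a goroutine and the select only decides when to log. A minimal standalone sketch of that shape, using only the standard library; `openSlow` and the ten-second threshold are taken from the hunk above, the rest is illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// openSlow warns once if open takes longer than ten seconds
// (e.g. another process holds the file lock), then keeps waiting
// for the result instead of giving up.
func openSlow(open func() string) string {
	done := make(chan string)
	go func() { done <- open() }()
	select {
	case v := <-done:
		return v
	case <-time.After(10 * time.Second):
		fmt.Println("still opening after 10s; another process may hold the lock")
	}
	return <-done
}

func main() {
	fmt.Println(openSlow(func() string { return "backend" }))
}
```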
@@ -70,6 +70,8 @@ type ServerConfig struct {
	// before serving any peer/client traffic.
	InitialCorruptCheck bool
	CorruptCheckTime    time.Duration

	Debug bool
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
@@ -122,7 +124,8 @@ func (c *ServerConfig) advertiseMatchesCluster() error {
	sort.Strings(apurls)
	ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
	defer cancel()
	if netutil.URLStringsEqual(ctx, apurls, urls.StringSlice()) {
	ok, err := netutil.URLStringsEqual(ctx, apurls, urls.StringSlice())
	if ok {
		return nil
	}

@@ -146,7 +149,7 @@ func (c *ServerConfig) advertiseMatchesCluster() error {
		}
		mstr := strings.Join(missing, ",")
		apStr := strings.Join(apurls, ",")
		return fmt.Errorf("--initial-cluster has %s but missing from --initial-advertise-peer-urls=%s ", mstr, apStr)
		return fmt.Errorf("--initial-cluster has %s but missing from --initial-advertise-peer-urls=%s (%v)", mstr, apStr, err)
	}

	for url := range apMap {
@@ -154,9 +157,16 @@ func (c *ServerConfig) advertiseMatchesCluster() error {
			missing = append(missing, url)
		}
	}
	mstr := strings.Join(missing, ",")
	if len(missing) > 0 {
		mstr := strings.Join(missing, ",")
		umap := types.URLsMap(map[string]types.URLs{c.Name: c.PeerURLs})
		return fmt.Errorf("--initial-advertise-peer-urls has %s but missing from --initial-cluster=%s", mstr, umap.String())
	}

	// resolved URLs from "--initial-advertise-peer-urls" and "--initial-cluster" did not match or failed
	apStr := strings.Join(apurls, ",")
	umap := types.URLsMap(map[string]types.URLs{c.Name: c.PeerURLs})
	return fmt.Errorf("--initial-advertise-peer-urls has %s but missing from --initial-cluster=%s", mstr, umap.String())
	return fmt.Errorf("failed to resolve %s to match --initial-cluster=%s (%v)", apStr, umap.String(), err)
}

func (c *ServerConfig) MemberDir() string { return filepath.Join(c.DataDir, "member") }
@@ -490,8 +490,8 @@ func ValidateClusterAndAssignIDs(local *RaftCluster, existing *RaftCluster) erro
	ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
	defer cancel()
	for i := range ems {
		if !netutil.URLStringsEqual(ctx, ems[i].PeerURLs, lms[i].PeerURLs) {
			return fmt.Errorf("unmatched member while checking PeerURLs")
		if ok, err := netutil.URLStringsEqual(ctx, ems[i].PeerURLs, lms[i].PeerURLs); !ok {
			return fmt.Errorf("unmatched member while checking PeerURLs (%v)", err)
		}
		lms[i].ID = ems[i].ID
	}
@@ -58,10 +58,12 @@ import (
)

const (
	tickDuration = 10 * time.Millisecond
	clusterName = "etcd"
	requestTimeout = 20 * time.Second
	// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
	RequestWaitTimeout = 3 * time.Second
	tickDuration       = 10 * time.Millisecond
	requestTimeout     = 20 * time.Second

	clusterName  = "etcd"
	basePort     = 21000
	UrlScheme    = "unix"
	UrlSchemeTLS = "unixs"
@@ -373,10 +373,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
		}

		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
		tmpb.FillPercent = 0.9 // for seq write in for each
		if berr != nil {
			return berr
		}
		tmpb.FillPercent = 0.9 // for seq write in for each

		b.ForEach(func(k, v []byte) error {
			count++
@@ -33,7 +33,6 @@ type ExpectProcess struct {
	fpty *os.File
	wg   sync.WaitGroup

	ptyMu sync.Mutex // protects accessing fpty
	cond  *sync.Cond // for broadcasting updates are available
	mu    sync.Mutex // protects lines and err
	lines []string
@@ -76,9 +75,7 @@ func (ep *ExpectProcess) read() {
	printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
	r := bufio.NewReader(ep.fpty)
	for ep.err == nil {
		ep.ptyMu.Lock()
		l, rerr := r.ReadString('\n')
		ep.ptyMu.Unlock()
		ep.mu.Lock()
		ep.err = rerr
		if l != "" {
@@ -150,9 +147,7 @@ func (ep *ExpectProcess) close(kill bool) error {
	}

	err := ep.cmd.Wait()
	ep.ptyMu.Lock()
	ep.fpty.Close()
	ep.ptyMu.Unlock()
	ep.wg.Wait()

	if err != nil {
@@ -17,6 +17,7 @@ package netutil

import (
	"context"
	"fmt"
	"net"
	"net/url"
	"reflect"
@@ -73,14 +74,14 @@ func resolveTCPAddrs(ctx context.Context, urls [][]url.URL) ([][]url.URL, error)
		for i, u := range us {
			nu, err := url.Parse(u.String())
			if err != nil {
				return nil, err
				return nil, fmt.Errorf("failed to parse %q (%v)", u.String(), err)
			}
			nus[i] = *nu
		}
		for i, u := range nus {
			h, err := resolveURL(ctx, u)
			if err != nil {
				return nil, err
				return nil, fmt.Errorf("failed to resolve %q (%v)", u.String(), err)
			}
			if h != "" {
				nus[i].Host = h
@@ -123,35 +124,41 @@ func resolveURL(ctx context.Context, u url.URL) (string, error) {

// urlsEqual checks equality of url.URLS between two arrays.
// This check pass even if an URL is in hostname and opposite is in IP address.
func urlsEqual(ctx context.Context, a []url.URL, b []url.URL) bool {
func urlsEqual(ctx context.Context, a []url.URL, b []url.URL) (bool, error) {
	if len(a) != len(b) {
		return false
		return false, fmt.Errorf("len(%q) != len(%q)", urlsToStrings(a), urlsToStrings(b))
	}
	urls, err := resolveTCPAddrs(ctx, [][]url.URL{a, b})
	if err != nil {
		return false
		return false, err
	}
	preva, prevb := a, b
	a, b = urls[0], urls[1]
	sort.Sort(types.URLs(a))
	sort.Sort(types.URLs(b))
	for i := range a {
		if !reflect.DeepEqual(a[i], b[i]) {
			return false
			return false, fmt.Errorf("%q(resolved from %q) != %q(resolved from %q)",
				a[i].String(), preva[i].String(),
				b[i].String(), prevb[i].String(),
			)
		}
	}

	return true
	return true, nil
}

func URLStringsEqual(ctx context.Context, a []string, b []string) bool {
// URLStringsEqual returns "true" if given URLs are valid
// and resolved to same IP addresses. Otherwise, return "false"
// and error, if any.
func URLStringsEqual(ctx context.Context, a []string, b []string) (bool, error) {
	if len(a) != len(b) {
		return false
		return false, fmt.Errorf("len(%q) != len(%q)", a, b)
	}
	urlsA := make([]url.URL, 0)
	for _, str := range a {
		u, err := url.Parse(str)
		if err != nil {
			return false
			return false, fmt.Errorf("failed to parse %q", str)
		}
		urlsA = append(urlsA, *u)
	}
@@ -159,14 +166,21 @@ func URLStringsEqual(ctx context.Context, a []string, b []string) bool {
	for _, str := range b {
		u, err := url.Parse(str)
		if err != nil {
			return false
			return false, fmt.Errorf("failed to parse %q", str)
		}
		urlsB = append(urlsB, *u)
	}

	return urlsEqual(ctx, urlsA, urlsB)
}

func urlsToStrings(us []url.URL) []string {
	rs := make([]string, len(us))
	for i := range us {
		rs[i] = us[i].String()
	}
	return rs
}

func IsNetworkTimeoutError(err error) bool {
	nerr, ok := err.(net.Error)
	return ok && nerr.Timeout()
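Callers now get a reason for the mismatch instead of a bare `false`. A minimal usage sketch against the new signature, assuming the import path of this tree (`github.com/coreos/etcd/pkg/netutil`); the concrete error text printed depends on resolution at runtime:

```go
package main

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/pkg/netutil"
)

func main() {
	// The second return value now explains *why* the two URL sets
	// differ (length mismatch, parse/resolve failure, or which pair
	// of resolved URLs disagreed).
	ok, err := netutil.URLStringsEqual(context.TODO(),
		[]string{"http://127.0.0.1:2379"},
		[]string{"http://127.0.0.1:2380"})
	fmt.Println(ok, err)
}
```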
@@ -167,6 +167,7 @@ func TestURLsEqual(t *testing.T) {
		a      []url.URL
		b      []url.URL
		expect bool
		err    error
	}{
		{
			a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
@@ -182,11 +183,13 @@ func TestURLsEqual(t *testing.T) {
			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}},
			b:      []url.URL{{Scheme: "https", Host: "10.0.10.1:2379"}},
			expect: false,
			err:    errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "https://10.0.10.1:2379"(resolved from "https://10.0.10.1:2379")`),
		},
		{
			a:      []url.URL{{Scheme: "https", Host: "example.com:2379"}},
			b:      []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
			expect: false,
			err:    errors.New(`"https://10.0.10.1:2379"(resolved from "https://example.com:2379") != "http://10.0.10.1:2379"(resolved from "http://10.0.10.1:2379")`),
		},
		{
			a: []url.URL{{Scheme: "unix", Host: "abc:2379"}},
@@ -212,46 +215,55 @@ func TestURLsEqual(t *testing.T) {
			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}},
			expect: false,
			err:    errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "example.com:2380"}},
			b:      []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
			expect: false,
			err:    errors.New(`"http://10.0.10.1:2380"(resolved from "http://example.com:2380") != "http://10.0.10.1:2379"(resolved from "http://10.0.10.1:2379")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
			expect: false,
			err:    errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}},
			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
			expect: false,
			err:    errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			expect: false,
			err:    errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			expect: false,
			err:    errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			expect: false,
			err:    errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			expect: false,
			err:    errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
		},
		{
			a:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
			expect: false,
			err:    errors.New(`len(["http://10.0.0.1:2379"]) != len(["http://10.0.0.1:2379" "http://127.0.0.1:2380"])`),
		},
		{
			a: []url.URL{{Scheme: "http", Host: "first.com:2379"}, {Scheme: "http", Host: "second.com:2380"}},
@@ -265,16 +277,24 @@ func TestURLsEqual(t *testing.T) {
		},
	}

	for _, test := range tests {
		result := urlsEqual(context.TODO(), test.a, test.b)
	for i, test := range tests {
		result, err := urlsEqual(context.TODO(), test.a, test.b)
		if result != test.expect {
			t.Errorf("a:%v b:%v, expected %v but %v", test.a, test.b, test.expect, result)
			t.Errorf("#%d: a:%v b:%v, expected %v but %v", i, test.a, test.b, test.expect, result)
		}
		if test.err != nil {
			if err.Error() != test.err.Error() {
				t.Errorf("#%d: err expected %v but %v", i, test.err, err)
			}
		}
	}
}
func TestURLStringsEqual(t *testing.T) {
	result := URLStringsEqual(context.TODO(), []string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"})
	result, err := URLStringsEqual(context.TODO(), []string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"})
	if !result {
		t.Errorf("unexpected result %v", result)
	}
	if err != nil {
		t.Errorf("unexpected error %v", err)
	}
}
@@ -30,13 +30,12 @@ func HandleHealth(mux *http.ServeMux, c *clientv3.Client) {
}

func checkHealth(c *clientv3.Client) etcdhttp.Health {
	h := etcdhttp.Health{Health: false}
	h := etcdhttp.Health{Health: "false"}
	ctx, cancel := context.WithTimeout(c.Ctx(), time.Second)
	_, err := c.Get(ctx, "a")
	cancel()
	h.Health = err == nil || err == rpctypes.ErrPermissionDenied
	if !h.Health {
		h.Errors = append(h.Errors, err.Error())
	if err == nil || err == rpctypes.ErrPermissionDenied {
		h.Health = "true"
	}
	return h
}
@@ -133,6 +133,9 @@ function functional_pass {
		-peer-ports 12380,22380,32380 \
		-limit 1 \
		-schedule-cases "0 1 2 3 4 5" \
		-stress-qps 1000 \
		-stress-key-txn-count 100 \
		-stress-key-txn-ops 10 \
		-exit-on-failure && echo "'etcd-tester' succeeded"
	ETCD_TESTER_EXIT_CODE=$?
	echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
@@ -246,13 +249,13 @@ function grpcproxy_pass {
function release_pass {
	rm -f ./bin/etcd-last-release
	# to grab latest patch release; bump this up for every minor release
	UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.2.*" | head -1)
	UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.3.*" | head -1)
	if [ -n "$MANUAL_VER" ]; then
		# in case, we need to test against different version
		UPGRADE_VER=$MANUAL_VER
	fi
	if [[ -z ${UPGRADE_VER} ]]; then
		UPGRADE_VER="v3.2.0"
		UPGRADE_VER="v3.3.0"
		echo "fallback to" ${UPGRADE_VER}
	fi
@@ -34,9 +34,11 @@ import (
type keyStresser struct {
	Endpoint string

	keyLargeSize int
	keySize int
	keySuffixRange int
	keyLargeSize      int
	keySize           int
	keySuffixRange    int
	keyTxnSuffixRange int
	keyTxnOps         int

	N int

@@ -77,6 +79,14 @@ func (s *keyStresser) Stress() error {
		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
	}
	if s.keyTxnSuffixRange > 0 {
		// adjust to make up ±70% of workloads with writes
		stressEntries[0].weight = 0.35
		stressEntries = append(stressEntries, stressEntry{
			weight: 0.35,
			f:      newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
		})
	}
	s.stressTable = createStressTable(stressEntries)

	for i := 0; i < s.N; i++ {
@@ -202,6 +212,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
	}
}

func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
	keys := make([]string, keyTxnSuffixRange)
	for i := range keys {
		keys[i] = fmt.Sprintf("/k%03d", i)
	}
	return writeTxn(kvc, keys, txnOps)
}

func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
	return func(ctx context.Context) (error, int64) {
		ks := make(map[string]struct{}, txnOps)
		for len(ks) != txnOps {
			ks[keys[rand.Intn(len(keys))]] = struct{}{}
		}
		selected := make([]string, 0, txnOps)
		for k := range ks {
			selected = append(selected, k)
		}
		com, delOp, putOp := getTxnReqs(selected[0], "bar00")
		txnReq := &pb.TxnRequest{
			Compare: []*pb.Compare{com},
			Success: []*pb.RequestOp{delOp},
			Failure: []*pb.RequestOp{putOp},
		}

		// add nested txns if any
		for i := 1; i < txnOps; i++ {
			k, v := selected[i], fmt.Sprintf("bar%02d", i)
			com, delOp, putOp = getTxnReqs(k, v)
			nested := &pb.RequestOp{
				Request: &pb.RequestOp_RequestTxn{
					RequestTxn: &pb.TxnRequest{
						Compare: []*pb.Compare{com},
						Success: []*pb.RequestOp{delOp},
						Failure: []*pb.RequestOp{putOp},
					},
				},
			}
			txnReq.Success = append(txnReq.Success, nested)
			txnReq.Failure = append(txnReq.Failure, nested)
		}

		_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
		return err, int64(txnOps)
	}
}

func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
	// if key exists (version > 0)
	com = &pb.Compare{
		Key:         []byte(key),
		Target:      pb.Compare_VERSION,
		Result:      pb.Compare_GREATER,
		TargetUnion: &pb.Compare_Version{Version: 0},
	}
	delOp = &pb.RequestOp{
		Request: &pb.RequestOp_RequestDeleteRange{
			RequestDeleteRange: &pb.DeleteRangeRequest{
				Key: []byte(key),
			},
		},
	}
	putOp = &pb.RequestOp{
		Request: &pb.RequestOp_RequestPut{
			RequestPut: &pb.PutRequest{
				Key:   []byte(key),
				Value: []byte(val),
			},
		},
	}
	return com, delOp, putOp
}

func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
	return func(ctx context.Context) (error, int64) {
		_, err := kvc.Range(ctx, &pb.RangeRequest{
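With the txn stresser enabled, the put weight drops to 0.35 and the txn entry adds another 0.35, so writes still account for about 70% of the generated load. A generic stand-in for that weighted selection, assuming nothing from the tester's internals (`pickWeighted` is hypothetical, not `createStressTable`):

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickWeighted selects a name with probability proportional to its
// weight; iterating a map in random order does not bias the result
// because each weight is subtracted from the uniform draw in turn.
func pickWeighted(weights map[string]float64) string {
	var total float64
	for _, w := range weights {
		total += w
	}
	r := rand.Float64() * total
	for name, w := range weights {
		if r < w {
			return name
		}
		r -= w
	}
	return ""
}

func main() {
	fmt.Println(pickWeighted(map[string]float64{"put": 0.35, "txn": 0.35, "range": 0.30}))
}
```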
@@ -47,6 +47,8 @@ func main() {
	stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
	stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
	stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
	stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
	stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
	exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
@@ -120,15 +122,23 @@ func main() {
	}

	scfg := stressConfig{
		rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
		keyLargeSize: int(*stressKeyLargeSize),
		keySize: int(*stressKeySize),
		keySuffixRange: int(*stressKeySuffixRange),
		numLeases: 10,
		keysPerLease: 10,
		rateLimiter:       rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
		keyLargeSize:      int(*stressKeyLargeSize),
		keySize:           int(*stressKeySize),
		keySuffixRange:    int(*stressKeySuffixRange),
		keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
		keyTxnOps:         int(*stressKeyTxnOps),
		numLeases:         10,
		keysPerLease:      10,

		etcdRunnerPath: *etcdRunnerPath,
	}
	if scfg.keyTxnSuffixRange > 100 {
		plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
	}
	if scfg.keyTxnOps > 64 {
		plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
	}

	t := &tester{
		failures: schedule,
@@ -16,17 +16,13 @@ package main

import (
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"golang.org/x/time/rate"
	"google.golang.org/grpc/grpclog"
)

func init() { grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) }

type Stresser interface {
	// Stress starts to stress the etcd cluster
	Stress() error
@@ -117,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
}

type stressConfig struct {
	keyLargeSize int
	keySize int
	keySuffixRange int
	keyLargeSize      int
	keySize           int
	keySuffixRange    int
	keyTxnSuffixRange int
	keyTxnOps         int

	numLeases    int
	keysPerLease int
@@ -146,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
		// TODO: Too intensive stressers can panic etcd member with
		// 'out of memory' error. Put rate limits in server side.
		return &keyStresser{
			Endpoint: m.grpcAddr(),
			keyLargeSize: sc.keyLargeSize,
			keySize: sc.keySize,
			keySuffixRange: sc.keySuffixRange,
			N: 100,
			rateLimiter: sc.rateLimiter,
			Endpoint:          m.grpcAddr(),
			keyLargeSize:      sc.keyLargeSize,
			keySize:           sc.keySize,
			keySuffixRange:    sc.keySuffixRange,
			keyTxnSuffixRange: sc.keyTxnSuffixRange,
			keyTxnOps:         sc.keyTxnOps,
			N:                 100,
			rateLimiter:       sc.rateLimiter,
		}
	case "v2keys":
		return &v2Stresser{