Compare commits
44 Commits
dependabot
...
v3.3.0-rc.
Author | SHA1 | Date | |
---|---|---|---|
374dc5743f | |||
55505617df | |||
a9317d3d77 | |||
02d362ccde | |||
d292337d14 | |||
7974f008f3 | |||
4a3f99415e | |||
6340564c84 | |||
6735028ec0 | |||
906f098053 | |||
8a66237693 | |||
d37afffb98 | |||
7e2759da8d | |||
ad4df985fc | |||
2df89c8bf6 | |||
6178c45066 | |||
9ccae0f81a | |||
a5079cc381 | |||
9e079d8f02 | |||
bd57c9ca5b | |||
58c402a47b | |||
3ce73b70bc | |||
ee3c81d8d3 | |||
2dfabfbef6 | |||
bf83d5269f | |||
a609b1eb47 | |||
1ae0c0b47d | |||
ec43197344 | |||
70ba0518f1 | |||
e330f5004f | |||
0ec5023b7b | |||
0f69520622 | |||
d3c2acf090 | |||
5e35f79087 | |||
6dff1a9398 | |||
325913d6fb | |||
24c9fb0527 | |||
8511db5e2b | |||
3193f3c9ab | |||
bdc508cadf | |||
d5a0609412 | |||
67af1a2138 | |||
66d68a8fdb | |||
ebaa83c985 |
@ -2,7 +2,7 @@
|
||||
|
||||
TEST_SUFFIX=$(date +%s | base64 | head -c 15)
|
||||
|
||||
TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.2.11"
|
||||
TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.3.0-rc.0"
|
||||
if [ "$TEST_ARCH" == "386" ]; then
|
||||
TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
|
||||
fi
|
||||
|
@ -1,8 +1,4 @@
|
||||
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0) (2018-01-??)
|
||||
|
||||
**v3.3.0 is not yet released; expected to be released in January 2018.**
|
||||
|
||||
## [v3.3.0-rc.0](https://github.com/coreos/etcd/releases/tag/v3.3.0-rc.0) (2017-12-20)
|
||||
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0)
|
||||
|
||||
See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0) and [v3.3 upgrade guide](https://github.com/coreos/etcd/blob/master/Documentation/upgrades/upgrade_3_3.md) for any breaking changes.
|
||||
|
||||
|
@ -87,7 +87,7 @@ Removing excessive keyspace data and defragmenting the backend database will put
|
||||
|
||||
```sh
|
||||
# get current revision
|
||||
$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9]*')
|
||||
$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9].*')
|
||||
# compact away all old revisions
|
||||
$ ETCDCTL_API=3 etcdctl compact $rev
|
||||
compacted revision 1516
|
||||
|
@ -197,7 +197,7 @@ When client authentication is enabled for an etcd member, the administrator must
|
||||
|
||||
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [TLS certificates get reloaded on every client connection](https://github.com/coreos/etcd/pull/7829). This is useful when replacing expiry certs without stopping etcd servers; it can be done by overwriting old certs with new ones. Refreshing certs for every connection should not have too much overhead, but can be improved in the future, with caching layer. Example tests can be found [here](https://github.com/coreos/etcd/blob/b041ce5d514a4b4aaeefbffb008f0c7570a18986/integration/v3_grpc_test.go#L1601-L1757).
|
||||
|
||||
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server denies incoming peer certs with wrong IP `SAN`](https://github.com/coreos/etcd/pull/7687). For instance, if peer cert contains IP addresses in Subject Alternative Name (SAN) field, server authenticates a peer only when the remote IP address matches one of those IP addresses. This is to prevent unauthorized endpoints from joining the cluster. For example, peer B's CSR (with `cfssl`) is:
|
||||
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server denies incoming peer certs with wrong IP `SAN`](https://github.com/coreos/etcd/pull/7687). For instance, if peer cert contains any IP addresses in Subject Alternative Name (SAN) field, server authenticates a peer only when the remote IP address matches one of those IP addresses. This is to prevent unauthorized endpoints from joining the cluster. For example, peer B's CSR (with `cfssl`) is:
|
||||
|
||||
```json
|
||||
{
|
||||
@ -223,16 +223,14 @@ Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017
|
||||
|
||||
when peer B's actual IP address is `10.138.0.2`, not `10.138.0.27`. When peer B tries to join the cluster, peer A will reject B with the error `x509: certificate is valid for 10.138.0.27, not 10.138.0.2`, because B's remote IP address does not match the one in Subject Alternative Name (SAN) field.
|
||||
|
||||
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server resolves TLS `DNSNames` when checking `SAN`](https://github.com/coreos/etcd/pull/7767). For instance, if peer cert contains any DNS names in Subject Alternative Name (SAN) field, server authenticates a peer only when forward-lookups on those DNS names have matching IP with the remote IP address. For example, peer B's CSR (with `cfssl`) is:
|
||||
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server resolves TLS `DNSNames` when checking `SAN`](https://github.com/coreos/etcd/pull/7767). For instance, if peer cert contains only DNS names (no IP addresses) in Subject Alternative Name (SAN) field, server authenticates a peer only when forward-lookups (`dig b.com`) on those DNS names have matching IP with the remote IP address. For example, peer B's CSR (with `cfssl`) is:
|
||||
|
||||
```json
|
||||
{
|
||||
...
|
||||
"CN": "etcd peer",
|
||||
"hosts": [
|
||||
"b.com"
|
||||
],
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
when peer B's remote IP address is `10.138.0.2`. When peer B tries to join the cluster, peer A looks up the incoming host `b.com` to get the list of IP addresses (e.g. `dig b.com`). And rejects B if the list does not contain the IP `10.138.0.2`, with the error `tls: 10.138.0.2 does not match any of DNSNames ["b.com"]`.
|
||||
@ -241,13 +239,11 @@ Since [v3.2.2](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v322-2017
|
||||
|
||||
```json
|
||||
{
|
||||
...
|
||||
"CN": "etcd peer",
|
||||
"hosts": [
|
||||
"invalid.domain",
|
||||
"10.138.0.2"
|
||||
],
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
when peer B's remote IP address is `10.138.0.2` and `invalid.domain` is a invalid host. When peer B tries to join the cluster, peer A successfully authenticates B, since Subject Alternative Name (SAN) field has a valid matching IP address. See [issue#8206](https://github.com/coreos/etcd/issues/8206) for more detail.
|
||||
@ -256,13 +252,11 @@ Since [v3.2.5](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v325-2017
|
||||
|
||||
```json
|
||||
{
|
||||
...
|
||||
"CN": "etcd peer",
|
||||
"hosts": [
|
||||
"*.example.default.svc",
|
||||
"*.example.default.svc.cluster.local"
|
||||
],
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
when peer B's remote IP address is `10.138.0.2`. When peer B tries to join the cluster, peer A reverse-lookup the IP `10.138.0.2` to get the list of host names. And either exact or wildcard match the host names with peer B's cert DNS names in Subject Alternative Name (SAN) field. If none of reverse/forward lookups worked, it returns an error `"tls: "10.138.0.2" does not match any of DNSNames ["*.example.default.svc","*.example.default.svc.cluster.local"]`. See [issue#8268](https://github.com/coreos/etcd/issues/8268) for more detail.
|
||||
|
@ -74,23 +74,23 @@ Set `embed.Config.Debug` field to `true` to enable gRPC server logs.
|
||||
|
||||
#### Change in `/health` endpoint response value
|
||||
|
||||
Previously, `[endpoint]:[client-port]/health` returned manually marshaled JSON value. 3.3 instead defines [`etcdhttp.Health`](https://godoc.org/github.com/coreos/etcd/etcdserver/api/etcdhttp#Health) struct and returns properly encoded JSON value with errors, if any.
|
||||
Previously, `[endpoint]:[client-port]/health` returned manually marshaled JSON value. 3.3 now defines [`etcdhttp.Health`](https://godoc.org/github.com/coreos/etcd/etcdserver/api/etcdhttp#Health) struct and includes errors, if any.
|
||||
|
||||
Before
|
||||
|
||||
```bash
|
||||
$ curl http://localhost:2379/health
|
||||
{"health": "true"}
|
||||
{"health":"true"}
|
||||
```
|
||||
|
||||
After
|
||||
|
||||
```bash
|
||||
$ curl http://localhost:2379/health
|
||||
{"health":true}
|
||||
{"health":"true"}
|
||||
|
||||
# Or
|
||||
{"health":false,"errors":["NOSPACE"]}
|
||||
{"health":"false","errors":["NOSPACE"]}
|
||||
```
|
||||
|
||||
#### Change in gRPC gateway HTTP endpoints (replaced `/v3alpha` with `/v3beta`)
|
||||
|
@ -45,12 +45,12 @@ It is important to monitor your production etcd cluster for healthy information
|
||||
|
||||
#### Health Monitoring
|
||||
|
||||
At lowest level, etcd exposes health information via HTTP at `/health` in JSON format. If it returns `{"health":true}`, then the cluster is healthy.
|
||||
At lowest level, etcd exposes health information via HTTP at `/health` in JSON format. If it returns `{"health":"true"}`, then the cluster is healthy.
|
||||
|
||||
```
|
||||
$ curl -L http://127.0.0.1:2379/health
|
||||
|
||||
{"health":true}
|
||||
{"health":"true"}
|
||||
```
|
||||
|
||||
You can also use etcdctl to check the cluster-wide health information. It will contact all the members of the cluster and collect the health information for you.
|
||||
|
@ -29,5 +29,5 @@ curl http://10.0.0.10:2379/health
|
||||
```
|
||||
|
||||
```json
|
||||
{"health":true}
|
||||
{"health":"true"}
|
||||
```
|
||||
|
@ -54,7 +54,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
|
||||
// TODO: only send healthy endpoint to gRPC so gRPC wont waste time to
|
||||
// dial for unhealthy endpoint.
|
||||
// then we can reduce 3s to 1s.
|
||||
timeout := pingInterval + 3*time.Second
|
||||
timeout := pingInterval + integration.RequestWaitTimeout
|
||||
|
||||
cli, err := clientv3.New(ccfg)
|
||||
if err != nil {
|
||||
|
@ -121,7 +121,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
|
||||
if !setBefore {
|
||||
cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
|
||||
if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -453,7 +453,7 @@ func TestKVGetErrConnClosed(t *testing.T) {
|
||||
clus.TakeClient(0)
|
||||
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("kv.Get took too long")
|
||||
case <-donec:
|
||||
}
|
||||
@ -480,7 +480,7 @@ func TestKVNewAfterClose(t *testing.T) {
|
||||
close(donec)
|
||||
}()
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("kv.Get took too long")
|
||||
case <-donec:
|
||||
}
|
||||
@ -906,7 +906,7 @@ func TestKVLargeRequests(t *testing.T) {
|
||||
maxCallSendBytesClient: 10 * 1024 * 1024,
|
||||
maxCallRecvBytesClient: 0,
|
||||
valueSize: 10 * 1024 * 1024,
|
||||
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485770, 10485760),
|
||||
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
|
||||
},
|
||||
{
|
||||
maxRequestBytesServer: 10 * 1024 * 1024,
|
||||
@ -920,7 +920,7 @@ func TestKVLargeRequests(t *testing.T) {
|
||||
maxCallSendBytesClient: 10 * 1024 * 1024,
|
||||
maxCallRecvBytesClient: 0,
|
||||
valueSize: 10*1024*1024 + 5,
|
||||
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485775, 10485760),
|
||||
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
|
||||
},
|
||||
}
|
||||
for i, test := range tests {
|
||||
@ -939,7 +939,7 @@ func TestKVLargeRequests(t *testing.T) {
|
||||
if err != test.expectError {
|
||||
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
|
||||
}
|
||||
} else if err != nil && err.Error() != test.expectError.Error() {
|
||||
} else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) {
|
||||
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
|
||||
}
|
||||
|
||||
|
@ -299,7 +299,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("le.Grant took too long")
|
||||
case <-donec:
|
||||
}
|
||||
@ -325,7 +325,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
|
||||
close(donec)
|
||||
}()
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("le.Grant took too long")
|
||||
case <-donec:
|
||||
}
|
||||
@ -357,7 +357,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
|
||||
close(donec)
|
||||
}()
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("le.Revoke took too long")
|
||||
case <-donec:
|
||||
}
|
||||
|
@ -234,7 +234,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
|
||||
wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
|
||||
select {
|
||||
case <-wch:
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("took too long to create watch")
|
||||
}
|
||||
|
||||
@ -252,7 +252,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
|
||||
if err = ev.Err(); err != rpctypes.ErrNoLeader {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
|
||||
}
|
||||
case <-time.After(3 * time.Second): // enough time to detect leader lost
|
||||
case <-time.After(integration.RequestWaitTimeout): // enough time to detect leader lost
|
||||
t.Fatal("took too long to detect leader lost")
|
||||
}
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
|
||||
wch := watchCli.Watch(context.Background(), key, clientv3.WithCreatedNotify())
|
||||
select {
|
||||
case <-wch:
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("took too long to create watch")
|
||||
}
|
||||
|
||||
@ -348,7 +348,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
|
||||
clus.Members[target].Restart(t)
|
||||
|
||||
select {
|
||||
case <-time.After(clientTimeout + 3*time.Second):
|
||||
case <-time.After(clientTimeout + integration.RequestWaitTimeout):
|
||||
t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
|
||||
case <-donec:
|
||||
}
|
||||
|
@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) {
|
||||
clus.TakeClient(0)
|
||||
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("wc.Watch took too long")
|
||||
case <-donec:
|
||||
}
|
||||
@ -705,7 +705,7 @@ func TestWatchAfterClose(t *testing.T) {
|
||||
close(donec)
|
||||
}()
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("wc.Watch took too long")
|
||||
case <-donec:
|
||||
}
|
||||
@ -751,7 +751,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
|
||||
if resp.Err() != rpctypes.ErrNoLeader {
|
||||
t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("watch without leader took too long to close")
|
||||
}
|
||||
|
||||
@ -760,7 +760,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
|
||||
if ok {
|
||||
t.Fatalf("expected closed channel, got response %v", resp)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(integration.RequestWaitTimeout):
|
||||
t.Fatal("waited too long for channel to close")
|
||||
}
|
||||
|
||||
|
@ -445,8 +445,11 @@ func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue
|
||||
}
|
||||
|
||||
func (lkv *leasingKV) waitSession(ctx context.Context) error {
|
||||
lkv.leases.mu.RLock()
|
||||
sessionc := lkv.sessionc
|
||||
lkv.leases.mu.RUnlock()
|
||||
select {
|
||||
case <-lkv.sessionc:
|
||||
case <-sessionc:
|
||||
return nil
|
||||
case <-lkv.ctx.Done():
|
||||
return lkv.ctx.Err()
|
||||
|
@ -53,7 +53,7 @@ func alarmTest(cx ctlCtx) {
|
||||
}
|
||||
|
||||
// '/health' handler should return 'false'
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":false,"errors":["NOSPACE"]}`}); err != nil {
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"false"}`}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
@ -45,55 +46,94 @@ type kvExec struct {
|
||||
|
||||
func watchTest(cx ctlCtx) {
|
||||
tests := []struct {
|
||||
puts []kv
|
||||
args []string
|
||||
puts []kv
|
||||
envKey string
|
||||
envRange string
|
||||
args []string
|
||||
|
||||
wkv []kvExec
|
||||
}{
|
||||
{ // watch 1 key
|
||||
[]kv{{"sample", "value"}},
|
||||
[]string{"sample", "--rev", "1"},
|
||||
[]kvExec{{key: "sample", val: "value"}},
|
||||
puts: []kv{{"sample", "value"}},
|
||||
args: []string{"sample", "--rev", "1"},
|
||||
wkv: []kvExec{{key: "sample", val: "value"}},
|
||||
},
|
||||
{ // watch 1 key with env
|
||||
puts: []kv{{"sample", "value"}},
|
||||
envKey: "sample",
|
||||
args: []string{"--rev", "1"},
|
||||
wkv: []kvExec{{key: "sample", val: "value"}},
|
||||
},
|
||||
{ // watch 1 key with "echo watch event received"
|
||||
[]kv{{"sample", "value"}},
|
||||
[]string{"sample", "--rev", "1", "--", "echo", "watch event received"},
|
||||
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
puts: []kv{{"sample", "value"}},
|
||||
args: []string{"sample", "--rev", "1", "--", "echo", "watch event received"},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
},
|
||||
{ // watch 1 key with ${ETCD_WATCH_VALUE}
|
||||
puts: []kv{{"sample", "value"}},
|
||||
args: []string{"sample", "--rev", "1", "--", "env"},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: `ETCD_WATCH_VALUE="value"`}},
|
||||
},
|
||||
{ // watch 1 key with "echo watch event received", with env
|
||||
puts: []kv{{"sample", "value"}},
|
||||
envKey: "sample",
|
||||
args: []string{"--rev", "1", "--", "echo", "watch event received"},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
},
|
||||
{ // watch 1 key with "echo watch event received"
|
||||
[]kv{{"sample", "value"}},
|
||||
[]string{"--rev", "1", "sample", "--", "echo", "watch event received"},
|
||||
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
puts: []kv{{"sample", "value"}},
|
||||
args: []string{"--rev", "1", "sample", "--", "echo", "watch event received"},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
},
|
||||
{ // watch 1 key with "echo \"Hello World!\""
|
||||
[]kv{{"sample", "value"}},
|
||||
[]string{"--rev", "1", "sample", "--", "echo", "\"Hello World!\""},
|
||||
[]kvExec{{key: "sample", val: "value", execOutput: "Hello World!"}},
|
||||
puts: []kv{{"sample", "value"}},
|
||||
args: []string{"--rev", "1", "sample", "--", "echo", "\"Hello World!\""},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: "Hello World!"}},
|
||||
},
|
||||
{ // watch 1 key with "echo watch event received"
|
||||
[]kv{{"sample", "value"}},
|
||||
[]string{"sample", "samplx", "--rev", "1", "--", "echo", "watch event received"},
|
||||
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
puts: []kv{{"sample", "value"}},
|
||||
args: []string{"sample", "samplx", "--rev", "1", "--", "echo", "watch event received"},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
},
|
||||
{ // watch 1 key with "echo watch event received"
|
||||
[]kv{{"sample", "value"}},
|
||||
[]string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
|
||||
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
puts: []kv{{"sample", "value"}},
|
||||
envKey: "sample",
|
||||
envRange: "samplx",
|
||||
args: []string{"--rev", "1", "--", "echo", "watch event received"},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
},
|
||||
{ // watch 1 key with "echo watch event received"
|
||||
puts: []kv{{"sample", "value"}},
|
||||
args: []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
|
||||
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
|
||||
},
|
||||
{ // watch 3 keys by prefix
|
||||
[]kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
|
||||
[]string{"key", "--rev", "1", "--prefix"},
|
||||
[]kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
|
||||
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
|
||||
args: []string{"key", "--rev", "1", "--prefix"},
|
||||
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
|
||||
},
|
||||
{ // watch 3 keys by prefix, with env
|
||||
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
|
||||
envKey: "key",
|
||||
args: []string{"--rev", "1", "--prefix"},
|
||||
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
|
||||
},
|
||||
{ // watch by revision
|
||||
[]kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
|
||||
[]string{"etcd", "--rev", "2"},
|
||||
[]kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
|
||||
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
|
||||
args: []string{"etcd", "--rev", "2"},
|
||||
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
|
||||
},
|
||||
{ // watch 3 keys by range
|
||||
[]kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
|
||||
[]string{"key", "key3", "--rev", "1"},
|
||||
[]kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
|
||||
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
|
||||
args: []string{"key", "key3", "--rev", "1"},
|
||||
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
|
||||
},
|
||||
{ // watch 3 keys by range, with env
|
||||
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
|
||||
envKey: "key",
|
||||
envRange: "key3",
|
||||
args: []string{"--rev", "1"},
|
||||
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
|
||||
},
|
||||
}
|
||||
|
||||
@ -107,11 +147,30 @@ func watchTest(cx ctlCtx) {
|
||||
}
|
||||
close(donec)
|
||||
}(i, tt.puts)
|
||||
|
||||
unsetEnv := func() {}
|
||||
if tt.envKey != "" || tt.envRange != "" {
|
||||
if tt.envKey != "" {
|
||||
os.Setenv("ETCDCTL_WATCH_KEY", tt.envKey)
|
||||
unsetEnv = func() { os.Unsetenv("ETCDCTL_WATCH_KEY") }
|
||||
}
|
||||
if tt.envRange != "" {
|
||||
os.Setenv("ETCDCTL_WATCH_RANGE_END", tt.envRange)
|
||||
unsetEnv = func() { os.Unsetenv("ETCDCTL_WATCH_RANGE_END") }
|
||||
}
|
||||
if tt.envKey != "" && tt.envRange != "" {
|
||||
unsetEnv = func() {
|
||||
os.Unsetenv("ETCDCTL_WATCH_KEY")
|
||||
os.Unsetenv("ETCDCTL_WATCH_RANGE_END")
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := ctlV3Watch(cx, tt.args, tt.wkv...); err != nil {
|
||||
if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
|
||||
cx.t.Errorf("watchTest #%d: ctlV3Watch error (%v)", i, err)
|
||||
}
|
||||
}
|
||||
unsetEnv()
|
||||
<-donec
|
||||
}
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ func metricsTest(cx ctlCtx) {
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version), metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":true}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"true"}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
|
||||
cx.t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
}
|
||||
|
@ -268,8 +268,11 @@ func (cfg *Config) SetupLogging() {
|
||||
if cfg.Debug {
|
||||
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
|
||||
grpc.EnableTracing = true
|
||||
// enable info, warning, error
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
|
||||
} else {
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
|
||||
// only discard info
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
|
||||
}
|
||||
if cfg.LogPkgLevels != "" {
|
||||
repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")
|
||||
|
@ -41,6 +41,7 @@ import (
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/soheilhy/cmux"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@ -179,6 +180,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
AuthToken: cfg.AuthToken,
|
||||
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
|
||||
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
|
||||
Debug: cfg.Debug,
|
||||
}
|
||||
|
||||
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
|
||||
@ -522,6 +524,10 @@ func (e *Etcd) serveClients() (err error) {
|
||||
}
|
||||
|
||||
func (e *Etcd) serveMetrics() (err error) {
|
||||
if e.cfg.Metrics == "extensive" {
|
||||
grpc_prometheus.EnableHandlingTimeHistogram()
|
||||
}
|
||||
|
||||
if len(e.cfg.ListenMetricsUrls) > 0 {
|
||||
metricsMux := http.NewServeMux()
|
||||
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
|
||||
|
@ -378,6 +378,13 @@ watch [options] <key or prefix>\n
|
||||
# bar
|
||||
```
|
||||
|
||||
```bash
|
||||
ETCDCTL_WATCH_KEY=foo ./etcdctl watch
|
||||
# PUT
|
||||
# foo
|
||||
# bar
|
||||
```
|
||||
|
||||
Receive events and execute `echo watch event received`:
|
||||
|
||||
```bash
|
||||
@ -388,6 +395,41 @@ Receive events and execute `echo watch event received`:
|
||||
# watch event received
|
||||
```
|
||||
|
||||
Watch response is set via `ETCD_WATCH_*` environmental variables:
|
||||
|
||||
```bash
|
||||
./etcdctl watch foo -- sh -c "env | grep ETCD_WATCH_"
|
||||
|
||||
# PUT
|
||||
# foo
|
||||
# bar
|
||||
# ETCD_WATCH_REVISION=11
|
||||
# ETCD_WATCH_KEY="foo"
|
||||
# ETCD_WATCH_EVENT_TYPE="PUT"
|
||||
# ETCD_WATCH_VALUE="bar"
|
||||
```
|
||||
|
||||
Watch with environmental variables and execute `echo watch event received`:
|
||||
|
||||
```bash
|
||||
export ETCDCTL_WATCH_KEY=foo
|
||||
./etcdctl watch -- echo watch event received
|
||||
# PUT
|
||||
# foo
|
||||
# bar
|
||||
# watch event received
|
||||
```
|
||||
|
||||
```bash
|
||||
export ETCDCTL_WATCH_KEY=foo
|
||||
export ETCDCTL_WATCH_RANGE_END=foox
|
||||
./etcdctl watch -- echo watch event received
|
||||
# PUT
|
||||
# fob
|
||||
# bar
|
||||
# watch event received
|
||||
```
|
||||
|
||||
##### Interactive
|
||||
|
||||
```bash
|
||||
@ -413,6 +455,29 @@ watch foo -- echo watch event received
|
||||
# watch event received
|
||||
```
|
||||
|
||||
Watch with environmental variables and execute `echo watch event received`:
|
||||
|
||||
```bash
|
||||
export ETCDCTL_WATCH_KEY=foo
|
||||
./etcdctl watch -i
|
||||
watch -- echo watch event received
|
||||
# PUT
|
||||
# foo
|
||||
# bar
|
||||
# watch event received
|
||||
```
|
||||
|
||||
```bash
|
||||
export ETCDCTL_WATCH_KEY=foo
|
||||
export ETCDCTL_WATCH_RANGE_END=foox
|
||||
./etcdctl watch -i
|
||||
watch -- echo watch event received
|
||||
# PUT
|
||||
# fob
|
||||
# bar
|
||||
# watch event received
|
||||
```
|
||||
|
||||
### LEASE \<subcommand\>
|
||||
|
||||
LEASE provides commands for key lease management.
|
||||
@ -874,6 +939,8 @@ The snapshot restore options closely resemble to those used in the `etcd` comman
|
||||
|
||||
- data-dir -- Path to the data directory. Uses \<name\>.etcd if none given.
|
||||
|
||||
- wal-dir -- Path to the WAL directory. Uses data directory if none given.
|
||||
|
||||
- initial-cluster -- The initial cluster configuration for the restored etcd cluster.
|
||||
|
||||
- initial-cluster-token -- Initial cluster token for the restored etcd cluster.
|
||||
|
@ -202,7 +202,26 @@ func endpointsFromCluster(cmd *cobra.Command) []string {
|
||||
}
|
||||
return endpoints
|
||||
}
|
||||
c := mustClientFromCmd(cmd)
|
||||
|
||||
sec := secureCfgFromCmd(cmd)
|
||||
dt := dialTimeoutFromCmd(cmd)
|
||||
ka := keepAliveTimeFromCmd(cmd)
|
||||
kat := keepAliveTimeoutFromCmd(cmd)
|
||||
eps, err := endpointsFromCmd(cmd)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
// exclude auth for not asking needless password (MemberList() doesn't need authentication)
|
||||
|
||||
cfg, err := newClientCfg(eps, dt, ka, kat, sec, nil)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
c, err := v3.New(*cfg)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
ctx, cancel := commandCtx(cmd)
|
||||
defer func() {
|
||||
c.Close()
|
||||
|
@ -101,8 +101,19 @@ type clientConfig struct {
|
||||
acfg *authCfg
|
||||
}
|
||||
|
||||
type discardValue struct{}
|
||||
|
||||
func (*discardValue) String() string { return "" }
|
||||
func (*discardValue) Set(string) error { return nil }
|
||||
func (*discardValue) Type() string { return "" }
|
||||
|
||||
func clientConfigFromCmd(cmd *cobra.Command) *clientConfig {
|
||||
fs := cmd.InheritedFlags()
|
||||
|
||||
// silence "pkg/flags: unrecognized environment variable ETCDCTL_WATCH_KEY=foo" warnings
|
||||
// silence "pkg/flags: unrecognized environment variable ETCDCTL_WATCH_RANGE_END=bar" warnings
|
||||
fs.AddFlag(&pflag.Flag{Name: "watch-key", Value: &discardValue{}})
|
||||
fs.AddFlag(&pflag.Flag{Name: "watch-range-end", Value: &discardValue{}})
|
||||
flags.SetPflagsFromEnv("ETCDCTL", fs)
|
||||
|
||||
debug, err := cmd.Flags().GetBool("debug")
|
||||
|
@ -56,6 +56,7 @@ var (
|
||||
restoreCluster string
|
||||
restoreClusterToken string
|
||||
restoreDataDir string
|
||||
restoreWalDir string
|
||||
restorePeerURLs string
|
||||
restoreName string
|
||||
skipHashCheck bool
|
||||
@ -99,6 +100,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
|
||||
Run: snapshotRestoreCommandFunc,
|
||||
}
|
||||
cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
|
||||
cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)")
|
||||
cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap")
|
||||
cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap")
|
||||
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
|
||||
@ -187,7 +189,10 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
|
||||
basedir = restoreName + ".etcd"
|
||||
}
|
||||
|
||||
waldir := filepath.Join(basedir, "member", "wal")
|
||||
waldir := restoreWalDir
|
||||
if waldir == "" {
|
||||
waldir = filepath.Join(basedir, "member", "wal")
|
||||
}
|
||||
snapdir := filepath.Join(basedir, "member", "snap")
|
||||
|
||||
if _, err := os.Stat(basedir); err == nil {
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
|
||||
var (
|
||||
errBadArgsNum = errors.New("bad number of arguments")
|
||||
errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
|
||||
errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
|
||||
errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
|
||||
)
|
||||
@ -59,12 +60,17 @@ func NewWatchCommand() *cobra.Command {
|
||||
|
||||
// watchCommandFunc executes the "watch" command.
|
||||
func watchCommandFunc(cmd *cobra.Command, args []string) {
|
||||
envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
|
||||
if envKey == "" && envRange != "" {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
|
||||
}
|
||||
|
||||
if watchInteractive {
|
||||
watchInteractiveFunc(cmd, os.Args)
|
||||
watchInteractiveFunc(cmd, os.Args, envKey, envRange)
|
||||
return
|
||||
}
|
||||
|
||||
watchArgs, execArgs, err := parseWatchArgs(os.Args, args, false)
|
||||
watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
|
||||
if err != nil {
|
||||
ExitWithError(ExitBadArgs, err)
|
||||
}
|
||||
@ -82,7 +88,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||
}
|
||||
|
||||
func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
|
||||
func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
|
||||
c := mustClientFromCmd(cmd)
|
||||
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
@ -95,7 +101,7 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
|
||||
l = strings.TrimSuffix(l, "\n")
|
||||
|
||||
args := argify(l)
|
||||
if len(args) < 2 {
|
||||
if len(args) < 2 && envKey == "" {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
|
||||
continue
|
||||
}
|
||||
@ -105,7 +111,7 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
|
||||
continue
|
||||
}
|
||||
|
||||
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, true)
|
||||
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
|
||||
if perr != nil {
|
||||
ExitWithError(ExitBadArgs, perr)
|
||||
}
|
||||
@ -149,11 +155,18 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
|
||||
display.Watch(resp)
|
||||
|
||||
if len(execArgs) > 0 {
|
||||
cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
|
||||
cmd.Env = os.Environ()
|
||||
cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
|
||||
for _, ev := range resp.Events {
|
||||
cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
|
||||
cmd.Env = os.Environ()
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
|
||||
cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -165,7 +178,7 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
|
||||
// (e.g. ./bin/etcdctl watch foo --rev 1 bar).
|
||||
// "--" characters are invalid arguments for "spf13/cobra" library,
|
||||
// so no need to handle such cases.
|
||||
func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs []string, execArgs []string, err error) {
|
||||
func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
|
||||
watchArgs = commandArgs
|
||||
|
||||
// remove preceding commands (e.g. "watch foo bar" in interactive mode)
|
||||
@ -175,12 +188,54 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
break
|
||||
}
|
||||
}
|
||||
if idx < len(watchArgs)-1 {
|
||||
watchArgs = watchArgs[idx+1:]
|
||||
if idx < len(watchArgs)-1 || envKey != "" {
|
||||
if idx < len(watchArgs)-1 {
|
||||
watchArgs = watchArgs[idx+1:]
|
||||
}
|
||||
|
||||
execIdx, execExist := 0, false
|
||||
for execIdx = range osArgs {
|
||||
v := osArgs[execIdx]
|
||||
if v == "--" && execIdx != len(osArgs)-1 {
|
||||
execExist = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if idx == len(watchArgs)-1 && envKey != "" {
|
||||
if len(watchArgs) > 0 && !interactive {
|
||||
// "watch --rev 1 -- echo Hello World" has no conflict
|
||||
if !execExist {
|
||||
// "watch foo" with ETCDCTL_WATCH_KEY=foo
|
||||
// (watchArgs==["foo"])
|
||||
return nil, nil, errBadArgsNumConflictEnv
|
||||
}
|
||||
}
|
||||
// otherwise, watch with no argument and environment key is set
|
||||
// if interactive, first "watch" command string should be removed
|
||||
if interactive {
|
||||
watchArgs = []string{}
|
||||
}
|
||||
}
|
||||
|
||||
// "watch foo -- echo hello" with ETCDCTL_WATCH_KEY=foo
|
||||
// (watchArgs==["foo","echo","hello"])
|
||||
if envKey != "" && execExist {
|
||||
widx, oidx := 0, len(osArgs)-1
|
||||
for widx = len(watchArgs) - 1; widx >= 0; widx-- {
|
||||
if watchArgs[widx] == osArgs[oidx] {
|
||||
oidx--
|
||||
continue
|
||||
}
|
||||
if oidx == execIdx { // watchArgs has extra
|
||||
return nil, nil, errBadArgsNumConflictEnv
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if interactive { // "watch" not found
|
||||
return nil, nil, errBadArgsInteractiveWatch
|
||||
}
|
||||
if len(watchArgs) < 1 {
|
||||
if len(watchArgs) < 1 && envKey == "" {
|
||||
return nil, nil, errBadArgsNum
|
||||
}
|
||||
|
||||
@ -192,7 +247,7 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
}
|
||||
if idx < len(osArgs)-1 {
|
||||
osArgs = osArgs[idx+1:]
|
||||
} else {
|
||||
} else if envKey == "" {
|
||||
return nil, nil, errBadArgsNum
|
||||
}
|
||||
|
||||
@ -202,7 +257,7 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
}
|
||||
foundSep := false
|
||||
for idx = range argsWithSep {
|
||||
if argsWithSep[idx] == "--" && idx > 0 {
|
||||
if argsWithSep[idx] == "--" {
|
||||
foundSep = true
|
||||
break
|
||||
}
|
||||
@ -214,6 +269,18 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
|
||||
}
|
||||
watchArgs = flagset.Args()
|
||||
}
|
||||
|
||||
// "watch -- echo hello" with ETCDCTL_WATCH_KEY=foo
|
||||
// should be translated to "watch foo -- echo hello"
|
||||
// (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
|
||||
if envKey != "" {
|
||||
tmp := []string{envKey}
|
||||
if envRange != "" {
|
||||
tmp = append(tmp, envRange)
|
||||
}
|
||||
watchArgs = append(tmp, watchArgs...)
|
||||
}
|
||||
|
||||
if !foundSep {
|
||||
return watchArgs, nil, nil
|
||||
}
|
||||
|
@ -21,9 +21,10 @@ import (
|
||||
|
||||
func Test_parseWatchArgs(t *testing.T) {
|
||||
tt := []struct {
|
||||
osArgs []string // raw arguments to "watch" command
|
||||
commandArgs []string // arguments after "spf13/cobra" preprocessing
|
||||
interactive bool
|
||||
osArgs []string // raw arguments to "watch" command
|
||||
commandArgs []string // arguments after "spf13/cobra" preprocessing
|
||||
envKey, envRange string
|
||||
interactive bool
|
||||
|
||||
watchArgs []string
|
||||
execArgs []string
|
||||
@ -45,9 +46,66 @@ func Test_parseWatchArgs(t *testing.T) {
|
||||
execArgs: nil,
|
||||
err: errBadArgsNumSeparator,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch"},
|
||||
commandArgs: nil,
|
||||
envKey: "foo",
|
||||
envRange: "bar",
|
||||
interactive: false,
|
||||
watchArgs: []string{"foo", "bar"},
|
||||
execArgs: nil,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "foo"},
|
||||
commandArgs: []string{"foo"},
|
||||
envKey: "foo",
|
||||
envRange: "",
|
||||
interactive: false,
|
||||
watchArgs: nil,
|
||||
execArgs: nil,
|
||||
err: errBadArgsNumConflictEnv,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar"},
|
||||
commandArgs: []string{"foo", "bar"},
|
||||
envKey: "foo",
|
||||
envRange: "",
|
||||
interactive: false,
|
||||
watchArgs: nil,
|
||||
execArgs: nil,
|
||||
err: errBadArgsNumConflictEnv,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar"},
|
||||
commandArgs: []string{"foo", "bar"},
|
||||
envKey: "foo",
|
||||
envRange: "bar",
|
||||
interactive: false,
|
||||
watchArgs: nil,
|
||||
execArgs: nil,
|
||||
err: errBadArgsNumConflictEnv,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "foo"},
|
||||
commandArgs: []string{"foo"},
|
||||
interactive: false,
|
||||
watchArgs: []string{"foo"},
|
||||
execArgs: nil,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch"},
|
||||
commandArgs: nil,
|
||||
envKey: "foo",
|
||||
interactive: false,
|
||||
watchArgs: []string{"foo"},
|
||||
execArgs: nil,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "foo"},
|
||||
commandArgs: []string{"foo"},
|
||||
interactive: false,
|
||||
watchArgs: []string{"foo"},
|
||||
execArgs: nil,
|
||||
@ -56,6 +114,16 @@ func Test_parseWatchArgs(t *testing.T) {
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "foo"},
|
||||
commandArgs: []string{"foo"},
|
||||
envKey: "foo",
|
||||
interactive: false,
|
||||
watchArgs: nil,
|
||||
execArgs: nil,
|
||||
err: errBadArgsNumConflictEnv,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1"},
|
||||
commandArgs: nil,
|
||||
envKey: "foo",
|
||||
interactive: false,
|
||||
watchArgs: []string{"foo"},
|
||||
execArgs: nil,
|
||||
@ -117,6 +185,35 @@ func Test_parseWatchArgs(t *testing.T) {
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "--", "echo", "Hello", "World"},
|
||||
commandArgs: []string{"echo", "Hello", "World"},
|
||||
envKey: "foo",
|
||||
envRange: "",
|
||||
interactive: false,
|
||||
watchArgs: []string{"foo"},
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "--", "echo", "Hello", "World"},
|
||||
commandArgs: []string{"echo", "Hello", "World"},
|
||||
envKey: "foo",
|
||||
envRange: "bar",
|
||||
interactive: false,
|
||||
watchArgs: []string{"foo", "bar"},
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar", "--rev", "1", "--", "echo", "Hello", "World"},
|
||||
commandArgs: []string{"foo", "bar", "echo", "Hello", "World"},
|
||||
envKey: "foo",
|
||||
interactive: false,
|
||||
watchArgs: nil,
|
||||
execArgs: nil,
|
||||
err: errBadArgsNumConflictEnv,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"foo", "bar", "--", "echo", "Hello", "World"},
|
||||
@ -141,6 +238,26 @@ func Test_parseWatchArgs(t *testing.T) {
|
||||
execArgs: nil,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch"},
|
||||
envKey: "foo",
|
||||
envRange: "bar",
|
||||
interactive: true,
|
||||
watchArgs: []string{"foo", "bar"},
|
||||
execArgs: nil,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch"},
|
||||
envKey: "hello world!",
|
||||
envRange: "bar",
|
||||
interactive: true,
|
||||
watchArgs: []string{"hello world!", "bar"},
|
||||
execArgs: nil,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch", "foo", "--rev", "1"},
|
||||
@ -165,6 +282,25 @@ func Test_parseWatchArgs(t *testing.T) {
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch", "--", "echo", "Hello", "World"},
|
||||
envKey: "foo",
|
||||
interactive: true,
|
||||
watchArgs: []string{"foo"},
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch", "--", "echo", "Hello", "World"},
|
||||
envKey: "foo",
|
||||
envRange: "bar",
|
||||
interactive: true,
|
||||
watchArgs: []string{"foo", "bar"},
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch", "foo", "bar", "--", "echo", "Hello", "World"},
|
||||
@ -181,6 +317,16 @@ func Test_parseWatchArgs(t *testing.T) {
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch", "--rev", "1", "--", "echo", "Hello", "World"},
|
||||
envKey: "foo",
|
||||
envRange: "bar",
|
||||
interactive: true,
|
||||
watchArgs: []string{"foo", "bar"},
|
||||
execArgs: []string{"echo", "Hello", "World"},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
|
||||
commandArgs: []string{"watch", "foo", "--rev", "1", "bar", "--", "echo", "Hello", "World"},
|
||||
@ -199,7 +345,7 @@ func Test_parseWatchArgs(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, ts := range tt {
|
||||
watchArgs, execArgs, err := parseWatchArgs(ts.osArgs, ts.commandArgs, ts.interactive)
|
||||
watchArgs, execArgs, err := parseWatchArgs(ts.osArgs, ts.commandArgs, ts.envKey, ts.envRange, ts.interactive)
|
||||
if err != ts.err {
|
||||
t.Fatalf("#%d: error expected %v, got %v", i, ts.err, err)
|
||||
}
|
||||
|
@ -40,7 +40,6 @@ import (
|
||||
"github.com/coreos/etcd/version"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
@ -179,10 +178,6 @@ func startEtcdOrProxyV2() {
|
||||
|
||||
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
|
||||
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
|
||||
if cfg.Metrics == "extensive" {
|
||||
grpc_prometheus.EnableHandlingTimeHistogram()
|
||||
}
|
||||
|
||||
e, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -392,6 +387,9 @@ func checkSupportArch() {
|
||||
if runtime.GOARCH == "amd64" || runtime.GOARCH == "ppc64le" {
|
||||
return
|
||||
}
|
||||
// unsupported arch only configured via environment variable
|
||||
// so unset here to not parse through flag
|
||||
defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
|
||||
if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
|
||||
plog.Warningf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set", env)
|
||||
return
|
||||
|
@ -17,6 +17,7 @@ package etcdmain
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -37,10 +38,12 @@ import (
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/proxy/grpcproxy"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/soheilhy/cmux"
|
||||
"github.com/spf13/cobra"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -75,6 +78,8 @@ var (
|
||||
|
||||
grpcProxyEnablePprof bool
|
||||
grpcProxyEnableOrdering bool
|
||||
|
||||
grpcProxyDebug bool
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -127,12 +132,26 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
||||
// experimental flags
|
||||
cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
|
||||
cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
|
||||
|
||||
cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
|
||||
|
||||
return &cmd
|
||||
}
|
||||
|
||||
func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
checkArgs()
|
||||
|
||||
capnslog.SetGlobalLogLevel(capnslog.INFO)
|
||||
if grpcProxyDebug {
|
||||
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
|
||||
grpc.EnableTracing = true
|
||||
// enable info, warning, error
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
|
||||
} else {
|
||||
// only discard info
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
|
||||
}
|
||||
|
||||
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
|
||||
if tlsinfo == nil && grpcProxyListenAutoTLS {
|
||||
host := []string{"https://" + grpcProxyListenAddr}
|
||||
|
@ -58,7 +58,7 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
|
||||
}
|
||||
h := hfunc()
|
||||
d, _ := json.Marshal(h)
|
||||
if !h.Health {
|
||||
if h.Health != "true" {
|
||||
http.Error(w, string(d), http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
@ -70,33 +70,32 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
|
||||
// Health defines etcd server health status.
|
||||
// TODO: remove manual parsing in etcdctl cluster-health
|
||||
type Health struct {
|
||||
Health bool `json:"health"`
|
||||
Errors []string `json:"errors,omitempty"`
|
||||
Health string `json:"health"`
|
||||
}
|
||||
|
||||
// TODO: server NOSPACE, etcdserver.ErrNoLeader in health API
|
||||
|
||||
func checkHealth(srv etcdserver.ServerV2) Health {
|
||||
h := Health{Health: false}
|
||||
h := Health{Health: "true"}
|
||||
|
||||
as := srv.Alarms()
|
||||
if len(as) > 0 {
|
||||
for _, v := range as {
|
||||
h.Errors = append(h.Errors, v.Alarm.String())
|
||||
h.Health = "false"
|
||||
}
|
||||
|
||||
if h.Health == "true" {
|
||||
if uint64(srv.Leader()) == raft.None {
|
||||
h.Health = "false"
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
if uint64(srv.Leader()) == raft.None {
|
||||
h.Errors = append(h.Errors, etcdserver.ErrNoLeader.Error())
|
||||
return h
|
||||
if h.Health == "true" {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
_, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
|
||||
cancel()
|
||||
if err != nil {
|
||||
h.Health = "false"
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
_, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
|
||||
cancel()
|
||||
if err != nil {
|
||||
h.Errors = append(h.Errors, err.Error())
|
||||
}
|
||||
|
||||
h.Health = err == nil
|
||||
return h
|
||||
}
|
||||
|
@ -16,8 +16,10 @@ package v3rpc
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -36,9 +38,8 @@ const (
|
||||
maxSendBytes = math.MaxInt32
|
||||
)
|
||||
|
||||
func init() {
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
|
||||
}
|
||||
// integration tests call this multiple times, which is racey in gRPC side
|
||||
var grpclogOnce sync.Once
|
||||
|
||||
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
|
||||
var opts []grpc.ServerOption
|
||||
@ -70,5 +71,16 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
|
||||
// set zero values for metrics registered for this grpc server
|
||||
grpc_prometheus.Register(grpcServer)
|
||||
|
||||
grpclogOnce.Do(func() {
|
||||
if s.Cfg.Debug {
|
||||
grpc.EnableTracing = true
|
||||
// enable info, warning, error
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
|
||||
} else {
|
||||
// only discard info
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
|
||||
}
|
||||
})
|
||||
|
||||
return grpcServer
|
||||
}
|
||||
|
@ -107,7 +107,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
|
||||
if isClientCtxErr(stream.Context().Err(), err) {
|
||||
plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
|
||||
} else {
|
||||
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -133,7 +137,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
|
||||
resp.TTL = ttl
|
||||
err = stream.Send(resp)
|
||||
if err != nil {
|
||||
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
|
||||
if isClientCtxErr(stream.Context().Err(), err) {
|
||||
plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
|
||||
} else {
|
||||
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -81,3 +81,16 @@ func togRPCError(err error) error {
|
||||
}
|
||||
return grpcErr
|
||||
}
|
||||
|
||||
func isClientCtxErr(ctxErr error, err error) bool {
|
||||
if ctxErr != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
ev, ok := status.FromError(err)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
code := ev.Code()
|
||||
return code == codes.Canceled || code == codes.DeadlineExceeded
|
||||
}
|
||||
|
@ -140,7 +140,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
|
||||
// deadlock when calling sws.close().
|
||||
go func() {
|
||||
if rerr := sws.recvLoop(); rerr != nil {
|
||||
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
|
||||
if isClientCtxErr(stream.Context().Err(), rerr) {
|
||||
plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
|
||||
} else {
|
||||
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
|
||||
}
|
||||
errc <- rerr
|
||||
}
|
||||
}()
|
||||
@ -339,7 +343,11 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
|
||||
mvcc.ReportEventReceived(len(evs))
|
||||
if err := sws.gRPCStream.Send(wr); err != nil {
|
||||
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
|
||||
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
|
||||
plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
|
||||
} else {
|
||||
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -356,7 +364,11 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
}
|
||||
|
||||
if err := sws.gRPCStream.Send(c); err != nil {
|
||||
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
|
||||
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
|
||||
plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
|
||||
} else {
|
||||
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -372,7 +384,11 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
for _, v := range pending[wid] {
|
||||
mvcc.ReportEventReceived(len(v.Events))
|
||||
if err := sws.gRPCStream.Send(v); err != nil {
|
||||
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
|
||||
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
|
||||
plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
|
||||
} else {
|
||||
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -70,6 +70,8 @@ type ServerConfig struct {
|
||||
// before serving any peer/client traffic.
|
||||
InitialCorruptCheck bool
|
||||
CorruptCheckTime time.Duration
|
||||
|
||||
Debug bool
|
||||
}
|
||||
|
||||
// VerifyBootstrap sanity-checks the initial config for bootstrap case
|
||||
|
@ -58,10 +58,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
tickDuration = 10 * time.Millisecond
|
||||
clusterName = "etcd"
|
||||
requestTimeout = 20 * time.Second
|
||||
// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
|
||||
RequestWaitTimeout = 3 * time.Second
|
||||
tickDuration = 10 * time.Millisecond
|
||||
requestTimeout = 20 * time.Second
|
||||
|
||||
clusterName = "etcd"
|
||||
basePort = 21000
|
||||
UrlScheme = "unix"
|
||||
UrlSchemeTLS = "unixs"
|
||||
|
@ -373,10 +373,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
|
||||
}
|
||||
|
||||
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
|
||||
tmpb.FillPercent = 0.9 // for seq write in for each
|
||||
if berr != nil {
|
||||
return berr
|
||||
}
|
||||
tmpb.FillPercent = 0.9 // for seq write in for each
|
||||
|
||||
b.ForEach(func(k, v []byte) error {
|
||||
count++
|
||||
|
@ -33,7 +33,6 @@ type ExpectProcess struct {
|
||||
fpty *os.File
|
||||
wg sync.WaitGroup
|
||||
|
||||
ptyMu sync.Mutex // protects accessing fpty
|
||||
cond *sync.Cond // for broadcasting updates are available
|
||||
mu sync.Mutex // protects lines and err
|
||||
lines []string
|
||||
@ -76,9 +75,7 @@ func (ep *ExpectProcess) read() {
|
||||
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
|
||||
r := bufio.NewReader(ep.fpty)
|
||||
for ep.err == nil {
|
||||
ep.ptyMu.Lock()
|
||||
l, rerr := r.ReadString('\n')
|
||||
ep.ptyMu.Unlock()
|
||||
ep.mu.Lock()
|
||||
ep.err = rerr
|
||||
if l != "" {
|
||||
@ -150,9 +147,7 @@ func (ep *ExpectProcess) close(kill bool) error {
|
||||
}
|
||||
|
||||
err := ep.cmd.Wait()
|
||||
ep.ptyMu.Lock()
|
||||
ep.fpty.Close()
|
||||
ep.ptyMu.Unlock()
|
||||
ep.wg.Wait()
|
||||
|
||||
if err != nil {
|
||||
|
@ -30,13 +30,12 @@ func HandleHealth(mux *http.ServeMux, c *clientv3.Client) {
|
||||
}
|
||||
|
||||
func checkHealth(c *clientv3.Client) etcdhttp.Health {
|
||||
h := etcdhttp.Health{Health: false}
|
||||
h := etcdhttp.Health{Health: "false"}
|
||||
ctx, cancel := context.WithTimeout(c.Ctx(), time.Second)
|
||||
_, err := c.Get(ctx, "a")
|
||||
cancel()
|
||||
h.Health = err == nil || err == rpctypes.ErrPermissionDenied
|
||||
if !h.Health {
|
||||
h.Errors = append(h.Errors, err.Error())
|
||||
if err == nil || err == rpctypes.ErrPermissionDenied {
|
||||
h.Health = "true"
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
7
test
7
test
@ -133,6 +133,9 @@ function functional_pass {
|
||||
-peer-ports 12380,22380,32380 \
|
||||
-limit 1 \
|
||||
-schedule-cases "0 1 2 3 4 5" \
|
||||
-stress-qps 1000 \
|
||||
-stress-key-txn-count 100 \
|
||||
-stress-key-txn-ops 10 \
|
||||
-exit-on-failure && echo "'etcd-tester' succeeded"
|
||||
ETCD_TESTER_EXIT_CODE=$?
|
||||
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
|
||||
@ -246,13 +249,13 @@ function grpcproxy_pass {
|
||||
function release_pass {
|
||||
rm -f ./bin/etcd-last-release
|
||||
# to grab latest patch release; bump this up for every minor release
|
||||
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.2.*" | head -1)
|
||||
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.3.*" | head -1)
|
||||
if [ -n "$MANUAL_VER" ]; then
|
||||
# in case, we need to test against different version
|
||||
UPGRADE_VER=$MANUAL_VER
|
||||
fi
|
||||
if [[ -z ${UPGRADE_VER} ]]; then
|
||||
UPGRADE_VER="v3.2.0"
|
||||
UPGRADE_VER="v3.3.0"
|
||||
echo "fallback to" ${UPGRADE_VER}
|
||||
fi
|
||||
|
||||
|
@ -34,9 +34,11 @@ import (
|
||||
type keyStresser struct {
|
||||
Endpoint string
|
||||
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
keyTxnSuffixRange int
|
||||
keyTxnOps int
|
||||
|
||||
N int
|
||||
|
||||
@ -77,6 +79,14 @@ func (s *keyStresser) Stress() error {
|
||||
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
|
||||
}
|
||||
if s.keyTxnSuffixRange > 0 {
|
||||
// adjust to make up ±70% of workloads with writes
|
||||
stressEntries[0].weight = 0.35
|
||||
stressEntries = append(stressEntries, stressEntry{
|
||||
weight: 0.35,
|
||||
f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
|
||||
})
|
||||
}
|
||||
s.stressTable = createStressTable(stressEntries)
|
||||
|
||||
for i := 0; i < s.N; i++ {
|
||||
@ -202,6 +212,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
|
||||
keys := make([]string, keyTxnSuffixRange)
|
||||
for i := range keys {
|
||||
keys[i] = fmt.Sprintf("/k%03d", i)
|
||||
}
|
||||
return writeTxn(kvc, keys, txnOps)
|
||||
}
|
||||
|
||||
func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
ks := make(map[string]struct{}, txnOps)
|
||||
for len(ks) != txnOps {
|
||||
ks[keys[rand.Intn(len(keys))]] = struct{}{}
|
||||
}
|
||||
selected := make([]string, 0, txnOps)
|
||||
for k := range ks {
|
||||
selected = append(selected, k)
|
||||
}
|
||||
com, delOp, putOp := getTxnReqs(selected[0], "bar00")
|
||||
txnReq := &pb.TxnRequest{
|
||||
Compare: []*pb.Compare{com},
|
||||
Success: []*pb.RequestOp{delOp},
|
||||
Failure: []*pb.RequestOp{putOp},
|
||||
}
|
||||
|
||||
// add nested txns if any
|
||||
for i := 1; i < txnOps; i++ {
|
||||
k, v := selected[i], fmt.Sprintf("bar%02d", i)
|
||||
com, delOp, putOp = getTxnReqs(k, v)
|
||||
nested := &pb.RequestOp{
|
||||
Request: &pb.RequestOp_RequestTxn{
|
||||
RequestTxn: &pb.TxnRequest{
|
||||
Compare: []*pb.Compare{com},
|
||||
Success: []*pb.RequestOp{delOp},
|
||||
Failure: []*pb.RequestOp{putOp},
|
||||
},
|
||||
},
|
||||
}
|
||||
txnReq.Success = append(txnReq.Success, nested)
|
||||
txnReq.Failure = append(txnReq.Failure, nested)
|
||||
}
|
||||
|
||||
_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
|
||||
return err, int64(txnOps)
|
||||
}
|
||||
}
|
||||
|
||||
func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
|
||||
// if key exists (version > 0)
|
||||
com = &pb.Compare{
|
||||
Key: []byte(key),
|
||||
Target: pb.Compare_VERSION,
|
||||
Result: pb.Compare_GREATER,
|
||||
TargetUnion: &pb.Compare_Version{Version: 0},
|
||||
}
|
||||
delOp = &pb.RequestOp{
|
||||
Request: &pb.RequestOp_RequestDeleteRange{
|
||||
RequestDeleteRange: &pb.DeleteRangeRequest{
|
||||
Key: []byte(key),
|
||||
},
|
||||
},
|
||||
}
|
||||
putOp = &pb.RequestOp{
|
||||
Request: &pb.RequestOp_RequestPut{
|
||||
RequestPut: &pb.PutRequest{
|
||||
Key: []byte(key),
|
||||
Value: []byte(val),
|
||||
},
|
||||
},
|
||||
}
|
||||
return com, delOp, putOp
|
||||
}
|
||||
|
||||
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
|
@ -47,6 +47,8 @@ func main() {
|
||||
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
|
||||
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
|
||||
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
|
||||
stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
|
||||
stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
|
||||
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
|
||||
exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
|
||||
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
|
||||
@ -120,15 +122,23 @@ func main() {
|
||||
}
|
||||
|
||||
scfg := stressConfig{
|
||||
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
|
||||
keyLargeSize: int(*stressKeyLargeSize),
|
||||
keySize: int(*stressKeySize),
|
||||
keySuffixRange: int(*stressKeySuffixRange),
|
||||
numLeases: 10,
|
||||
keysPerLease: 10,
|
||||
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
|
||||
keyLargeSize: int(*stressKeyLargeSize),
|
||||
keySize: int(*stressKeySize),
|
||||
keySuffixRange: int(*stressKeySuffixRange),
|
||||
keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
|
||||
keyTxnOps: int(*stressKeyTxnOps),
|
||||
numLeases: 10,
|
||||
keysPerLease: 10,
|
||||
|
||||
etcdRunnerPath: *etcdRunnerPath,
|
||||
}
|
||||
if scfg.keyTxnSuffixRange > 100 {
|
||||
plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
|
||||
}
|
||||
if scfg.keyTxnOps > 64 {
|
||||
plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
|
||||
}
|
||||
|
||||
t := &tester{
|
||||
failures: schedule,
|
||||
|
@ -16,17 +16,13 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
func init() { grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) }
|
||||
|
||||
type Stresser interface {
|
||||
// Stress starts to stress the etcd cluster
|
||||
Stress() error
|
||||
@ -117,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
|
||||
}
|
||||
|
||||
type stressConfig struct {
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
keyTxnSuffixRange int
|
||||
keyTxnOps int
|
||||
|
||||
numLeases int
|
||||
keysPerLease int
|
||||
@ -146,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
|
||||
// TODO: Too intensive stressers can panic etcd member with
|
||||
// 'out of memory' error. Put rate limits in server side.
|
||||
return &keyStresser{
|
||||
Endpoint: m.grpcAddr(),
|
||||
keyLargeSize: sc.keyLargeSize,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
N: 100,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
Endpoint: m.grpcAddr(),
|
||||
keyLargeSize: sc.keyLargeSize,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
keyTxnSuffixRange: sc.keyTxnSuffixRange,
|
||||
keyTxnOps: sc.keyTxnOps,
|
||||
N: 100,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
}
|
||||
case "v2keys":
|
||||
return &v2Stresser{
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.3.0"
|
||||
Version = "3.3.0-rc.3"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user