Compare commits


44 Commits

Author SHA1 Message Date
374dc5743f version: 3.3.0-rc.3
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 15:23:08 -08:00
55505617df proxy/grpcproxy: remove "Errors" field
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 15:22:54 -08:00
a9317d3d77 e2e: remove "/health" "errors" field test
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 15:22:54 -08:00
02d362ccde etcdserver/api/etcdhttp: remove "errors" field in /health
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 15:22:54 -08:00
d292337d14 api/etcdhttp: change /health type back to string for backwards compatibility 2018-01-17 12:44:38 -08:00
7974f008f3 etcdctl: document "ETCD_WATCH_*"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:44:22 -08:00
4a3f99415e e2e: test ETCD_WATCH_VALUE
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:44:07 -08:00
6340564c84 ctlv3: set ETCD_WATCH_* on watch exec
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:43:55 -08:00
6735028ec0 ctlv3: exit on exec watch error
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:43:45 -08:00
906f098053 ctlv3: set ETCD_WATCH_KEY, ETCD_WATCH_VALUE on exec watch
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:43:38 -08:00
8a66237693 ctlv3: handle pkg/flags warnings
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:43:27 -08:00
d37afffb98 etcdctl: document watch with ETCDCTL_WATCH_*
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:43:12 -08:00
7e2759da8d e2e: add watch tests with ETCDCTL_WATCH_*
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:43:02 -08:00
ad4df985fc ctlv3: support ETCDCTL_WATCH_KEY, ETCDCTL_WATCH_RANGE_END
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-17 12:42:54 -08:00
2df89c8bf6 Documentation/op-guide: clarify security.md on TLS auth
Make it more accurate (just as pkg/transport/listener_tls.go does).

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-12 15:23:06 -08:00
6178c45066 etcdctl: don't ask password twice for etcdctl endpoint health --cluster
Currently, etcdctl endpoint health --cluster asks for the password twice if auth
is enabled. This is because the command creates two client instances:
one for the purpose of checking endpoint health and another for
getting cluster members with MemberList(). The latter client doesn't
need to be authenticated, because MemberList() is a public RPC. This
commit makes the second client an unauthenticated one.

Fixes https://github.com/coreos/etcd/issues/9094
2018-01-12 09:59:31 -08:00
9ccae0f81a etcd-tester: update stresser weights with txn stresser
Large key writes (stressEntries[1].weight) should not take this
much weight; they were triggering "database size exceeded" errors.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-12 09:41:51 -08:00
a5079cc381 version: 3.3.0-rc.2+git
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-11 14:16:08 -08:00
9e079d8f02 version: 3.3.0-rc.2
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-11 11:18:46 -08:00
bd57c9ca5b etcd-tester: fix "writeTxn" key selection
Found when debugging https://github.com/coreos/etcd/issues/9130.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-11 11:18:05 -08:00
58c402a47b test: limit stress-qps for slow CI machines, add txn flags
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2018-01-09 14:18:45 -08:00
3ce73b70bc etcd-tester: add txn stresser
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2018-01-09 14:18:33 -08:00
ee3c81d8d3 ctlv3: add "snapshot restore --wal-dir"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-09 11:12:29 -08:00
2dfabfbef6 DocCommand: use regex wildcard
The current command, as written, produces no output in the macOS Terminal or bash shell.
Using a regex wildcard works fine on macOS and Linux.
2018-01-09 09:11:16 -08:00
bf83d5269f clientv3/integration: fix typos
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-09 09:11:15 -08:00
a609b1eb47 integration: add constant RequestWaitTimeout. 2018-01-09 09:11:15 -08:00
1ae0c0b47d mvcc: check null before set FillPercent not to panic
Since CreateBucketIfNotExists() can return nil when it encounters an error,
FillPercent must be accessed only after a nil check, so as not to cause
a panic.
2018-01-08 13:08:03 -08:00
ec43197344 etcdserver/api/v3rpc: debug user cancellation and log warning for rest
A context error with the cancel code typically indicates user cancellation, which
should be logged at debug level. For other error codes, we should log a warning.

Fixes #9085
2018-01-08 10:14:37 -08:00
70ba0518f1 embed: enable extensive metrics if specified 2018-01-07 18:48:59 -08:00
e330f5004f etcdmain: unset ETCD_UNSUPPORTED_ARCH after arch check
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-05 03:38:35 +00:00
0ec5023b7b pkg/expect: fix deadlock in mac OS
bufio.NewReader.ReadString blocks even
when the process has received syscall.SIGKILL.
Remove the ptyMu mutex and make ReadString return
when the *os.File is closed.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 14:34:01 -08:00
0f69520622 version: bump up to 3.3.0-rc.1+git
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 14:33:10 -08:00
d3c2acf090 version: bump up to 3.3.0-rc.1
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:27:15 -08:00
5e35f79087 clientv3/integration: fix TestKVLargeRequests with -tags cluster_proxy
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:07:24 -08:00
6dff1a9398 tools/functional-tester: remove duplicate grpclog set
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
325913d6fb etcdserver/api/v3rpc: set grpclog once
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
24c9fb0527 etcdserver,embed: discard gRPC info logs when debug is off
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
8511db5e2b etcdserver/api/v3rpc: log stream error with debug level
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
3193f3c9ab clientv3/leasing: fix racey waitSession
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-21 17:51:03 -08:00
bdc508cadf grpc-proxy: add "--debug" flag to "etcd grpc-proxy start" command
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-21 14:44:10 -08:00
d5a0609412 embed: only discard infos when debug flag is off
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-21 14:44:02 -08:00
67af1a2138 CHANGELOG: remove rc in release-3.3
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-20 14:32:15 -08:00
66d68a8fdb *: update release upgrade test versions
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-20 14:16:59 -08:00
ebaa83c985 version: bump up to 3.3.0+git
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-20 14:16:49 -08:00
43 changed files with 704 additions and 169 deletions

View File

@ -2,7 +2,7 @@
TEST_SUFFIX=$(date +%s | base64 | head -c 15)
TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.2.11"
TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.3.0-rc.0"
if [ "$TEST_ARCH" == "386" ]; then
TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
fi

View File

@ -1,8 +1,4 @@
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0) (2018-01-??)
**v3.3.0 is not yet released; expected to be released in January 2018.**
## [v3.3.0-rc.0](https://github.com/coreos/etcd/releases/tag/v3.3.0-rc.0) (2017-12-20)
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0)
See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0) and [v3.3 upgrade guide](https://github.com/coreos/etcd/blob/master/Documentation/upgrades/upgrade_3_3.md) for any breaking changes.

View File

@ -87,7 +87,7 @@ Removing excessive keyspace data and defragmenting the backend database will put
```sh
# get current revision
$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9]*')
$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9].*')
# compact away all old revisions
$ ETCDCTL_API=3 etcdctl compact $rev
compacted revision 1516

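For reference, the same two steps can be driven programmatically; a minimal Go sketch using the clientv3 API (the endpoint address is an assumption):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// get the current revision from endpoint status (instead of scraping JSON)
	status, err := cli.Status(context.Background(), "127.0.0.1:2379")
	if err != nil {
		log.Fatal(err)
	}
	// compact away all revisions older than the current one
	if _, err = cli.Compact(context.Background(), status.Header.Revision); err != nil {
		log.Fatal(err)
	}
}
```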
View File

@ -197,7 +197,7 @@ When client authentication is enabled for an etcd member, the administrator must
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [TLS certificates get reloaded on every client connection](https://github.com/coreos/etcd/pull/7829). This is useful when replacing expiry certs without stopping etcd servers; it can be done by overwriting old certs with new ones. Refreshing certs for every connection should not have too much overhead, but can be improved in the future, with caching layer. Example tests can be found [here](https://github.com/coreos/etcd/blob/b041ce5d514a4b4aaeefbffb008f0c7570a18986/integration/v3_grpc_test.go#L1601-L1757).
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server denies incoming peer certs with wrong IP `SAN`](https://github.com/coreos/etcd/pull/7687). For instance, if peer cert contains IP addresses in Subject Alternative Name (SAN) field, server authenticates a peer only when the remote IP address matches one of those IP addresses. This is to prevent unauthorized endpoints from joining the cluster. For example, peer B's CSR (with `cfssl`) is:
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server denies incoming peer certs with wrong IP `SAN`](https://github.com/coreos/etcd/pull/7687). For instance, if peer cert contains any IP addresses in Subject Alternative Name (SAN) field, server authenticates a peer only when the remote IP address matches one of those IP addresses. This is to prevent unauthorized endpoints from joining the cluster. For example, peer B's CSR (with `cfssl`) is:
```json
{
@ -223,16 +223,14 @@ Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017
when peer B's actual IP address is `10.138.0.2`, not `10.138.0.27`. When peer B tries to join the cluster, peer A will reject B with the error `x509: certificate is valid for 10.138.0.27, not 10.138.0.2`, because B's remote IP address does not match the one in Subject Alternative Name (SAN) field.
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server resolves TLS `DNSNames` when checking `SAN`](https://github.com/coreos/etcd/pull/7767). For instance, if peer cert contains any DNS names in Subject Alternative Name (SAN) field, server authenticates a peer only when forward-lookups on those DNS names have matching IP with the remote IP address. For example, peer B's CSR (with `cfssl`) is:
Since [v3.2.0](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v320-2017-06-09), [server resolves TLS `DNSNames` when checking `SAN`](https://github.com/coreos/etcd/pull/7767). For instance, if peer cert contains only DNS names (no IP addresses) in Subject Alternative Name (SAN) field, server authenticates a peer only when forward-lookups (`dig b.com`) on those DNS names have matching IP with the remote IP address. For example, peer B's CSR (with `cfssl`) is:
```json
{
...
"CN": "etcd peer",
"hosts": [
"b.com"
],
...
}
```
when peer B's remote IP address is `10.138.0.2`. When peer B tries to join the cluster, peer A looks up the incoming host `b.com` to get the list of IP addresses (e.g. `dig b.com`), and rejects B if the list does not contain the IP `10.138.0.2`, with the error `tls: 10.138.0.2 does not match any of DNSNames ["b.com"]`.
@ -241,13 +239,11 @@ Since [v3.2.2](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v322-2017
```json
{
...
"CN": "etcd peer",
"hosts": [
"invalid.domain",
"10.138.0.2"
],
...
}
```
when peer B's remote IP address is `10.138.0.2` and `invalid.domain` is an invalid host. When peer B tries to join the cluster, peer A successfully authenticates B, since the Subject Alternative Name (SAN) field has a valid matching IP address. See [issue#8206](https://github.com/coreos/etcd/issues/8206) for more detail.
@ -256,13 +252,11 @@ Since [v3.2.5](https://github.com/coreos/etcd/blob/master/CHANGELOG.md#v325-2017
```json
{
...
"CN": "etcd peer",
"hosts": [
"*.example.default.svc",
"*.example.default.svc.cluster.local"
],
...
}
```
when peer B's remote IP address is `10.138.0.2`. When peer B tries to join the cluster, peer A reverse-looks up the IP `10.138.0.2` to get the list of host names, and then matches peer B's cert DNS names in the Subject Alternative Name (SAN) field against those host names, either exactly or by wildcard. If none of the reverse/forward lookups match, it returns the error `"tls: "10.138.0.2" does not match any of DNSNames ["*.example.default.svc","*.example.default.svc.cluster.local"]`. See [issue#8268](https://github.com/coreos/etcd/issues/8268) for more detail.

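To see which SAN entries a peer certificate actually carries (the values the server matches against the remote IP or its resolved host names), a minimal Go sketch using crypto/x509 (the certificate path is an assumption):

```go
package main

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"log"
)

func main() {
	pemBytes, err := ioutil.ReadFile("peer-b.crt") // assumed certificate path
	if err != nil {
		log.Fatal(err)
	}
	block, _ := pem.Decode(pemBytes)
	if block == nil {
		log.Fatal("no PEM block found")
	}
	cert, err := x509.ParseCertificate(block.Bytes)
	if err != nil {
		log.Fatal(err)
	}
	// IP SANs are matched against the remote address directly;
	// DNS SANs go through the forward/reverse lookups described above.
	fmt.Println("IP SANs: ", cert.IPAddresses)
	fmt.Println("DNS SANs:", cert.DNSNames)
}
```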
View File

@ -74,23 +74,23 @@ Set `embed.Config.Debug` field to `true` to enable gRPC server logs.
#### Change in `/health` endpoint response value
Previously, `[endpoint]:[client-port]/health` returned manually marshaled JSON value. 3.3 instead defines [`etcdhttp.Health`](https://godoc.org/github.com/coreos/etcd/etcdserver/api/etcdhttp#Health) struct and returns properly encoded JSON value with errors, if any.
Previously, `[endpoint]:[client-port]/health` returned manually marshaled JSON value. 3.3 now defines [`etcdhttp.Health`](https://godoc.org/github.com/coreos/etcd/etcdserver/api/etcdhttp#Health) struct and includes errors, if any.
Before
```bash
$ curl http://localhost:2379/health
{"health": "true"}
{"health":"true"}
```
After
```bash
$ curl http://localhost:2379/health
{"health":true}
{"health":"true"}
# Or
{"health":false,"errors":["NOSPACE"]}
{"health":"false","errors":["NOSPACE"]}
```
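A minimal Go sketch of consuming the string-typed response (the struct mirrors the `etcdhttp.Health` change in this compare; the endpoint is an assumption):

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// mirrors etcdhttp.Health after this change: "health" is a JSON string, not a bool
type health struct {
	Health string `json:"health"`
}

func main() {
	resp, err := http.Get("http://localhost:2379/health") // assumed endpoint
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var h health
	if err := json.NewDecoder(resp.Body).Decode(&h); err != nil {
		log.Fatal(err)
	}
	fmt.Println("healthy:", h.Health == "true") // compare against the string "true"
}
```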
#### Change in gRPC gateway HTTP endpoints (replaced `/v3alpha` with `/v3beta`)

View File

@ -45,12 +45,12 @@ It is important to monitor your production etcd cluster for healthy information
#### Health Monitoring
At lowest level, etcd exposes health information via HTTP at `/health` in JSON format. If it returns `{"health":true}`, then the cluster is healthy.
At lowest level, etcd exposes health information via HTTP at `/health` in JSON format. If it returns `{"health":"true"}`, then the cluster is healthy.
```
$ curl -L http://127.0.0.1:2379/health
{"health":true}
{"health":"true"}
```
You can also use etcdctl to check the cluster-wide health information. It will contact all the members of the cluster and collect the health information for you.

View File

@ -29,5 +29,5 @@ curl http://10.0.0.10:2379/health
```
```json
{"health":true}
{"health":"true"}
```

View File

@ -54,7 +54,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
// TODO: only send healthy endpoint to gRPC so gRPC wont waste time to
// dial for unhealthy endpoint.
// then we can reduce 3s to 1s.
timeout := pingInterval + 3*time.Second
timeout := pingInterval + integration.RequestWaitTimeout
cli, err := clientv3.New(ccfg)
if err != nil {

View File

@ -121,7 +121,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
if !setBefore {
cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
}

View File

@ -453,7 +453,7 @@ func TestKVGetErrConnClosed(t *testing.T) {
clus.TakeClient(0)
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("kv.Get took too long")
case <-donec:
}
@ -480,7 +480,7 @@ func TestKVNewAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("kv.Get took too long")
case <-donec:
}
@ -906,7 +906,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485770, 10485760),
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
@ -920,7 +920,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485775, 10485760),
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
},
}
for i, test := range tests {
@ -939,7 +939,7 @@ func TestKVLargeRequests(t *testing.T) {
if err != test.expectError {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}
} else if err != nil && err.Error() != test.expectError.Error() {
} else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}

View File

@ -299,7 +299,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
}
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
case <-donec:
}
@ -325,7 +325,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
case <-donec:
}
@ -357,7 +357,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Revoke took too long")
case <-donec:
}

View File

@ -234,7 +234,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
select {
case <-wch:
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("took too long to create watch")
}
@ -252,7 +252,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
if err = ev.Err(); err != rpctypes.ErrNoLeader {
t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
}
case <-time.After(3 * time.Second): // enough time to detect leader lost
case <-time.After(integration.RequestWaitTimeout): // enough time to detect leader lost
t.Fatal("took too long to detect leader lost")
}
}

View File

@ -63,7 +63,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
wch := watchCli.Watch(context.Background(), key, clientv3.WithCreatedNotify())
select {
case <-wch:
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("took too long to create watch")
}
@ -348,7 +348,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
clus.Members[target].Restart(t)
select {
case <-time.After(clientTimeout + 3*time.Second):
case <-time.After(clientTimeout + integration.RequestWaitTimeout):
t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
case <-donec:
}

View File

@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) {
clus.TakeClient(0)
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("wc.Watch took too long")
case <-donec:
}
@ -705,7 +705,7 @@ func TestWatchAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("wc.Watch took too long")
case <-donec:
}
@ -751,7 +751,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
if resp.Err() != rpctypes.ErrNoLeader {
t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
}
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("watch without leader took too long to close")
}
@ -760,7 +760,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
if ok {
t.Fatalf("expected closed channel, got response %v", resp)
}
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("waited too long for channel to close")
}

View File

@ -445,8 +445,11 @@ func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue
}
func (lkv *leasingKV) waitSession(ctx context.Context) error {
lkv.leases.mu.RLock()
sessionc := lkv.sessionc
lkv.leases.mu.RUnlock()
select {
case <-lkv.sessionc:
case <-sessionc:
return nil
case <-lkv.ctx.Done():
return lkv.ctx.Err()

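The general pattern behind this fix, as a minimal self-contained sketch (names are illustrative): copy the channel field while holding the lock, then select on the local copy so a concurrent writer can swap the field without racing the reader.

```go
package example

import (
	"context"
	"sync"
)

type state struct {
	mu     sync.RWMutex
	readyc chan struct{}
}

func (s *state) waitReady(ctx context.Context) error {
	// copy under the lock; selecting on s.readyc directly would race
	// with any goroutine that replaces the field.
	s.mu.RLock()
	ready := s.readyc
	s.mu.RUnlock()

	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```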
View File

@ -53,7 +53,7 @@ func alarmTest(cx ctlCtx) {
}
// '/health' handler should return 'false'
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":false,"errors":["NOSPACE"]}`}); err != nil {
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"false"}`}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}

View File

@ -15,6 +15,7 @@
package e2e
import (
"os"
"strings"
"testing"
)
@ -45,55 +46,94 @@ type kvExec struct {
func watchTest(cx ctlCtx) {
tests := []struct {
puts []kv
args []string
puts []kv
envKey string
envRange string
args []string
wkv []kvExec
}{
{ // watch 1 key
[]kv{{"sample", "value"}},
[]string{"sample", "--rev", "1"},
[]kvExec{{key: "sample", val: "value"}},
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
args: []string{"--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with "echo watch event received"
[]kv{{"sample", "value"}},
[]string{"sample", "--rev", "1", "--", "echo", "watch event received"},
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 1 key with ${ETCD_WATCH_VALUE}
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1", "--", "env"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: `ETCD_WATCH_VALUE="value"`}},
},
{ // watch 1 key with "echo watch event received", with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
args: []string{"--rev", "1", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 1 key with "echo watch event received"
[]kv{{"sample", "value"}},
[]string{"--rev", "1", "sample", "--", "echo", "watch event received"},
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
puts: []kv{{"sample", "value"}},
args: []string{"--rev", "1", "sample", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 1 key with "echo \"Hello World!\""
[]kv{{"sample", "value"}},
[]string{"--rev", "1", "sample", "--", "echo", "\"Hello World!\""},
[]kvExec{{key: "sample", val: "value", execOutput: "Hello World!"}},
puts: []kv{{"sample", "value"}},
args: []string{"--rev", "1", "sample", "--", "echo", "\"Hello World!\""},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "Hello World!"}},
},
{ // watch 1 key with "echo watch event received"
[]kv{{"sample", "value"}},
[]string{"sample", "samplx", "--rev", "1", "--", "echo", "watch event received"},
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
puts: []kv{{"sample", "value"}},
args: []string{"sample", "samplx", "--rev", "1", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 1 key with "echo watch event received"
[]kv{{"sample", "value"}},
[]string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
[]kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
puts: []kv{{"sample", "value"}},
envKey: "sample",
envRange: "samplx",
args: []string{"--rev", "1", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 1 key with "echo watch event received"
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 3 keys by prefix
[]kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
[]string{"key", "--rev", "1", "--prefix"},
[]kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
args: []string{"key", "--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch 3 keys by prefix, with env
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
envKey: "key",
args: []string{"--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch by revision
[]kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
[]string{"etcd", "--rev", "2"},
[]kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
args: []string{"etcd", "--rev", "2"},
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
},
{ // watch 3 keys by range
[]kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
[]string{"key", "key3", "--rev", "1"},
[]kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
args: []string{"key", "key3", "--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
{ // watch 3 keys by range, with env
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
envKey: "key",
envRange: "key3",
args: []string{"--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
}
@ -107,11 +147,30 @@ func watchTest(cx ctlCtx) {
}
close(donec)
}(i, tt.puts)
unsetEnv := func() {}
if tt.envKey != "" || tt.envRange != "" {
if tt.envKey != "" {
os.Setenv("ETCDCTL_WATCH_KEY", tt.envKey)
unsetEnv = func() { os.Unsetenv("ETCDCTL_WATCH_KEY") }
}
if tt.envRange != "" {
os.Setenv("ETCDCTL_WATCH_RANGE_END", tt.envRange)
unsetEnv = func() { os.Unsetenv("ETCDCTL_WATCH_RANGE_END") }
}
if tt.envKey != "" && tt.envRange != "" {
unsetEnv = func() {
os.Unsetenv("ETCDCTL_WATCH_KEY")
os.Unsetenv("ETCDCTL_WATCH_RANGE_END")
}
}
}
if err := ctlV3Watch(cx, tt.args, tt.wkv...); err != nil {
if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
cx.t.Errorf("watchTest #%d: ctlV3Watch error (%v)", i, err)
}
}
unsetEnv()
<-donec
}
}

View File

@ -45,7 +45,7 @@ func metricsTest(cx ctlCtx) {
if err := cURLGet(cx.epc, cURLReq{endpoint: "/metrics", expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version), metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":true}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
if err := cURLGet(cx.epc, cURLReq{endpoint: "/health", expected: `{"health":"true"}`, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
}

View File

@ -268,8 +268,11 @@ func (cfg *Config) SetupLogging() {
if cfg.Debug {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
if cfg.LogPkgLevels != "" {
repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")

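For an embed-package user, flipping this switch looks roughly like the following sketch (Config.Debug is the field read by SetupLogging above; the data directory is an assumption):

```go
package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // assumed data directory
	cfg.Debug = true         // keeps gRPC info logs and enables tracing

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
}
```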
View File

@ -41,6 +41,7 @@ import (
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@ -179,6 +180,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
AuthToken: cfg.AuthToken,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
Debug: cfg.Debug,
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@ -522,6 +524,10 @@ func (e *Etcd) serveClients() (err error) {
}
func (e *Etcd) serveMetrics() (err error) {
if e.cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)

View File

@ -378,6 +378,13 @@ watch [options] <key or prefix>\n
# bar
```
```bash
ETCDCTL_WATCH_KEY=foo ./etcdctl watch
# PUT
# foo
# bar
```
Receive events and execute `echo watch event received`:
```bash
@ -388,6 +395,41 @@ Receive events and execute `echo watch event received`:
# watch event received
```
The watch response is set via `ETCD_WATCH_*` environment variables:
```bash
./etcdctl watch foo -- sh -c "env | grep ETCD_WATCH_"
# PUT
# foo
# bar
# ETCD_WATCH_REVISION=11
# ETCD_WATCH_KEY="foo"
# ETCD_WATCH_EVENT_TYPE="PUT"
# ETCD_WATCH_VALUE="bar"
```
Watch with environment variables and execute `echo watch event received`:
```bash
export ETCDCTL_WATCH_KEY=foo
./etcdctl watch -- echo watch event received
# PUT
# foo
# bar
# watch event received
```
```bash
export ETCDCTL_WATCH_KEY=foo
export ETCDCTL_WATCH_RANGE_END=foox
./etcdctl watch -- echo watch event received
# PUT
# fob
# bar
# watch event received
```
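For comparison, a rough clientv3 equivalent of the key/range-end watch above, as a minimal Go sketch (the endpoint is an assumption):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// same range as ETCDCTL_WATCH_KEY=foo with ETCDCTL_WATCH_RANGE_END=foox
	for wresp := range cli.Watch(context.Background(), "foo", clientv3.WithRange("foox")) {
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}
```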
##### Interactive
```bash
@ -413,6 +455,29 @@ watch foo -- echo watch event received
# watch event received
```
Watch with environment variables and execute `echo watch event received`:
```bash
export ETCDCTL_WATCH_KEY=foo
./etcdctl watch -i
watch -- echo watch event received
# PUT
# foo
# bar
# watch event received
```
```bash
export ETCDCTL_WATCH_KEY=foo
export ETCDCTL_WATCH_RANGE_END=foox
./etcdctl watch -i
watch -- echo watch event received
# PUT
# fob
# bar
# watch event received
```
### LEASE \<subcommand\>
LEASE provides commands for key lease management.
@ -874,6 +939,8 @@ The snapshot restore options closely resemble those used in the `etcd` command
- data-dir -- Path to the data directory. Uses \<name\>.etcd if none given.
- wal-dir -- Path to the WAL directory. Uses data directory if none given.
- initial-cluster -- The initial cluster configuration for the restored etcd cluster.
- initial-cluster-token -- Initial cluster token for the restored etcd cluster.

View File

@ -202,7 +202,26 @@ func endpointsFromCluster(cmd *cobra.Command) []string {
}
return endpoints
}
c := mustClientFromCmd(cmd)
sec := secureCfgFromCmd(cmd)
dt := dialTimeoutFromCmd(cmd)
ka := keepAliveTimeFromCmd(cmd)
kat := keepAliveTimeoutFromCmd(cmd)
eps, err := endpointsFromCmd(cmd)
if err != nil {
ExitWithError(ExitError, err)
}
// exclude auth to avoid needlessly asking for a password (MemberList() doesn't need authentication)
cfg, err := newClientCfg(eps, dt, ka, kat, sec, nil)
if err != nil {
ExitWithError(ExitError, err)
}
c, err := v3.New(*cfg)
if err != nil {
ExitWithError(ExitError, err)
}
ctx, cancel := commandCtx(cmd)
defer func() {
c.Close()

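The property this relies on, sketched minimally: MemberList() succeeds on a client built without credentials, so only the per-endpoint health checks need the authenticated client (the endpoint is an assumption):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// no Username/Password set: MemberList() is a public RPC
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	resp, err := cli.MemberList(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range resp.Members {
		fmt.Println(m.Name, m.ClientURLs)
	}
}
```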
View File

@ -101,8 +101,19 @@ type clientConfig struct {
acfg *authCfg
}
type discardValue struct{}
func (*discardValue) String() string { return "" }
func (*discardValue) Set(string) error { return nil }
func (*discardValue) Type() string { return "" }
func clientConfigFromCmd(cmd *cobra.Command) *clientConfig {
fs := cmd.InheritedFlags()
// silence "pkg/flags: unrecognized environment variable ETCDCTL_WATCH_KEY=foo" warnings
// silence "pkg/flags: unrecognized environment variable ETCDCTL_WATCH_RANGE_END=bar" warnings
fs.AddFlag(&pflag.Flag{Name: "watch-key", Value: &discardValue{}})
fs.AddFlag(&pflag.Flag{Name: "watch-range-end", Value: &discardValue{}})
flags.SetPflagsFromEnv("ETCDCTL", fs)
debug, err := cmd.Flags().GetBool("debug")

View File

@ -56,6 +56,7 @@ var (
restoreCluster string
restoreClusterToken string
restoreDataDir string
restoreWalDir string
restorePeerURLs string
restoreName string
skipHashCheck bool
@ -99,6 +100,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
Run: snapshotRestoreCommandFunc,
}
cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)")
cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap")
cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap")
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
@ -187,7 +189,10 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
basedir = restoreName + ".etcd"
}
waldir := filepath.Join(basedir, "member", "wal")
waldir := restoreWalDir
if waldir == "" {
waldir = filepath.Join(basedir, "member", "wal")
}
snapdir := filepath.Join(basedir, "member", "snap")
if _, err := os.Stat(basedir); err == nil {

View File

@ -30,6 +30,7 @@ import (
var (
errBadArgsNum = errors.New("bad number of arguments")
errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
)
@ -59,12 +60,17 @@ func NewWatchCommand() *cobra.Command {
// watchCommandFunc executes the "watch" command.
func watchCommandFunc(cmd *cobra.Command, args []string) {
envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
if envKey == "" && envRange != "" {
ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
}
if watchInteractive {
watchInteractiveFunc(cmd, os.Args)
watchInteractiveFunc(cmd, os.Args, envKey, envRange)
return
}
watchArgs, execArgs, err := parseWatchArgs(os.Args, args, false)
watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
if err != nil {
ExitWithError(ExitBadArgs, err)
}
@ -82,7 +88,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
}
func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
c := mustClientFromCmd(cmd)
reader := bufio.NewReader(os.Stdin)
@ -95,7 +101,7 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
l = strings.TrimSuffix(l, "\n")
args := argify(l)
if len(args) < 2 {
if len(args) < 2 && envKey == "" {
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
continue
}
@ -105,7 +111,7 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
continue
}
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, true)
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
if perr != nil {
ExitWithError(ExitBadArgs, perr)
}
@ -149,11 +155,18 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
display.Watch(resp)
if len(execArgs) > 0 {
cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
cmd.Env = os.Environ()
cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
if err := cmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
for _, ev := range resp.Events {
cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
if err := cmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
os.Exit(1)
}
}
}
}
@ -165,7 +178,7 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
// (e.g. ./bin/etcdctl watch foo --rev 1 bar).
// "--" characters are invalid arguments for "spf13/cobra" library,
// so no need to handle such cases.
func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs []string, execArgs []string, err error) {
func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
watchArgs = commandArgs
// remove preceding commands (e.g. "watch foo bar" in interactive mode)
@ -175,12 +188,54 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
break
}
}
if idx < len(watchArgs)-1 {
watchArgs = watchArgs[idx+1:]
if idx < len(watchArgs)-1 || envKey != "" {
if idx < len(watchArgs)-1 {
watchArgs = watchArgs[idx+1:]
}
execIdx, execExist := 0, false
for execIdx = range osArgs {
v := osArgs[execIdx]
if v == "--" && execIdx != len(osArgs)-1 {
execExist = true
break
}
}
if idx == len(watchArgs)-1 && envKey != "" {
if len(watchArgs) > 0 && !interactive {
// "watch --rev 1 -- echo Hello World" has no conflict
if !execExist {
// "watch foo" with ETCDCTL_WATCH_KEY=foo
// (watchArgs==["foo"])
return nil, nil, errBadArgsNumConflictEnv
}
}
// otherwise, watch with no argument and environment key is set
// if interactive, first "watch" command string should be removed
if interactive {
watchArgs = []string{}
}
}
// "watch foo -- echo hello" with ETCDCTL_WATCH_KEY=foo
// (watchArgs==["foo","echo","hello"])
if envKey != "" && execExist {
widx, oidx := 0, len(osArgs)-1
for widx = len(watchArgs) - 1; widx >= 0; widx-- {
if watchArgs[widx] == osArgs[oidx] {
oidx--
continue
}
if oidx == execIdx { // watchArgs has extra
return nil, nil, errBadArgsNumConflictEnv
}
}
}
} else if interactive { // "watch" not found
return nil, nil, errBadArgsInteractiveWatch
}
if len(watchArgs) < 1 {
if len(watchArgs) < 1 && envKey == "" {
return nil, nil, errBadArgsNum
}
@ -192,7 +247,7 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
}
if idx < len(osArgs)-1 {
osArgs = osArgs[idx+1:]
} else {
} else if envKey == "" {
return nil, nil, errBadArgsNum
}
@ -202,7 +257,7 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
}
foundSep := false
for idx = range argsWithSep {
if argsWithSep[idx] == "--" && idx > 0 {
if argsWithSep[idx] == "--" {
foundSep = true
break
}
@ -214,6 +269,18 @@ func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs [
}
watchArgs = flagset.Args()
}
// "watch -- echo hello" with ETCDCTL_WATCH_KEY=foo
// should be translated to "watch foo -- echo hello"
// (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
if envKey != "" {
tmp := []string{envKey}
if envRange != "" {
tmp = append(tmp, envRange)
}
watchArgs = append(tmp, watchArgs...)
}
if !foundSep {
return watchArgs, nil, nil
}

View File

@ -21,9 +21,10 @@ import (
func Test_parseWatchArgs(t *testing.T) {
tt := []struct {
osArgs []string // raw arguments to "watch" command
commandArgs []string // arguments after "spf13/cobra" preprocessing
interactive bool
osArgs []string // raw arguments to "watch" command
commandArgs []string // arguments after "spf13/cobra" preprocessing
envKey, envRange string
interactive bool
watchArgs []string
execArgs []string
@ -45,9 +46,66 @@ func Test_parseWatchArgs(t *testing.T) {
execArgs: nil,
err: errBadArgsNumSeparator,
},
{
osArgs: []string{"./bin/etcdctl", "watch"},
commandArgs: nil,
envKey: "foo",
envRange: "bar",
interactive: false,
watchArgs: []string{"foo", "bar"},
execArgs: nil,
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "foo"},
commandArgs: []string{"foo"},
envKey: "foo",
envRange: "",
interactive: false,
watchArgs: nil,
execArgs: nil,
err: errBadArgsNumConflictEnv,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar"},
commandArgs: []string{"foo", "bar"},
envKey: "foo",
envRange: "",
interactive: false,
watchArgs: nil,
execArgs: nil,
err: errBadArgsNumConflictEnv,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar"},
commandArgs: []string{"foo", "bar"},
envKey: "foo",
envRange: "bar",
interactive: false,
watchArgs: nil,
execArgs: nil,
err: errBadArgsNumConflictEnv,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "foo"},
commandArgs: []string{"foo"},
interactive: false,
watchArgs: []string{"foo"},
execArgs: nil,
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch"},
commandArgs: nil,
envKey: "foo",
interactive: false,
watchArgs: []string{"foo"},
execArgs: nil,
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "foo"},
commandArgs: []string{"foo"},
interactive: false,
watchArgs: []string{"foo"},
execArgs: nil,
@ -56,6 +114,16 @@ func Test_parseWatchArgs(t *testing.T) {
{
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "foo"},
commandArgs: []string{"foo"},
envKey: "foo",
interactive: false,
watchArgs: nil,
execArgs: nil,
err: errBadArgsNumConflictEnv,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1"},
commandArgs: nil,
envKey: "foo",
interactive: false,
watchArgs: []string{"foo"},
execArgs: nil,
@ -117,6 +185,35 @@ func Test_parseWatchArgs(t *testing.T) {
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "--", "echo", "Hello", "World"},
commandArgs: []string{"echo", "Hello", "World"},
envKey: "foo",
envRange: "",
interactive: false,
watchArgs: []string{"foo"},
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "--", "echo", "Hello", "World"},
commandArgs: []string{"echo", "Hello", "World"},
envKey: "foo",
envRange: "bar",
interactive: false,
watchArgs: []string{"foo", "bar"},
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar", "--rev", "1", "--", "echo", "Hello", "World"},
commandArgs: []string{"foo", "bar", "echo", "Hello", "World"},
envKey: "foo",
interactive: false,
watchArgs: nil,
execArgs: nil,
err: errBadArgsNumConflictEnv,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"foo", "bar", "--", "echo", "Hello", "World"},
@ -141,6 +238,26 @@ func Test_parseWatchArgs(t *testing.T) {
execArgs: nil,
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch"},
envKey: "foo",
envRange: "bar",
interactive: true,
watchArgs: []string{"foo", "bar"},
execArgs: nil,
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch"},
envKey: "hello world!",
envRange: "bar",
interactive: true,
watchArgs: []string{"hello world!", "bar"},
execArgs: nil,
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch", "foo", "--rev", "1"},
@ -165,6 +282,25 @@ func Test_parseWatchArgs(t *testing.T) {
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch", "--", "echo", "Hello", "World"},
envKey: "foo",
interactive: true,
watchArgs: []string{"foo"},
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch", "--", "echo", "Hello", "World"},
envKey: "foo",
envRange: "bar",
interactive: true,
watchArgs: []string{"foo", "bar"},
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch", "foo", "bar", "--", "echo", "Hello", "World"},
@ -181,6 +317,16 @@ func Test_parseWatchArgs(t *testing.T) {
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch", "--rev", "1", "--", "echo", "Hello", "World"},
envKey: "foo",
envRange: "bar",
interactive: true,
watchArgs: []string{"foo", "bar"},
execArgs: []string{"echo", "Hello", "World"},
err: nil,
},
{
osArgs: []string{"./bin/etcdctl", "watch", "-i"},
commandArgs: []string{"watch", "foo", "--rev", "1", "bar", "--", "echo", "Hello", "World"},
@ -199,7 +345,7 @@ func Test_parseWatchArgs(t *testing.T) {
},
}
for i, ts := range tt {
watchArgs, execArgs, err := parseWatchArgs(ts.osArgs, ts.commandArgs, ts.interactive)
watchArgs, execArgs, err := parseWatchArgs(ts.osArgs, ts.commandArgs, ts.envKey, ts.envRange, ts.interactive)
if err != ts.err {
t.Fatalf("#%d: error expected %v, got %v", i, ts.err, err)
}

View File

@ -40,7 +40,6 @@ import (
"github.com/coreos/etcd/version"
"github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
)
@ -179,10 +178,6 @@ func startEtcdOrProxyV2() {
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
if cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, nil, err
@ -392,6 +387,9 @@ func checkSupportArch() {
if runtime.GOARCH == "amd64" || runtime.GOARCH == "ppc64le" {
return
}
// unsupported arch can only be configured via environment variable,
// so unset it here to avoid parsing it as a flag
defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
plog.Warningf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set", env)
return

View File

@ -17,6 +17,7 @@ package etcdmain
import (
"context"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
@ -37,10 +38,12 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/grpcproxy"
"github.com/coreos/pkg/capnslog"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
var (
@ -75,6 +78,8 @@ var (
grpcProxyEnablePprof bool
grpcProxyEnableOrdering bool
grpcProxyDebug bool
)
func init() {
@ -127,12 +132,26 @@ func newGRPCProxyStartCommand() *cobra.Command {
// experimental flags
cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
return &cmd
}
func startGRPCProxy(cmd *cobra.Command, args []string) {
checkArgs()
capnslog.SetGlobalLogLevel(capnslog.INFO)
if grpcProxyDebug {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
if tlsinfo == nil && grpcProxyListenAutoTLS {
host := []string{"https://" + grpcProxyListenAddr}

View File

@ -58,7 +58,7 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
}
h := hfunc()
d, _ := json.Marshal(h)
if !h.Health {
if h.Health != "true" {
http.Error(w, string(d), http.StatusServiceUnavailable)
return
}
@ -70,33 +70,32 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
// Health defines etcd server health status.
// TODO: remove manual parsing in etcdctl cluster-health
type Health struct {
Health bool `json:"health"`
Errors []string `json:"errors,omitempty"`
Health string `json:"health"`
}
// TODO: server NOSPACE, etcdserver.ErrNoLeader in health API
func checkHealth(srv etcdserver.ServerV2) Health {
h := Health{Health: false}
h := Health{Health: "true"}
as := srv.Alarms()
if len(as) > 0 {
for _, v := range as {
h.Errors = append(h.Errors, v.Alarm.String())
h.Health = "false"
}
if h.Health == "true" {
if uint64(srv.Leader()) == raft.None {
h.Health = "false"
}
return h
}
if uint64(srv.Leader()) == raft.None {
h.Errors = append(h.Errors, etcdserver.ErrNoLeader.Error())
return h
if h.Health == "true" {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
cancel()
if err != nil {
h.Health = "false"
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
cancel()
if err != nil {
h.Errors = append(h.Errors, err.Error())
}
h.Health = err == nil
return h
}

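Taken together, the new contract can be exercised end to end; a self-contained sketch that restates the handler shown above (the listen address is an assumption):

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// mirrors the string-typed Health struct above
type Health struct {
	Health string `json:"health"`
}

// same contract as etcdhttp.NewHealthHandler: 503 unless health == "true"
func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		h := hfunc()
		d, _ := json.Marshal(h)
		if h.Health != "true" {
			http.Error(w, string(d), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write(d)
	}
}

func main() {
	mux := http.NewServeMux()
	mux.Handle("/health", NewHealthHandler(func() Health { return Health{Health: "true"} }))
	log.Fatal(http.ListenAndServe("127.0.0.1:2379", mux)) // assumed address
}
```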
View File

@ -16,8 +16,10 @@ package v3rpc
import (
"crypto/tls"
"io/ioutil"
"math"
"os"
"sync"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -36,9 +38,8 @@ const (
maxSendBytes = math.MaxInt32
)
func init() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
}
// integration tests call this multiple times, which is racey in gRPC side
var grpclogOnce sync.Once
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
@ -70,5 +71,16 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
grpclogOnce.Do(func() {
if s.Cfg.Debug {
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
})
return grpcServer
}

View File

@ -107,7 +107,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
return nil
}
if err != nil {
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
}
return err
}
@ -133,7 +137,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
}
return err
}
}

View File

@ -81,3 +81,16 @@ func togRPCError(err error) error {
}
return grpcErr
}
func isClientCtxErr(ctxErr error, err error) bool {
if ctxErr != nil {
return true
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.Canceled || code == codes.DeadlineExceeded
}

View File

@ -140,7 +140,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
// deadlock when calling sws.close().
go func() {
if rerr := sws.recvLoop(); rerr != nil {
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
if isClientCtxErr(stream.Context().Err(), rerr) {
plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
} else {
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
}
errc <- rerr
}
}()
@ -339,7 +343,11 @@ func (sws *serverWatchStream) sendLoop() {
mvcc.ReportEventReceived(len(evs))
if err := sws.gRPCStream.Send(wr); err != nil {
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
}
return
}
@ -356,7 +364,11 @@ func (sws *serverWatchStream) sendLoop() {
}
if err := sws.gRPCStream.Send(c); err != nil {
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
}
return
}
@ -372,7 +384,11 @@ func (sws *serverWatchStream) sendLoop() {
for _, v := range pending[wid] {
mvcc.ReportEventReceived(len(v.Events))
if err := sws.gRPCStream.Send(v); err != nil {
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
}
return
}
}

View File

@ -70,6 +70,8 @@ type ServerConfig struct {
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
Debug bool
}
// VerifyBootstrap sanity-checks the initial config for bootstrap case

View File

@ -58,10 +58,12 @@ import (
)
const (
tickDuration = 10 * time.Millisecond
clusterName = "etcd"
requestTimeout = 20 * time.Second
// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
RequestWaitTimeout = 3 * time.Second
tickDuration = 10 * time.Millisecond
requestTimeout = 20 * time.Second
clusterName = "etcd"
basePort = 21000
UrlScheme = "unix"
UrlSchemeTLS = "unixs"

View File

@ -373,10 +373,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for seq write in for each
if berr != nil {
return berr
}
tmpb.FillPercent = 0.9 // for seq write in for each
b.ForEach(func(k, v []byte) error {
count++

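The rule the fix applies, as a standalone sketch against the bolt API (import path per etcd's vendored bbolt; the file name is an assumption): check the error before touching the returned bucket, since CreateBucketIfNotExists can return a nil bucket alongside a non-nil error.

```go
package main

import (
	"log"

	bolt "github.com/coreos/bbolt"
)

func main() {
	db, err := bolt.Open("tmp.db", 0600, nil) // assumed file name
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		b, berr := tx.CreateBucketIfNotExists([]byte("key"))
		if berr != nil {
			return berr // bail out before touching b, which may be nil
		}
		b.FillPercent = 0.9 // safe: b is non-nil here
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}
```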
View File

@ -33,7 +33,6 @@ type ExpectProcess struct {
fpty *os.File
wg sync.WaitGroup
ptyMu sync.Mutex // protects accessing fpty
cond *sync.Cond // for broadcasting updates are available
mu sync.Mutex // protects lines and err
lines []string
@ -76,9 +75,7 @@ func (ep *ExpectProcess) read() {
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
r := bufio.NewReader(ep.fpty)
for ep.err == nil {
ep.ptyMu.Lock()
l, rerr := r.ReadString('\n')
ep.ptyMu.Unlock()
ep.mu.Lock()
ep.err = rerr
if l != "" {
@ -150,9 +147,7 @@ func (ep *ExpectProcess) close(kill bool) error {
}
err := ep.cmd.Wait()
ep.ptyMu.Lock()
ep.fpty.Close()
ep.ptyMu.Unlock()
ep.wg.Wait()
if err != nil {

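The unblocking idea behind this fix, as a minimal sketch (an os.Pipe stands in for the pty): a goroutine blocked in ReadString returns with an error once the underlying *os.File is closed, so no mutex is needed around the read.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"time"
)

func main() {
	r, w, err := os.Pipe() // stand-in for the pty *os.File
	if err != nil {
		fmt.Println(err)
		return
	}
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		br := bufio.NewReader(r)
		for {
			l, rerr := br.ReadString('\n')
			if rerr != nil {
				return // Close below unblocks the pending read
			}
			fmt.Print(l)
		}
	}()

	w.WriteString("hello\n")
	time.Sleep(100 * time.Millisecond)
	r.Close() // closing the reader's file makes ReadString return an error
	w.Close()
	<-donec
}
```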
View File

@ -30,13 +30,12 @@ func HandleHealth(mux *http.ServeMux, c *clientv3.Client) {
}
func checkHealth(c *clientv3.Client) etcdhttp.Health {
h := etcdhttp.Health{Health: false}
h := etcdhttp.Health{Health: "false"}
ctx, cancel := context.WithTimeout(c.Ctx(), time.Second)
_, err := c.Get(ctx, "a")
cancel()
h.Health = err == nil || err == rpctypes.ErrPermissionDenied
if !h.Health {
h.Errors = append(h.Errors, err.Error())
if err == nil || err == rpctypes.ErrPermissionDenied {
h.Health = "true"
}
return h
}

View File

@ -133,6 +133,9 @@ function functional_pass {
-peer-ports 12380,22380,32380 \
-limit 1 \
-schedule-cases "0 1 2 3 4 5" \
-stress-qps 1000 \
-stress-key-txn-count 100 \
-stress-key-txn-ops 10 \
-exit-on-failure && echo "'etcd-tester' succeeded"
ETCD_TESTER_EXIT_CODE=$?
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
@ -246,13 +249,13 @@ function grpcproxy_pass {
function release_pass {
rm -f ./bin/etcd-last-release
# to grab latest patch release; bump this up for every minor release
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.2.*" | head -1)
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.3.*" | head -1)
if [ -n "$MANUAL_VER" ]; then
# in case, we need to test against different version
UPGRADE_VER=$MANUAL_VER
fi
if [[ -z ${UPGRADE_VER} ]]; then
UPGRADE_VER="v3.2.0"
UPGRADE_VER="v3.3.0"
echo "fallback to" ${UPGRADE_VER}
fi

View File

@ -34,9 +34,11 @@ import (
type keyStresser struct {
Endpoint string
keyLargeSize int
keySize int
keySuffixRange int
keyLargeSize int
keySize int
keySuffixRange int
keyTxnSuffixRange int
keyTxnOps int
N int
@ -77,6 +79,14 @@ func (s *keyStresser) Stress() error {
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
}
if s.keyTxnSuffixRange > 0 {
// adjust to make up ±70% of workloads with writes
stressEntries[0].weight = 0.35
stressEntries = append(stressEntries, stressEntry{
weight: 0.35,
f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
})
}
s.stressTable = createStressTable(stressEntries)
for i := 0; i < s.N; i++ {
@ -202,6 +212,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
}
}
func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
keys := make([]string, keyTxnSuffixRange)
for i := range keys {
keys[i] = fmt.Sprintf("/k%03d", i)
}
return writeTxn(kvc, keys, txnOps)
}
func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
return func(ctx context.Context) (error, int64) {
ks := make(map[string]struct{}, txnOps)
for len(ks) != txnOps {
ks[keys[rand.Intn(len(keys))]] = struct{}{}
}
selected := make([]string, 0, txnOps)
for k := range ks {
selected = append(selected, k)
}
com, delOp, putOp := getTxnReqs(selected[0], "bar00")
txnReq := &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
}
// add nested txns if any
for i := 1; i < txnOps; i++ {
k, v := selected[i], fmt.Sprintf("bar%02d", i)
com, delOp, putOp = getTxnReqs(k, v)
nested := &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
},
},
}
txnReq.Success = append(txnReq.Success, nested)
txnReq.Failure = append(txnReq.Failure, nested)
}
_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
return err, int64(txnOps)
}
}
func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
// if key exists (version > 0)
com = &pb.Compare{
Key: []byte(key),
Target: pb.Compare_VERSION,
Result: pb.Compare_GREATER,
TargetUnion: &pb.Compare_Version{Version: 0},
}
delOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte(key),
},
},
}
putOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte(key),
Value: []byte(val),
},
},
}
return com, delOp, putOp
}
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
return func(ctx context.Context) (error, int64) {
_, err := kvc.Range(ctx, &pb.RangeRequest{

View File

@ -47,6 +47,8 @@ func main() {
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
@ -120,15 +122,23 @@ func main() {
}
scfg := stressConfig{
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
keyLargeSize: int(*stressKeyLargeSize),
keySize: int(*stressKeySize),
keySuffixRange: int(*stressKeySuffixRange),
numLeases: 10,
keysPerLease: 10,
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
keyLargeSize: int(*stressKeyLargeSize),
keySize: int(*stressKeySize),
keySuffixRange: int(*stressKeySuffixRange),
keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
keyTxnOps: int(*stressKeyTxnOps),
numLeases: 10,
keysPerLease: 10,
etcdRunnerPath: *etcdRunnerPath,
}
if scfg.keyTxnSuffixRange > 100 {
plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
}
if scfg.keyTxnOps > 64 {
plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
}
t := &tester{
failures: schedule,

View File

@ -16,17 +16,13 @@ package main
import (
"fmt"
"os"
"strings"
"sync"
"time"
"golang.org/x/time/rate"
"google.golang.org/grpc/grpclog"
)
func init() { grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) }
type Stresser interface {
// Stress starts to stress the etcd cluster
Stress() error
@ -117,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
}
type stressConfig struct {
keyLargeSize int
keySize int
keySuffixRange int
keyLargeSize int
keySize int
keySuffixRange int
keyTxnSuffixRange int
keyTxnOps int
numLeases int
keysPerLease int
@ -146,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
// TODO: Too intensive stressers can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
return &keyStresser{
Endpoint: m.grpcAddr(),
keyLargeSize: sc.keyLargeSize,
keySize: sc.keySize,
keySuffixRange: sc.keySuffixRange,
N: 100,
rateLimiter: sc.rateLimiter,
Endpoint: m.grpcAddr(),
keyLargeSize: sc.keyLargeSize,
keySize: sc.keySize,
keySuffixRange: sc.keySuffixRange,
keyTxnSuffixRange: sc.keyTxnSuffixRange,
keyTxnOps: sc.keyTxnOps,
N: 100,
rateLimiter: sc.rateLimiter,
}
case "v2keys":
return &v2Stresser{

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.3.0"
Version = "3.3.0-rc.3"
APIVersion = "unknown"
// Git SHA Value will be set during build