Compare commits

26 Commits

api/v3.5.7 ... v3.3.0-rc.

SHA1:

9e079d8f02
bd57c9ca5b
58c402a47b
3ce73b70bc
ee3c81d8d3
2dfabfbef6
bf83d5269f
a609b1eb47
1ae0c0b47d
ec43197344
70ba0518f1
e330f5004f
0ec5023b7b
0f69520622
d3c2acf090
5e35f79087
6dff1a9398
325913d6fb
24c9fb0527
8511db5e2b
3193f3c9ab
bdc508cadf
d5a0609412
67af1a2138
66d68a8fdb
ebaa83c985
@@ -2,7 +2,7 @@
 
 TEST_SUFFIX=$(date +%s | base64 | head -c 15)
-TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.2.11"
+TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.3.0-rc.0"
 if [ "$TEST_ARCH" == "386" ]; then
 TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
 fi
@@ -1,8 +1,4 @@
-## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0) (2018-01-??)
+## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0)
 
-**v3.3.0 is not yet released; expected to be released in January 2018.**
-
-## [v3.3.0-rc.0](https://github.com/coreos/etcd/releases/tag/v3.3.0-rc.0) (2017-12-20)
-
 See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0) and [v3.3 upgrade guide](https://github.com/coreos/etcd/blob/master/Documentation/upgrades/upgrade_3_3.md) for any breaking changes.
 
@@ -87,7 +87,7 @@ Removing excessive keyspace data and defragmenting the backend database will put
 
 ```sh
 # get current revision
-$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9]*')
+$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9].*')
 # compact away all old revisions
 $ ETCDCTL_API=3 etcdctl compact $rev
 compacted revision 1516
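For readers following this documentation change, here is a hedged clientv3 sketch of the same compaction flow; the endpoint, key, and timeouts are placeholders, and the etcdctl commands shown in the hunk remain the canonical example.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Any response header carries the current store revision.
	resp, err := cli.Get(ctx, "foo")
	if err != nil {
		panic(err)
	}
	rev := resp.Header.Revision

	// Compact away all revisions older than the current one.
	if _, err := cli.Compact(ctx, rev); err != nil {
		panic(err)
	}
	fmt.Println("compacted revision", rev)
}
```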
@@ -54,7 +54,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
 // TODO: only send healthy endpoint to gRPC so gRPC wont waste time to
 // dial for unhealthy endpoint.
 // then we can reduce 3s to 1s.
-timeout := pingInterval + 3*time.Second
+timeout := pingInterval + integration.RequestWaitTimeout
 
 cli, err := clientv3.New(ccfg)
 if err != nil {
@@ -121,7 +121,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
 if !setBefore {
 cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
 }
-ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
 if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
 t.Fatal(err)
 }
@@ -453,7 +453,7 @@ func TestKVGetErrConnClosed(t *testing.T) {
 clus.TakeClient(0)
 
 select {
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("kv.Get took too long")
 case <-donec:
 }
@@ -480,7 +480,7 @@ func TestKVNewAfterClose(t *testing.T) {
 close(donec)
 }()
 select {
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("kv.Get took too long")
 case <-donec:
 }
@@ -906,7 +906,7 @@ func TestKVLargeRequests(t *testing.T) {
 maxCallSendBytesClient: 10 * 1024 * 1024,
 maxCallRecvBytesClient: 0,
 valueSize: 10 * 1024 * 1024,
-expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485770, 10485760),
+expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
 },
 {
 maxRequestBytesServer: 10 * 1024 * 1024,
@@ -920,7 +920,7 @@ func TestKVLargeRequests(t *testing.T) {
 maxCallSendBytesClient: 10 * 1024 * 1024,
 maxCallRecvBytesClient: 0,
 valueSize: 10*1024*1024 + 5,
-expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485775, 10485760),
+expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
 },
 }
 for i, test := range tests {
@@ -939,7 +939,7 @@ func TestKVLargeRequests(t *testing.T) {
 if err != test.expectError {
 t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
 }
-} else if err != nil && err.Error() != test.expectError.Error() {
+} else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) {
 t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
 }
 
@@ -299,7 +299,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
 }
 
 select {
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("le.Grant took too long")
 case <-donec:
 }
@@ -325,7 +325,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
 close(donec)
 }()
 select {
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("le.Grant took too long")
 case <-donec:
 }
@@ -357,7 +357,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
 close(donec)
 }()
 select {
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("le.Revoke took too long")
 case <-donec:
 }
@@ -234,7 +234,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
 wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
 select {
 case <-wch:
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("took too long to create watch")
 }
 
@@ -252,7 +252,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
 if err = ev.Err(); err != rpctypes.ErrNoLeader {
 t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
 }
-case <-time.After(3 * time.Second): // enough time to detect leader lost
+case <-time.After(integration.RequestWaitTimeout): // enough time to detect leader lost
 t.Fatal("took too long to detect leader lost")
 }
 }
@@ -63,7 +63,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
 wch := watchCli.Watch(context.Background(), key, clientv3.WithCreatedNotify())
 select {
 case <-wch:
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("took too long to create watch")
 }
 
@@ -348,7 +348,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
 clus.Members[target].Restart(t)
 
 select {
-case <-time.After(clientTimeout + 3*time.Second):
+case <-time.After(clientTimeout + integration.RequestWaitTimeout):
 t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
 case <-donec:
 }
@@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) {
 clus.TakeClient(0)
 
 select {
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("wc.Watch took too long")
 case <-donec:
 }
@@ -705,7 +705,7 @@ func TestWatchAfterClose(t *testing.T) {
 close(donec)
 }()
 select {
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("wc.Watch took too long")
 case <-donec:
 }
@@ -751,7 +751,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
 if resp.Err() != rpctypes.ErrNoLeader {
 t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
 }
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("watch without leader took too long to close")
 }
 
@@ -760,7 +760,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
 if ok {
 t.Fatalf("expected closed channel, got response %v", resp)
 }
-case <-time.After(3 * time.Second):
+case <-time.After(integration.RequestWaitTimeout):
 t.Fatal("waited too long for channel to close")
 }
 
@@ -445,8 +445,11 @@ func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue
 }
 
 func (lkv *leasingKV) waitSession(ctx context.Context) error {
+lkv.leases.mu.RLock()
+sessionc := lkv.sessionc
+lkv.leases.mu.RUnlock()
 select {
-case <-lkv.sessionc:
+case <-sessionc:
 return nil
 case <-lkv.ctx.Done():
 return lkv.ctx.Err()
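The change above avoids reading lkv.sessionc directly inside the select by snapshotting it under the lease RLock. A hedged, self-contained sketch of that locking pattern, with illustrative names that are not etcd's:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// session mimics the shape of the leasing KV's session handling: readyc may be
// swapped by another goroutine, so readers copy it under the lock first.
type session struct {
	mu     sync.RWMutex
	readyc chan struct{}
}

func (s *session) wait(ctx context.Context) error {
	s.mu.RLock()
	readyc := s.readyc // snapshot the channel while holding the lock
	s.mu.RUnlock()

	select {
	case <-readyc:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	s := &session{readyc: make(chan struct{})}
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(s.readyc) // session becomes ready
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(s.wait(ctx)) // <nil>
}
```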
@@ -268,8 +268,11 @@ func (cfg *Config) SetupLogging() {
 if cfg.Debug {
 capnslog.SetGlobalLogLevel(capnslog.DEBUG)
 grpc.EnableTracing = true
+// enable info, warning, error
+grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
 } else {
-grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
+// only discard info
+grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
 }
 if cfg.LogPkgLevels != "" {
 repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")
@@ -41,6 +41,7 @@ import (
 "github.com/coreos/etcd/rafthttp"
 
 "github.com/coreos/pkg/capnslog"
+"github.com/grpc-ecosystem/go-grpc-prometheus"
 "github.com/soheilhy/cmux"
 "google.golang.org/grpc"
 "google.golang.org/grpc/keepalive"

@@ -179,6 +180,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 AuthToken: cfg.AuthToken,
 InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
 CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
+Debug: cfg.Debug,
 }
 
 if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@@ -522,6 +524,10 @@ func (e *Etcd) serveClients() (err error) {
 }
 
 func (e *Etcd) serveMetrics() (err error) {
+if e.cfg.Metrics == "extensive" {
+grpc_prometheus.EnableHandlingTimeHistogram()
+}
+
 if len(e.cfg.ListenMetricsUrls) > 0 {
 metricsMux := http.NewServeMux()
 etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
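To illustrate how the Debug and metrics plumbing added here is driven from the embed API, a minimal hedged sketch (assumes etcd v3.3's embed package; the directory and option values are examples, not part of the diff):

```go
package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"
	// Debug now also flows into the server config (srvcfg.Debug above) and
	// switches grpclog between stderr and discard in SetupLogging.
	cfg.Debug = true
	// "extensive" triggers grpc_prometheus.EnableHandlingTimeHistogram()
	// inside serveMetrics, as added in this change.
	cfg.Metrics = "extensive"

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("embedded etcd is ready")
}
```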
@@ -874,6 +874,8 @@ The snapshot restore options closely resemble to those used in the `etcd` comman
 
 - data-dir -- Path to the data directory. Uses \<name\>.etcd if none given.
 
+- wal-dir -- Path to the WAL directory. Uses data directory if none given.
+
 - initial-cluster -- The initial cluster configuration for the restored etcd cluster.
 
 - initial-cluster-token -- Initial cluster token for the restored etcd cluster.
@@ -56,6 +56,7 @@ var (
 restoreCluster string
 restoreClusterToken string
 restoreDataDir string
+restoreWalDir string
 restorePeerURLs string
 restoreName string
 skipHashCheck bool
@@ -99,6 +100,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
 Run: snapshotRestoreCommandFunc,
 }
 cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
+cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)")
 cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap")
 cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap")
 cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
@@ -187,7 +189,10 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
 basedir = restoreName + ".etcd"
 }
 
-waldir := filepath.Join(basedir, "member", "wal")
+waldir := restoreWalDir
+if waldir == "" {
+waldir = filepath.Join(basedir, "member", "wal")
+}
 snapdir := filepath.Join(basedir, "member", "snap")
 
 if _, err := os.Stat(basedir); err == nil {
@@ -40,7 +40,6 @@ import (
 "github.com/coreos/etcd/version"
 
 "github.com/coreos/pkg/capnslog"
-"github.com/grpc-ecosystem/go-grpc-prometheus"
 "google.golang.org/grpc"
 )
 
@@ -179,10 +178,6 @@ func startEtcdOrProxyV2() {
 
 // startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
 func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
-if cfg.Metrics == "extensive" {
-grpc_prometheus.EnableHandlingTimeHistogram()
-}
-
 e, err := embed.StartEtcd(cfg)
 if err != nil {
 return nil, nil, err
@@ -392,6 +387,9 @@ func checkSupportArch() {
 if runtime.GOARCH == "amd64" || runtime.GOARCH == "ppc64le" {
 return
 }
+// unsupported arch only configured via environment variable
+// so unset here to not parse through flag
+defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
 if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
 plog.Warningf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set", env)
 return
@@ -17,6 +17,7 @@ package etcdmain
 import (
 "context"
 "fmt"
+"io/ioutil"
 "math"
 "net"
 "net/http"
@@ -37,10 +38,12 @@ import (
 "github.com/coreos/etcd/pkg/transport"
 "github.com/coreos/etcd/proxy/grpcproxy"
 
+"github.com/coreos/pkg/capnslog"
 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 "github.com/soheilhy/cmux"
 "github.com/spf13/cobra"
 "google.golang.org/grpc"
+"google.golang.org/grpc/grpclog"
 )
 
 var (
@@ -75,6 +78,8 @@ var (
 
 grpcProxyEnablePprof bool
 grpcProxyEnableOrdering bool
+
+grpcProxyDebug bool
 )
 
 func init() {
@@ -127,12 +132,26 @@ func newGRPCProxyStartCommand() *cobra.Command {
 // experimental flags
 cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
 cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
+
+cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
+
 return &cmd
 }
 
 func startGRPCProxy(cmd *cobra.Command, args []string) {
 checkArgs()
 
+capnslog.SetGlobalLogLevel(capnslog.INFO)
+if grpcProxyDebug {
+capnslog.SetGlobalLogLevel(capnslog.DEBUG)
+grpc.EnableTracing = true
+// enable info, warning, error
+grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
+} else {
+// only discard info
+grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
+}
+
 tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
 if tlsinfo == nil && grpcProxyListenAutoTLS {
 host := []string{"https://" + grpcProxyListenAddr}
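The grpc-proxy now follows the same logging convention as the server: grpclog.NewLoggerV2 takes separate writers for info, warning, and error, so debug mode routes all three to stderr while the default discards only info. A hedged standalone sketch of that convention (the debug variable stands in for the new --debug flag):

```go
package main

import (
	"io/ioutil"
	"os"

	"google.golang.org/grpc/grpclog"
)

func main() {
	debug := false // stand-in for the --debug flag added above
	if debug {
		// info, warning, and error all go to stderr
		grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
	} else {
		// discard info; keep warning and error
		grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
	}
	grpclog.Info("suppressed unless debug is enabled")
	grpclog.Warning("always written to stderr")
}
```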
@@ -16,8 +16,10 @@ package v3rpc
 
 import (
 "crypto/tls"
+"io/ioutil"
 "math"
 "os"
+"sync"
 
 "github.com/coreos/etcd/etcdserver"
 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -36,9 +38,8 @@ const (
 maxSendBytes = math.MaxInt32
 )
 
-func init() {
-grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
-}
+// integration tests call this multiple times, which is racey in gRPC side
+var grpclogOnce sync.Once
 
 func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
 var opts []grpc.ServerOption
@@ -70,5 +71,16 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
 // set zero values for metrics registered for this grpc server
 grpc_prometheus.Register(grpcServer)
 
+grpclogOnce.Do(func() {
+if s.Cfg.Debug {
+grpc.EnableTracing = true
+// enable info, warning, error
+grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
+} else {
+// only discard info
+grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
+}
+})
+
 return grpcServer
 }
@@ -107,7 +107,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 return nil
 }
 if err != nil {
-plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
+if isClientCtxErr(stream.Context().Err(), err) {
+plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
+} else {
+plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
+}
 return err
 }
 
@@ -133,7 +137,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 resp.TTL = ttl
 err = stream.Send(resp)
 if err != nil {
-plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
+if isClientCtxErr(stream.Context().Err(), err) {
+plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
+} else {
+plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
+}
 return err
 }
 }
@@ -81,3 +81,16 @@ func togRPCError(err error) error {
 }
 return grpcErr
 }
+
+func isClientCtxErr(ctxErr error, err error) bool {
+if ctxErr != nil {
+return true
+}
+
+ev, ok := status.FromError(err)
+if !ok {
+return false
+}
+code := ev.Code()
+return code == codes.Canceled || code == codes.DeadlineExceeded
+}
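A hedged, runnable restatement of the helper introduced above, showing how client-caused stream errors (canceled or deadline-exceeded) are distinguished from genuine server-side failures; the example inputs are illustrative, not taken from the diff.

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isClientCtxErr mirrors the helper added in this change: treat an error as
// client-caused if the stream context is already done, or if the gRPC status
// code is Canceled or DeadlineExceeded.
func isClientCtxErr(ctxErr error, err error) bool {
	if ctxErr != nil {
		return true
	}
	ev, ok := status.FromError(err)
	if !ok {
		return false
	}
	code := ev.Code()
	return code == codes.Canceled || code == codes.DeadlineExceeded
}

func main() {
	// client hung up: the watch/lease handlers now log this at debug level
	fmt.Println(isClientCtxErr(context.Canceled, status.Error(codes.Canceled, "context canceled"))) // true
	// genuine server-side failure: still logged as a warning
	fmt.Println(isClientCtxErr(nil, status.Error(codes.Unavailable, "transport is closing"))) // false
}
```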
@@ -140,7 +140,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 // deadlock when calling sws.close().
 go func() {
 if rerr := sws.recvLoop(); rerr != nil {
-plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
+if isClientCtxErr(stream.Context().Err(), rerr) {
+plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
+} else {
+plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
+}
 errc <- rerr
 }
 }()
@@ -339,7 +343,11 @@ func (sws *serverWatchStream) sendLoop() {
 
 mvcc.ReportEventReceived(len(evs))
 if err := sws.gRPCStream.Send(wr); err != nil {
-plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
+if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
+plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
+} else {
+plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
+}
 return
 }
 
@@ -356,7 +364,11 @@ func (sws *serverWatchStream) sendLoop() {
 }
 
 if err := sws.gRPCStream.Send(c); err != nil {
-plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
+if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
+plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
+} else {
+plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
+}
 return
 }
 
@@ -372,7 +384,11 @@ func (sws *serverWatchStream) sendLoop() {
 for _, v := range pending[wid] {
 mvcc.ReportEventReceived(len(v.Events))
 if err := sws.gRPCStream.Send(v); err != nil {
-plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
+if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
+plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
+} else {
+plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
+}
 return
 }
 }
@@ -70,6 +70,8 @@ type ServerConfig struct {
 // before serving any peer/client traffic.
 InitialCorruptCheck bool
 CorruptCheckTime time.Duration
+
+Debug bool
 }
 
 // VerifyBootstrap sanity-checks the initial config for bootstrap case
@@ -58,10 +58,12 @@ import (
 )
 
 const (
-tickDuration = 10 * time.Millisecond
-clusterName = "etcd"
-requestTimeout = 20 * time.Second
+// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
+RequestWaitTimeout = 3 * time.Second
+tickDuration = 10 * time.Millisecond
+requestTimeout = 20 * time.Second
 
+clusterName = "etcd"
 basePort = 21000
 UrlScheme = "unix"
 UrlSchemeTLS = "unixs"
@@ -373,10 +373,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 }
 
 tmpb, berr := tmptx.CreateBucketIfNotExists(next)
-tmpb.FillPercent = 0.9 // for seq write in for each
 if berr != nil {
 return berr
 }
+tmpb.FillPercent = 0.9 // for seq write in for each
 
 b.ForEach(func(k, v []byte) error {
 count++
@@ -33,7 +33,6 @@ type ExpectProcess struct {
 fpty *os.File
 wg sync.WaitGroup
 
-ptyMu sync.Mutex // protects accessing fpty
 cond *sync.Cond // for broadcasting updates are available
 mu sync.Mutex // protects lines and err
 lines []string
@@ -76,9 +75,7 @@ func (ep *ExpectProcess) read() {
 printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
 r := bufio.NewReader(ep.fpty)
 for ep.err == nil {
-ep.ptyMu.Lock()
 l, rerr := r.ReadString('\n')
-ep.ptyMu.Unlock()
 ep.mu.Lock()
 ep.err = rerr
 if l != "" {
@@ -150,9 +147,7 @@ func (ep *ExpectProcess) close(kill bool) error {
 }
 
 err := ep.cmd.Wait()
-ep.ptyMu.Lock()
 ep.fpty.Close()
-ep.ptyMu.Unlock()
 ep.wg.Wait()
 
 if err != nil {
@@ -133,6 +133,9 @@ function functional_pass {
 -peer-ports 12380,22380,32380 \
 -limit 1 \
 -schedule-cases "0 1 2 3 4 5" \
+-stress-qps 1000 \
+-stress-key-txn-count 100 \
+-stress-key-txn-ops 10 \
 -exit-on-failure && echo "'etcd-tester' succeeded"
 ETCD_TESTER_EXIT_CODE=$?
 echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
@@ -246,13 +249,13 @@ function grpcproxy_pass {
 function release_pass {
 rm -f ./bin/etcd-last-release
 # to grab latest patch release; bump this up for every minor release
-UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.2.*" | head -1)
+UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.3.*" | head -1)
 if [ -n "$MANUAL_VER" ]; then
 # in case, we need to test against different version
 UPGRADE_VER=$MANUAL_VER
 fi
 if [[ -z ${UPGRADE_VER} ]]; then
-UPGRADE_VER="v3.2.0"
+UPGRADE_VER="v3.3.0"
 echo "fallback to" ${UPGRADE_VER}
 fi
 
@@ -34,9 +34,11 @@ import (
 type keyStresser struct {
 Endpoint string
 
 keyLargeSize int
 keySize int
 keySuffixRange int
+keyTxnSuffixRange int
+keyTxnOps int
 
 N int
 
@@ -77,6 +79,15 @@ func (s *keyStresser) Stress() error {
 {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
 {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
 }
+if s.keyTxnSuffixRange > 0 {
+// adjust to make up ±70% of workloads with writes
+stressEntries[0].weight = 0.24
+stressEntries[1].weight = 0.24
+stressEntries = append(stressEntries, stressEntry{
+weight: 0.24,
+f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
+})
+}
 s.stressTable = createStressTable(stressEntries)
 
 for i := 0; i < s.N; i++ {
@@ -202,6 +213,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
 }
 }
 
+func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
+keys := make([]string, keyTxnSuffixRange)
+for i := range keys {
+keys[i] = fmt.Sprintf("/k%03d", i)
+}
+return writeTxn(kvc, keys, txnOps)
+}
+
+func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
+return func(ctx context.Context) (error, int64) {
+ks := make(map[string]struct{}, txnOps)
+for len(ks) != txnOps {
+ks[keys[rand.Intn(len(keys))]] = struct{}{}
+}
+selected := make([]string, 0, txnOps)
+for k := range ks {
+selected = append(selected, k)
+}
+com, delOp, putOp := getTxnReqs(selected[0], "bar00")
+txnReq := &pb.TxnRequest{
+Compare: []*pb.Compare{com},
+Success: []*pb.RequestOp{delOp},
+Failure: []*pb.RequestOp{putOp},
+}
+
+// add nested txns if any
+for i := 1; i < txnOps; i++ {
+k, v := selected[i], fmt.Sprintf("bar%02d", i)
+com, delOp, putOp = getTxnReqs(k, v)
+nested := &pb.RequestOp{
+Request: &pb.RequestOp_RequestTxn{
+RequestTxn: &pb.TxnRequest{
+Compare: []*pb.Compare{com},
+Success: []*pb.RequestOp{delOp},
+Failure: []*pb.RequestOp{putOp},
+},
+},
+}
+txnReq.Success = append(txnReq.Success, nested)
+txnReq.Failure = append(txnReq.Failure, nested)
+}
+
+_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
+return err, int64(txnOps)
+}
+}
+
+func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
+// if key exists (version > 0)
+com = &pb.Compare{
+Key: []byte(key),
+Target: pb.Compare_VERSION,
+Result: pb.Compare_GREATER,
+TargetUnion: &pb.Compare_Version{Version: 0},
+}
+delOp = &pb.RequestOp{
+Request: &pb.RequestOp_RequestDeleteRange{
+RequestDeleteRange: &pb.DeleteRangeRequest{
+Key: []byte(key),
+},
+},
+}
+putOp = &pb.RequestOp{
+Request: &pb.RequestOp_RequestPut{
+RequestPut: &pb.PutRequest{
+Key: []byte(key),
+Value: []byte(val),
+},
+},
+}
+return com, delOp, putOp
+}
+
 func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
 return func(ctx context.Context) (error, int64) {
 _, err := kvc.Range(ctx, &pb.RangeRequest{
@@ -47,6 +47,8 @@ func main() {
 stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
 stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
 stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
+stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
+stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
 limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
 exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
 stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
@@ -120,15 +122,23 @@ func main() {
 }
 
 scfg := stressConfig{
 rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
 keyLargeSize: int(*stressKeyLargeSize),
 keySize: int(*stressKeySize),
 keySuffixRange: int(*stressKeySuffixRange),
-numLeases: 10,
-keysPerLease: 10,
+keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
+keyTxnOps: int(*stressKeyTxnOps),
+numLeases: 10,
+keysPerLease: 10,
 
 etcdRunnerPath: *etcdRunnerPath,
 }
+if scfg.keyTxnSuffixRange > 100 {
+plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
+}
+if scfg.keyTxnOps > 64 {
+plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
+}
 
 t := &tester{
 failures: schedule,
@@ -16,17 +16,13 @@ package main
 
 import (
 "fmt"
-"os"
 "strings"
 "sync"
 "time"
 
 "golang.org/x/time/rate"
-"google.golang.org/grpc/grpclog"
 )
 
-func init() { grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) }
-
 type Stresser interface {
 // Stress starts to stress the etcd cluster
 Stress() error
@@ -117,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
 }
 
 type stressConfig struct {
 keyLargeSize int
 keySize int
 keySuffixRange int
+keyTxnSuffixRange int
+keyTxnOps int
 
 numLeases int
 keysPerLease int
@@ -146,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
 // TODO: Too intensive stressers can panic etcd member with
 // 'out of memory' error. Put rate limits in server side.
 return &keyStresser{
 Endpoint: m.grpcAddr(),
 keyLargeSize: sc.keyLargeSize,
 keySize: sc.keySize,
 keySuffixRange: sc.keySuffixRange,
-N: 100,
-rateLimiter: sc.rateLimiter,
+keyTxnSuffixRange: sc.keyTxnSuffixRange,
+keyTxnOps: sc.keyTxnOps,
+N: 100,
+rateLimiter: sc.rateLimiter,
 }
 case "v2keys":
 return &v2Stresser{
|
@ -26,7 +26,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "3.0.0"
|
MinClusterVersion = "3.0.0"
|
||||||
Version = "3.3.0"
|
Version = "3.3.0-rc.2"
|
||||||
APIVersion = "unknown"
|
APIVersion = "unknown"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||