Compare commits

...

26 Commits

Author SHA1 Message Date
9e079d8f02 version: 3.3.0-rc.2
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-11 11:18:46 -08:00
bd57c9ca5b etcd-tester: fix "writeTxn" key selection
Found when debugging https://github.com/coreos/etcd/issues/9130.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-11 11:18:05 -08:00
58c402a47b test: limit stress-qps for slow CI machines, add txn flags
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2018-01-09 14:18:45 -08:00
3ce73b70bc etcd-tester: add txn stresser
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2018-01-09 14:18:33 -08:00
ee3c81d8d3 ctlv3: add "snapshot restore --wal-dir"
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-09 11:12:29 -08:00
2dfabfbef6 DocCommand: use regex wildcard
The current command as such produces no output on mac term or bash shell.
Using regex wildcard works fine on mac and linux.
2018-01-09 09:11:16 -08:00
bf83d5269f clientv3/integration: fix typos
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-09 09:11:15 -08:00
a609b1eb47 integration: add constant RequestWaitTimeout. 2018-01-09 09:11:15 -08:00
1ae0c0b47d mvcc: check null before set FillPercent not to panic
Since CreateBucketIfNotExists() can return nil when it gets an error,
accessing FillPercent must be done after a nil check, not to cause
a panic.
2018-01-08 13:08:03 -08:00
ec43197344 etcdserver/api/v3rpc: debug user cancellation and log warning for rest
The context error with cancel code is typically for user cancellation which
should be at debug level. For other error codes we should display a warning.

Fixes #9085
2018-01-08 10:14:37 -08:00
70ba0518f1 embed: enable extensive metrics if specified 2018-01-07 18:48:59 -08:00
e330f5004f etcdmain: unset ETCD_UNSUPPORTED_ARCH after arch check
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-05 03:38:35 +00:00
0ec5023b7b pkg/expect: fix deadlock in mac OS
bufio.NewReader.ReadString blocks even
when the process received syscall.SIGKILL.
Remove ptyMu mutex and make ReadString return
when *os.File is closed.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 14:34:01 -08:00
0f69520622 version: bump up to 3.3.0-rc.1+git
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 14:33:10 -08:00
d3c2acf090 version: bump up to 3.3.0-rc.1
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:27:15 -08:00
5e35f79087 clientv3/integration: fix TestKVLargeRequests with -tags cluster_proxy
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:07:24 -08:00
6dff1a9398 tools/functional-tester: remove duplicate grpclog set
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
325913d6fb etcdserver/api/v3rpc: set grpclog once
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
24c9fb0527 etcdserver,embed: discard gRPC info logs when debug is off
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
8511db5e2b etcdserver/api/v3rpc: log stream error with debug level
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-01-02 11:02:17 -08:00
3193f3c9ab clientv3/leasing: fix racey waitSession
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-21 17:51:03 -08:00
bdc508cadf grpc-proxy: add "--debug" flag to "etcd grpc-proxy start" command
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-21 14:44:10 -08:00
d5a0609412 embed: only discard infos when debug flag is off
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-21 14:44:02 -08:00
67af1a2138 CHANGELOG: remove rc in release-3.3
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-20 14:32:15 -08:00
66d68a8fdb *: update release upgrade test versions
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-20 14:16:59 -08:00
ebaa83c985 version: bump up to 3.3.0+git
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2017-12-20 14:16:49 -08:00
30 changed files with 253 additions and 76 deletions

View File

@ -2,7 +2,7 @@
TEST_SUFFIX=$(date +%s | base64 | head -c 15)
TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.2.11"
TEST_OPTS="RELEASE_TEST=y INTEGRATION=y PASSES='build unit release integration_e2e functional' MANUAL_VER=v3.3.0-rc.0"
if [ "$TEST_ARCH" == "386" ]; then
TEST_OPTS="GOARCH=386 PASSES='build unit integration_e2e'"
fi

View File

@ -1,8 +1,4 @@
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0) (2018-01-??)
**v3.3.0 is not yet released; expected to be released in January 2018.**
## [v3.3.0-rc.0](https://github.com/coreos/etcd/releases/tag/v3.3.0-rc.0) (2017-12-20)
## [v3.3.0](https://github.com/coreos/etcd/releases/tag/v3.3.0)
See [code changes](https://github.com/coreos/etcd/compare/v3.2.0...v3.3.0) and [v3.3 upgrade guide](https://github.com/coreos/etcd/blob/master/Documentation/upgrades/upgrade_3_3.md) for any breaking changes.

View File

@ -87,7 +87,7 @@ Removing excessive keyspace data and defragmenting the backend database will put
```sh
# get current revision
$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9]*')
$ rev=$(ETCDCTL_API=3 etcdctl --endpoints=:2379 endpoint status --write-out="json" | egrep -o '"revision":[0-9]*' | egrep -o '[0-9].*')
# compact away all old revisions
$ ETCDCTL_API=3 etcdctl compact $rev
compacted revision 1516

View File

@ -54,7 +54,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
// TODO: only send healthy endpoint to gRPC so gRPC wont waste time to
// dial for unhealthy endpoint.
// then we can reduce 3s to 1s.
timeout := pingInterval + 3*time.Second
timeout := pingInterval + integration.RequestWaitTimeout
cli, err := clientv3.New(ccfg)
if err != nil {

View File

@ -121,7 +121,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
if !setBefore {
cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
}

View File

@ -453,7 +453,7 @@ func TestKVGetErrConnClosed(t *testing.T) {
clus.TakeClient(0)
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("kv.Get took too long")
case <-donec:
}
@ -480,7 +480,7 @@ func TestKVNewAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("kv.Get took too long")
case <-donec:
}
@ -906,7 +906,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485770, 10485760),
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
@ -920,7 +920,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485775, 10485760),
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
},
}
for i, test := range tests {
@ -939,7 +939,7 @@ func TestKVLargeRequests(t *testing.T) {
if err != test.expectError {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}
} else if err != nil && err.Error() != test.expectError.Error() {
} else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}

View File

@ -299,7 +299,7 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
}
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
case <-donec:
}
@ -325,7 +325,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
case <-donec:
}
@ -357,7 +357,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Revoke took too long")
case <-donec:
}

View File

@ -234,7 +234,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
select {
case <-wch:
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("took too long to create watch")
}
@ -252,7 +252,7 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
if err = ev.Err(); err != rpctypes.ErrNoLeader {
t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
}
case <-time.After(3 * time.Second): // enough time to detect leader lost
case <-time.After(integration.RequestWaitTimeout): // enough time to detect leader lost
t.Fatal("took too long to detect leader lost")
}
}

View File

@ -63,7 +63,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
wch := watchCli.Watch(context.Background(), key, clientv3.WithCreatedNotify())
select {
case <-wch:
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("took too long to create watch")
}
@ -348,7 +348,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
clus.Members[target].Restart(t)
select {
case <-time.After(clientTimeout + 3*time.Second):
case <-time.After(clientTimeout + integration.RequestWaitTimeout):
t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
case <-donec:
}

View File

@ -678,7 +678,7 @@ func TestWatchErrConnClosed(t *testing.T) {
clus.TakeClient(0)
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("wc.Watch took too long")
case <-donec:
}
@ -705,7 +705,7 @@ func TestWatchAfterClose(t *testing.T) {
close(donec)
}()
select {
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("wc.Watch took too long")
case <-donec:
}
@ -751,7 +751,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
if resp.Err() != rpctypes.ErrNoLeader {
t.Fatalf("expected %v watch response error, got %+v", rpctypes.ErrNoLeader, resp)
}
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("watch without leader took too long to close")
}
@ -760,7 +760,7 @@ func TestWatchWithRequireLeader(t *testing.T) {
if ok {
t.Fatalf("expected closed channel, got response %v", resp)
}
case <-time.After(3 * time.Second):
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("waited too long for channel to close")
}

View File

@ -445,8 +445,11 @@ func (lkv *leasingKV) revokeLeaseKvs(ctx context.Context, kvs []*mvccpb.KeyValue
}
func (lkv *leasingKV) waitSession(ctx context.Context) error {
lkv.leases.mu.RLock()
sessionc := lkv.sessionc
lkv.leases.mu.RUnlock()
select {
case <-lkv.sessionc:
case <-sessionc:
return nil
case <-lkv.ctx.Done():
return lkv.ctx.Err()

View File

@ -268,8 +268,11 @@ func (cfg *Config) SetupLogging() {
if cfg.Debug {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
if cfg.LogPkgLevels != "" {
repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")

View File

@ -41,6 +41,7 @@ import (
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@ -179,6 +180,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
AuthToken: cfg.AuthToken,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
Debug: cfg.Debug,
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@ -522,6 +524,10 @@ func (e *Etcd) serveClients() (err error) {
}
func (e *Etcd) serveMetrics() (err error) {
if e.cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetricsHealth(metricsMux, e.Server)

View File

@ -874,6 +874,8 @@ The snapshot restore options closely resemble to those used in the `etcd` comman
- data-dir -- Path to the data directory. Uses \<name\>.etcd if none given.
- wal-dir -- Path to the WAL directory. Uses data directory if none given.
- initial-cluster -- The initial cluster configuration for the restored etcd cluster.
- initial-cluster-token -- Initial cluster token for the restored etcd cluster.

View File

@ -56,6 +56,7 @@ var (
restoreCluster string
restoreClusterToken string
restoreDataDir string
restoreWalDir string
restorePeerURLs string
restoreName string
skipHashCheck bool
@ -99,6 +100,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
Run: snapshotRestoreCommandFunc,
}
cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)")
cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap")
cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap")
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
@ -187,7 +189,10 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
basedir = restoreName + ".etcd"
}
waldir := filepath.Join(basedir, "member", "wal")
waldir := restoreWalDir
if waldir == "" {
waldir = filepath.Join(basedir, "member", "wal")
}
snapdir := filepath.Join(basedir, "member", "snap")
if _, err := os.Stat(basedir); err == nil {

View File

@ -40,7 +40,6 @@ import (
"github.com/coreos/etcd/version"
"github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
)
@ -179,10 +178,6 @@ func startEtcdOrProxyV2() {
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
if cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, nil, err
@ -392,6 +387,9 @@ func checkSupportArch() {
if runtime.GOARCH == "amd64" || runtime.GOARCH == "ppc64le" {
return
}
// unsupported arch only configured via environment variable
// so unset here to not parse through flag
defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
plog.Warningf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set", env)
return

View File

@ -17,6 +17,7 @@ package etcdmain
import (
"context"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
@ -37,10 +38,12 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/grpcproxy"
"github.com/coreos/pkg/capnslog"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
var (
@ -75,6 +78,8 @@ var (
grpcProxyEnablePprof bool
grpcProxyEnableOrdering bool
grpcProxyDebug bool
)
func init() {
@ -127,12 +132,26 @@ func newGRPCProxyStartCommand() *cobra.Command {
// experimental flags
cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
return &cmd
}
func startGRPCProxy(cmd *cobra.Command, args []string) {
checkArgs()
capnslog.SetGlobalLogLevel(capnslog.INFO)
if grpcProxyDebug {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
if tlsinfo == nil && grpcProxyListenAutoTLS {
host := []string{"https://" + grpcProxyListenAddr}

View File

@ -16,8 +16,10 @@ package v3rpc
import (
"crypto/tls"
"io/ioutil"
"math"
"os"
"sync"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -36,9 +38,8 @@ const (
maxSendBytes = math.MaxInt32
)
func init() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
}
// integration tests call this multiple times, which is racey in gRPC side
var grpclogOnce sync.Once
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
@ -70,5 +71,16 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
grpclogOnce.Do(func() {
if s.Cfg.Debug {
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
})
return grpcServer
}

View File

@ -107,7 +107,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
return nil
}
if err != nil {
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
}
return err
}
@ -133,7 +137,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
if isClientCtxErr(stream.Context().Err(), err) {
plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
}
return err
}
}

View File

@ -81,3 +81,16 @@ func togRPCError(err error) error {
}
return grpcErr
}
func isClientCtxErr(ctxErr error, err error) bool {
if ctxErr != nil {
return true
}
ev, ok := status.FromError(err)
if !ok {
return false
}
code := ev.Code()
return code == codes.Canceled || code == codes.DeadlineExceeded
}

View File

@ -140,7 +140,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
// deadlock when calling sws.close().
go func() {
if rerr := sws.recvLoop(); rerr != nil {
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
if isClientCtxErr(stream.Context().Err(), rerr) {
plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
} else {
plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
}
errc <- rerr
}
}()
@ -339,7 +343,11 @@ func (sws *serverWatchStream) sendLoop() {
mvcc.ReportEventReceived(len(evs))
if err := sws.gRPCStream.Send(wr); err != nil {
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
}
return
}
@ -356,7 +364,11 @@ func (sws *serverWatchStream) sendLoop() {
}
if err := sws.gRPCStream.Send(c); err != nil {
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
}
return
}
@ -372,7 +384,11 @@ func (sws *serverWatchStream) sendLoop() {
for _, v := range pending[wid] {
mvcc.ReportEventReceived(len(v.Events))
if err := sws.gRPCStream.Send(v); err != nil {
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
} else {
plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
}
return
}
}

View File

@ -70,6 +70,8 @@ type ServerConfig struct {
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
Debug bool
}
// VerifyBootstrap sanity-checks the initial config for bootstrap case

View File

@ -58,10 +58,12 @@ import (
)
const (
tickDuration = 10 * time.Millisecond
clusterName = "etcd"
requestTimeout = 20 * time.Second
// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
RequestWaitTimeout = 3 * time.Second
tickDuration = 10 * time.Millisecond
requestTimeout = 20 * time.Second
clusterName = "etcd"
basePort = 21000
UrlScheme = "unix"
UrlSchemeTLS = "unixs"

View File

@ -373,10 +373,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for seq write in for each
if berr != nil {
return berr
}
tmpb.FillPercent = 0.9 // for seq write in for each
b.ForEach(func(k, v []byte) error {
count++

View File

@ -33,7 +33,6 @@ type ExpectProcess struct {
fpty *os.File
wg sync.WaitGroup
ptyMu sync.Mutex // protects accessing fpty
cond *sync.Cond // for broadcasting updates are available
mu sync.Mutex // protects lines and err
lines []string
@ -76,9 +75,7 @@ func (ep *ExpectProcess) read() {
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
r := bufio.NewReader(ep.fpty)
for ep.err == nil {
ep.ptyMu.Lock()
l, rerr := r.ReadString('\n')
ep.ptyMu.Unlock()
ep.mu.Lock()
ep.err = rerr
if l != "" {
@ -150,9 +147,7 @@ func (ep *ExpectProcess) close(kill bool) error {
}
err := ep.cmd.Wait()
ep.ptyMu.Lock()
ep.fpty.Close()
ep.ptyMu.Unlock()
ep.wg.Wait()
if err != nil {

7
test
View File

@ -133,6 +133,9 @@ function functional_pass {
-peer-ports 12380,22380,32380 \
-limit 1 \
-schedule-cases "0 1 2 3 4 5" \
-stress-qps 1000 \
-stress-key-txn-count 100 \
-stress-key-txn-ops 10 \
-exit-on-failure && echo "'etcd-tester' succeeded"
ETCD_TESTER_EXIT_CODE=$?
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
@ -246,13 +249,13 @@ function grpcproxy_pass {
function release_pass {
rm -f ./bin/etcd-last-release
# to grab latest patch release; bump this up for every minor release
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.2.*" | head -1)
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.3.*" | head -1)
if [ -n "$MANUAL_VER" ]; then
# in case, we need to test against different version
UPGRADE_VER=$MANUAL_VER
fi
if [[ -z ${UPGRADE_VER} ]]; then
UPGRADE_VER="v3.2.0"
UPGRADE_VER="v3.3.0"
echo "fallback to" ${UPGRADE_VER}
fi

View File

@ -34,9 +34,11 @@ import (
type keyStresser struct {
Endpoint string
keyLargeSize int
keySize int
keySuffixRange int
keyLargeSize int
keySize int
keySuffixRange int
keyTxnSuffixRange int
keyTxnOps int
N int
@ -77,6 +79,15 @@ func (s *keyStresser) Stress() error {
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
}
if s.keyTxnSuffixRange > 0 {
// adjust to make up ±70% of workloads with writes
stressEntries[0].weight = 0.24
stressEntries[1].weight = 0.24
stressEntries = append(stressEntries, stressEntry{
weight: 0.24,
f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
})
}
s.stressTable = createStressTable(stressEntries)
for i := 0; i < s.N; i++ {
@ -202,6 +213,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
}
}
func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
keys := make([]string, keyTxnSuffixRange)
for i := range keys {
keys[i] = fmt.Sprintf("/k%03d", i)
}
return writeTxn(kvc, keys, txnOps)
}
func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
return func(ctx context.Context) (error, int64) {
ks := make(map[string]struct{}, txnOps)
for len(ks) != txnOps {
ks[keys[rand.Intn(len(keys))]] = struct{}{}
}
selected := make([]string, 0, txnOps)
for k := range ks {
selected = append(selected, k)
}
com, delOp, putOp := getTxnReqs(selected[0], "bar00")
txnReq := &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
}
// add nested txns if any
for i := 1; i < txnOps; i++ {
k, v := selected[i], fmt.Sprintf("bar%02d", i)
com, delOp, putOp = getTxnReqs(k, v)
nested := &pb.RequestOp{
Request: &pb.RequestOp_RequestTxn{
RequestTxn: &pb.TxnRequest{
Compare: []*pb.Compare{com},
Success: []*pb.RequestOp{delOp},
Failure: []*pb.RequestOp{putOp},
},
},
}
txnReq.Success = append(txnReq.Success, nested)
txnReq.Failure = append(txnReq.Failure, nested)
}
_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
return err, int64(txnOps)
}
}
func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
// if key exists (version > 0)
com = &pb.Compare{
Key: []byte(key),
Target: pb.Compare_VERSION,
Result: pb.Compare_GREATER,
TargetUnion: &pb.Compare_Version{Version: 0},
}
delOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestDeleteRange{
RequestDeleteRange: &pb.DeleteRangeRequest{
Key: []byte(key),
},
},
}
putOp = &pb.RequestOp{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte(key),
Value: []byte(val),
},
},
}
return com, delOp, putOp
}
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
return func(ctx context.Context) (error, int64) {
_, err := kvc.Range(ctx, &pb.RangeRequest{

View File

@ -47,6 +47,8 @@ func main() {
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
@ -120,15 +122,23 @@ func main() {
}
scfg := stressConfig{
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
keyLargeSize: int(*stressKeyLargeSize),
keySize: int(*stressKeySize),
keySuffixRange: int(*stressKeySuffixRange),
numLeases: 10,
keysPerLease: 10,
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
keyLargeSize: int(*stressKeyLargeSize),
keySize: int(*stressKeySize),
keySuffixRange: int(*stressKeySuffixRange),
keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
keyTxnOps: int(*stressKeyTxnOps),
numLeases: 10,
keysPerLease: 10,
etcdRunnerPath: *etcdRunnerPath,
}
if scfg.keyTxnSuffixRange > 100 {
plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
}
if scfg.keyTxnOps > 64 {
plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
}
t := &tester{
failures: schedule,

View File

@ -16,17 +16,13 @@ package main
import (
"fmt"
"os"
"strings"
"sync"
"time"
"golang.org/x/time/rate"
"google.golang.org/grpc/grpclog"
)
func init() { grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) }
type Stresser interface {
// Stress starts to stress the etcd cluster
Stress() error
@ -117,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
}
type stressConfig struct {
keyLargeSize int
keySize int
keySuffixRange int
keyLargeSize int
keySize int
keySuffixRange int
keyTxnSuffixRange int
keyTxnOps int
numLeases int
keysPerLease int
@ -146,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
// TODO: Too intensive stressers can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
return &keyStresser{
Endpoint: m.grpcAddr(),
keyLargeSize: sc.keyLargeSize,
keySize: sc.keySize,
keySuffixRange: sc.keySuffixRange,
N: 100,
rateLimiter: sc.rateLimiter,
Endpoint: m.grpcAddr(),
keyLargeSize: sc.keyLargeSize,
keySize: sc.keySize,
keySuffixRange: sc.keySuffixRange,
keyTxnSuffixRange: sc.keyTxnSuffixRange,
keyTxnOps: sc.keyTxnOps,
N: 100,
rateLimiter: sc.rateLimiter,
}
case "v2keys":
return &v2Stresser{

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.3.0"
Version = "3.3.0-rc.2"
APIVersion = "unknown"
// Git SHA Value will be set during build