Integrate verification framework
The verification framework is integrated with: integration tests (enabled by default); the `ETCD_VERIFY=all etcdctl snapshot restore` command; and etcd shutdown when running with the `ETCD_VERIFY=all` environment variable set.
This commit is contained in:
@ -9,7 +9,7 @@ The minimum recommended etcd versions to run in **production** are 3.2.28+, 3.3.
|
|||||||
<hr>
|
<hr>
|
||||||
|
|
||||||
|
|
||||||
## v3.5.0 (2021 TBD)
|
## v3.5.0 (2021-06)
|
||||||
|
|
||||||
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://etcd.io/docs/latest/upgrades/upgrade_3_5/) for any breaking changes.
|
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://etcd.io/docs/latest/upgrades/upgrade_3_5/) for any breaking changes.
|
||||||
|
|
||||||
@ -160,6 +160,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
|
|||||||
- Add [`--socket-reuse-address`](https://github.com/etcd-io/etcd/pull/12702) flag
|
- Add [`--socket-reuse-address`](https://github.com/etcd-io/etcd/pull/12702) flag
|
||||||
- Setting this flag enables `SO_REUSEADDR` which allows binding to an address in `TIME_WAIT` state, improving etcd restart time.
|
- Setting this flag enables `SO_REUSEADDR` which allows binding to an address in `TIME_WAIT` state, improving etcd restart time.
|
||||||
- Reduce [around 30% memory allocation by logging range response size without marshal](https://github.com/etcd-io/etcd/pull/12871).
|
- Reduce [around 30% memory allocation by logging range response size without marshal](https://github.com/etcd-io/etcd/pull/12871).
|
||||||
|
- `ETCD_VERIFY="all"` environment variable triggers [additional verification of consistency](https://github.com/etcd-io/etcd/pull/) of etcd data-dir files.
|
||||||
### Package `runtime`
|
### Package `runtime`
|
||||||
|
|
||||||
- Optimize [`runtime.FDUsage` by removing unnecessary sorting](https://github.com/etcd-io/etcd/pull/12214).
|
- Optimize [`runtime.FDUsage` by removing unnecessary sorting](https://github.com/etcd-io/etcd/pull/12214).
|
||||||
|
@ -41,6 +41,7 @@ import (
|
|||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
|
"go.etcd.io/etcd/server/v3/verify"
|
||||||
"go.etcd.io/etcd/server/v3/wal"
|
"go.etcd.io/etcd/server/v3/wal"
|
||||||
"go.etcd.io/etcd/server/v3/wal/walpb"
|
"go.etcd.io/etcd/server/v3/wal/walpb"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -276,7 +277,11 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
|
|||||||
zap.String("snap-dir", s.snapDir),
|
zap.String("snap-dir", s.snapDir),
|
||||||
)
|
)
|
||||||
|
|
||||||
return nil
|
return verify.VerifyIfEnabled(verify.Config{
|
||||||
|
ExactIndex: true,
|
||||||
|
Logger: s.lg,
|
||||||
|
DataDir: dataDir,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *v3Manager) outDbPath() string {
|
func (s *v3Manager) outDbPath() string {
|
||||||
|
@ -42,6 +42,7 @@ import (
|
|||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
||||||
|
"go.etcd.io/etcd/server/v3/verify"
|
||||||
|
|
||||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||||
"github.com/soheilhy/cmux"
|
"github.com/soheilhy/cmux"
|
||||||
@ -338,6 +339,11 @@ func (e *Etcd) Close() {
|
|||||||
lg.Info("closing etcd server", fields...)
|
lg.Info("closing etcd server", fields...)
|
||||||
defer func() {
|
defer func() {
|
||||||
lg.Info("closed etcd server", fields...)
|
lg.Info("closed etcd server", fields...)
|
||||||
|
verify.MustVerifyIfEnabled(verify.Config{
|
||||||
|
Logger: lg,
|
||||||
|
DataDir: e.cfg.Dir,
|
||||||
|
ExactIndex: false,
|
||||||
|
})
|
||||||
lg.Sync()
|
lg.Sync()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -513,7 +519,6 @@ func (e *Etcd) servePeers() (err error) {
|
|||||||
e.cfg.logger.Info(
|
e.cfg.logger.Info(
|
||||||
"cmux::serve",
|
"cmux::serve",
|
||||||
zap.String("address", u),
|
zap.String("address", u),
|
||||||
zap.String("cmuxp", fmt.Sprintf("%p", m)),
|
|
||||||
)
|
)
|
||||||
return m.Serve()
|
return m.Serve()
|
||||||
}
|
}
|
||||||
@ -524,16 +529,13 @@ func (e *Etcd) servePeers() (err error) {
|
|||||||
e.cfg.logger.Info(
|
e.cfg.logger.Info(
|
||||||
"stopping serving peer traffic",
|
"stopping serving peer traffic",
|
||||||
zap.String("address", u),
|
zap.String("address", u),
|
||||||
zap.String("cmuxp", fmt.Sprintf("%p", m)),
|
|
||||||
)
|
)
|
||||||
stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
|
stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
|
||||||
e.cfg.logger.Info(
|
e.cfg.logger.Info(
|
||||||
"stopped serving peer traffic",
|
"stopped serving peer traffic",
|
||||||
zap.String("address", u),
|
zap.String("address", u),
|
||||||
zap.String("cmuxp", fmt.Sprintf("%p", m)),
|
|
||||||
)
|
)
|
||||||
m.Close()
|
m.Close()
|
||||||
e.cfg.logger.Info("Closed", zap.String("cmuxp", fmt.Sprintf("%p", m)))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,7 @@ import (
|
|||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock"
|
||||||
lockpb "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
|
lockpb "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
||||||
|
"go.etcd.io/etcd/server/v3/verify"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
|
|
||||||
@ -583,6 +584,7 @@ type member struct {
|
|||||||
useIP bool
|
useIP bool
|
||||||
|
|
||||||
isLearner bool
|
isLearner bool
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *member) GRPCAddr() string { return m.grpcAddr }
|
func (m *member) GRPCAddr() string { return m.grpcAddr }
|
||||||
@ -704,13 +706,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
|
|||||||
m.InitialCorruptCheck = true
|
m.InitialCorruptCheck = true
|
||||||
m.WarningApplyDuration = embed.DefaultWarningApplyDuration
|
m.WarningApplyDuration = embed.DefaultWarningApplyDuration
|
||||||
|
|
||||||
level := zapcore.InfoLevel
|
m.Logger = memberLogger(t, mcfg.name)
|
||||||
if os.Getenv("CLUSTER_DEBUG") != "" {
|
|
||||||
level = zapcore.DebugLevel
|
|
||||||
}
|
|
||||||
|
|
||||||
options := zaptest.WrapOptions(zap.Fields(zap.String("member", mcfg.name)))
|
|
||||||
m.Logger = zaptest.NewLogger(t, zaptest.Level(level), options).Named(mcfg.name)
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
// if we didn't cleanup the logger, the consecutive test
|
// if we didn't cleanup the logger, the consecutive test
|
||||||
// might reuse this (t).
|
// might reuse this (t).
|
||||||
@ -719,6 +715,16 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
|
|||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func memberLogger(t testutil.TB, name string) *zap.Logger {
|
||||||
|
level := zapcore.InfoLevel
|
||||||
|
if os.Getenv("CLUSTER_DEBUG") != "" {
|
||||||
|
level = zapcore.DebugLevel
|
||||||
|
}
|
||||||
|
|
||||||
|
options := zaptest.WrapOptions(zap.Fields(zap.String("member", name)))
|
||||||
|
return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name)
|
||||||
|
}
|
||||||
|
|
||||||
// listenGRPC starts a grpc server over a unix domain socket on the member
|
// listenGRPC starts a grpc server over a unix domain socket on the member
|
||||||
func (m *member) listenGRPC() error {
|
func (m *member) listenGRPC() error {
|
||||||
// prefix with localhost so cert has right domain
|
// prefix with localhost so cert has right domain
|
||||||
@ -782,7 +788,7 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
|
|||||||
|
|
||||||
// Clone returns a member with the same server configuration. The returned
|
// Clone returns a member with the same server configuration. The returned
|
||||||
// member will not set PeerListeners and ClientListeners.
|
// member will not set PeerListeners and ClientListeners.
|
||||||
func (m *member) Clone(_ testutil.TB) *member {
|
func (m *member) Clone(t testutil.TB) *member {
|
||||||
mm := &member{}
|
mm := &member{}
|
||||||
mm.ServerConfig = m.ServerConfig
|
mm.ServerConfig = m.ServerConfig
|
||||||
|
|
||||||
@ -809,6 +815,7 @@ func (m *member) Clone(_ testutil.TB) *member {
|
|||||||
mm.ElectionTicks = m.ElectionTicks
|
mm.ElectionTicks = m.ElectionTicks
|
||||||
mm.PeerTLSInfo = m.PeerTLSInfo
|
mm.PeerTLSInfo = m.PeerTLSInfo
|
||||||
mm.ClientTLSInfo = m.ClientTLSInfo
|
mm.ClientTLSInfo = m.ClientTLSInfo
|
||||||
|
mm.Logger = memberLogger(t, mm.Name+"c")
|
||||||
return mm
|
return mm
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1071,6 +1078,16 @@ func (m *member) Close() {
|
|||||||
for _, f := range m.serverClosers {
|
for _, f := range m.serverClosers {
|
||||||
f()
|
f()
|
||||||
}
|
}
|
||||||
|
if !m.closed {
|
||||||
|
// Avoid verification of the same file multiple times
|
||||||
|
// (that might not exist any longer)
|
||||||
|
verify.MustVerifyIfEnabled(verify.Config{
|
||||||
|
Logger: m.Logger,
|
||||||
|
DataDir: m.DataDir,
|
||||||
|
ExactIndex: false,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
m.closed = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the member, but the data dir of the member is preserved.
|
// Stop stops the member, but the data dir of the member is preserved.
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"go.etcd.io/etcd/client/v2"
|
"go.etcd.io/etcd/client/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -65,6 +66,7 @@ func TestRestartMember(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
||||||
|
BeforeTest(t)
|
||||||
size := 3
|
size := 3
|
||||||
c := NewCluster(t, size)
|
c := NewCluster(t, size)
|
||||||
m := c.Members[0].Clone(t)
|
m := c.Members[0].Clone(t)
|
||||||
@ -78,6 +80,9 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
|||||||
|
|
||||||
if err := m.Launch(); err == nil {
|
if err := m.Launch(); err == nil {
|
||||||
t.Errorf("unexpect successful launch")
|
t.Errorf("unexpect successful launch")
|
||||||
|
} else {
|
||||||
|
t.Logf("launch failed as expected: %v", err)
|
||||||
|
assert.Contains(t, err.Error(), "has already been bootstrapped")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,7 +17,6 @@ package snapshot_test
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -42,7 +41,6 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
for i := 0; i < clusterN; i++ {
|
for i := 0; i < clusterN; i++ {
|
||||||
os.RemoveAll(srvs[i].Config().Dir)
|
|
||||||
srvs[i].Close()
|
srvs[i].Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -82,7 +80,6 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
os.RemoveAll(cfg.Dir)
|
|
||||||
srv.Close()
|
srv.Close()
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"go.etcd.io/etcd/etcdctl/v3/snapshot"
|
"go.etcd.io/etcd/etcdctl/v3/snapshot"
|
||||||
"go.etcd.io/etcd/server/v3/embed"
|
"go.etcd.io/etcd/server/v3/embed"
|
||||||
"go.etcd.io/etcd/tests/v3/integration"
|
"go.etcd.io/etcd/tests/v3/integration"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -39,7 +40,6 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
|
|||||||
integration.BeforeTest(t)
|
integration.BeforeTest(t)
|
||||||
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
|
||||||
dbPath := createSnapshotFile(t, kvs)
|
dbPath := createSnapshotFile(t, kvs)
|
||||||
defer os.RemoveAll(dbPath)
|
|
||||||
|
|
||||||
clusterN := 1
|
clusterN := 1
|
||||||
urls := newEmbedURLs(clusterN * 2)
|
urls := newEmbedURLs(clusterN * 2)
|
||||||
@ -73,7 +73,6 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
os.RemoveAll(cfg.Dir)
|
|
||||||
srv.Close()
|
srv.Close()
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
@ -215,7 +214,6 @@ func createSnapshotFile(t *testing.T, kvs []kv) string {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
os.RemoveAll(cfg.Dir)
|
|
||||||
return dpPath
|
return dpPath
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,7 +241,8 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) (
|
|||||||
cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
|
cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
|
||||||
cfg.InitialCluster = ics
|
cfg.InitialCluster = ics
|
||||||
|
|
||||||
sp := snapshot.NewV3(zaptest.NewLogger(t))
|
sp := snapshot.NewV3(
|
||||||
|
zaptest.NewLogger(t, zaptest.Level(zapcore.InfoLevel)).Named(cfg.Name).Named("sm"))
|
||||||
|
|
||||||
if err := sp.Restore(snapshot.RestoreConfig{
|
if err := sp.Restore(snapshot.RestoreConfig{
|
||||||
SnapshotPath: dbPath,
|
SnapshotPath: dbPath,
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
|
grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
|
||||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||||
"go.etcd.io/etcd/server/v3/embed"
|
"go.etcd.io/etcd/server/v3/embed"
|
||||||
|
"go.etcd.io/etcd/server/v3/verify"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
"go.uber.org/zap/zapgrpc"
|
"go.uber.org/zap/zapgrpc"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
@ -38,6 +39,9 @@ func BeforeTest(t testutil.TB) {
|
|||||||
|
|
||||||
grpc_logger.Set(zapgrpc.NewLogger(zaptest.NewLogger(t).Named("grpc")))
|
grpc_logger.Set(zapgrpc.NewLogger(zaptest.NewLogger(t).Named("grpc")))
|
||||||
|
|
||||||
|
// Integration tests should verify written state as much as possible.
|
||||||
|
os.Setenv(verify.ENV_VERIFY, verify.ENV_VERIFY_ALL_VALUE)
|
||||||
|
|
||||||
previousWD, err := os.Getwd()
|
previousWD, err := os.Getwd()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
Reference in New Issue
Block a user