Compare commits
50 Commits
api/v3.5.2
...
client/v2.
Author | SHA1 | Date | |
---|---|---|---|
08407ff760 | |||
c3c908e39a | |||
5c68f2e510 | |||
b872757492 | |||
081b4e2a0f | |||
76564778a9 | |||
0452feec71 | |||
842cb4b4fc | |||
50978d5b25 | |||
5d44f2242f | |||
cd750e4542 | |||
003a310489 | |||
6095cf810a | |||
719082e4fc | |||
4002aa51bd | |||
bc5307de95 | |||
b9cbff151c | |||
232fb980a7 | |||
383eceb885 | |||
bf22ef3b03 | |||
66c7aab4d3 | |||
3ace622792 | |||
780ec338f0 | |||
238b18c110 | |||
5f1968887c | |||
83538f342d | |||
3b8c6512df | |||
8b9ce3e150 | |||
a060b42e47 | |||
25556a08a8 | |||
12ceac6fdd | |||
462aefdfe1 | |||
fed325a95a | |||
c51c8d24e1 | |||
1801ef8d71 | |||
d5161347da | |||
58374b83a1 | |||
39baf36ca3 | |||
541635e36a | |||
3c2ef71358 | |||
1eb8b6a75c | |||
7cec92a281 | |||
f634b44046 | |||
7345d4211b | |||
fa191c64bd | |||
31c8e3c7a5 | |||
4f51cc1d9a | |||
7db1051774 | |||
631fa6fd65 | |||
f4708ae3d4 |
2
.github/workflows/e2e.yaml
vendored
2
.github/workflows/e2e.yaml
vendored
@ -13,7 +13,7 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: "1.16.3"
|
||||
go-version: "1.16.15"
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
2
.github/workflows/functional.yaml
vendored
2
.github/workflows/functional.yaml
vendored
@ -12,7 +12,7 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: "1.16.3"
|
||||
go-version: "1.16.15"
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
2
.github/workflows/grpcproxy.yaml
vendored
2
.github/workflows/grpcproxy.yaml
vendored
@ -12,7 +12,7 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: "1.16.3"
|
||||
go-version: "1.16.15"
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
2
.github/workflows/tests.yaml
vendored
2
.github/workflows/tests.yaml
vendored
@ -18,7 +18,7 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: "1.16.3"
|
||||
go-version: "1.16.15"
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
@ -6,7 +6,7 @@ sudo: required
|
||||
services: docker
|
||||
|
||||
go:
|
||||
- "1.16.3"
|
||||
- "1.16.15"
|
||||
- tip
|
||||
|
||||
notifications:
|
||||
@ -21,14 +21,14 @@ env:
|
||||
matrix:
|
||||
fast_finish: true
|
||||
allow_failures:
|
||||
- go: "1.16.3"
|
||||
- go: "1.16.15"
|
||||
env: TARGET=linux-amd64-coverage
|
||||
- go: tip
|
||||
env: TARGET=linux-amd64-fmt-unit-go-tip-2-cpu
|
||||
exclude:
|
||||
- go: tip
|
||||
env: TARGET=linux-amd64-coverage
|
||||
- go: "1.16.3"
|
||||
- go: "1.16.15"
|
||||
env: TARGET=linux-amd64-fmt-unit-go-tip-2-cpu
|
||||
|
||||
before_install:
|
||||
|
@ -1,5 +1,5 @@
|
||||
# TODO: move to k8s.gcr.io/build-image/debian-base:bullseye-v1.y.z when patched
|
||||
FROM debian:bullseye-20210927
|
||||
FROM debian:bullseye-20220328
|
||||
|
||||
ADD etcd /usr/local/bin/
|
||||
ADD etcdctl /usr/local/bin/
|
||||
|
@ -1,5 +1,5 @@
|
||||
# TODO: move to k8s.gcr.io/build-image/debian-base-arm64:bullseye-1.y.z when patched
|
||||
FROM arm64v8/debian:bullseye-20210927
|
||||
FROM arm64v8/debian:bullseye-20220328
|
||||
|
||||
ADD etcd /usr/local/bin/
|
||||
ADD etcdctl /usr/local/bin/
|
||||
|
@ -1,5 +1,5 @@
|
||||
# TODO: move to k8s.gcr.io/build-image/debian-base-ppc64le:bullseye-1.y.z when patched
|
||||
FROM ppc64le/debian:bullseye-20210927
|
||||
FROM ppc64le/debian:bullseye-20220328
|
||||
|
||||
ADD etcd /usr/local/bin/
|
||||
ADD etcdctl /usr/local/bin/
|
||||
|
@ -1,5 +1,5 @@
|
||||
# TODO: move to k8s.gcr.io/build-image/debian-base-s390x:bullseye-1.y.z when patched
|
||||
FROM s390x/debian:bullseye-20210927
|
||||
FROM s390x/debian:bullseye-20220328
|
||||
|
||||
ADD etcd /usr/local/bin/
|
||||
ADD etcdctl /usr/local/bin/
|
||||
|
7
Makefile
7
Makefile
@ -55,7 +55,7 @@ docker-remove:
|
||||
|
||||
|
||||
|
||||
GO_VERSION ?= 1.16.3
|
||||
GO_VERSION ?= 1.16.15
|
||||
ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")
|
||||
|
||||
TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
|
||||
@ -161,7 +161,10 @@ test-full:
|
||||
$(info log-file: test-$(TEST_SUFFIX).log)
|
||||
PASSES="fmt build release unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
|
||||
|
||||
docker-test:
|
||||
ensure-docker-test-image-exists:
|
||||
make pull-docker-test || echo "WARNING: Container Image not found in registry, building locally"; make build-docker-test
|
||||
|
||||
docker-test: ensure-docker-test-image-exists
|
||||
$(info GO_VERSION: $(GO_VERSION))
|
||||
$(info ETCD_VERSION: $(ETCD_VERSION))
|
||||
$(info TEST_OPTS: $(TEST_OPTS))
|
||||
|
@ -75,6 +75,7 @@ var (
|
||||
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
|
||||
ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
|
||||
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
|
||||
ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err()
|
||||
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
|
||||
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
|
||||
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
|
||||
@ -208,6 +209,7 @@ var (
|
||||
ErrTimeout = Error(ErrGRPCTimeout)
|
||||
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
|
||||
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
|
||||
ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex)
|
||||
ErrUnhealthy = Error(ErrGRPCUnhealthy)
|
||||
ErrCorrupt = Error(ErrGRPCCorrupt)
|
||||
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.5.2"
|
||||
Version = "3.5.4"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
@ -21,6 +21,17 @@ import (
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// CreateDefaultZapLogger creates a logger with default zap configuration
|
||||
func CreateDefaultZapLogger(level zapcore.Level) (*zap.Logger, error) {
|
||||
lcfg := DefaultZapLoggerConfig
|
||||
lcfg.Level = zap.NewAtomicLevelAt(level)
|
||||
c, err := lcfg.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// DefaultZapLoggerConfig defines default zap logger configuration.
|
||||
var DefaultZapLoggerConfig = zap.Config{
|
||||
Level: zap.NewAtomicLevelAt(ConvertToZapLevel(DefaultLogLevel)),
|
||||
|
@ -228,10 +228,10 @@ func TestSRVDiscover(t *testing.T) {
|
||||
[]*net.SRV{
|
||||
{Target: "a.example.com", Port: 2480},
|
||||
{Target: "b.example.com", Port: 2480},
|
||||
{Target: "c.example.com", Port: 2480},
|
||||
{Target: "c.example.com.", Port: 2480},
|
||||
},
|
||||
[]*net.SRV{},
|
||||
[]string{"https://a.example.com:2480", "https://b.example.com:2480", "https://c.example.com:2480"},
|
||||
[]string{"https://a.example.com:2480", "https://b.example.com:2480", "https://c.example.com.:2480"},
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
@ -5,8 +5,8 @@ go 1.16
|
||||
require (
|
||||
github.com/json-iterator/go v1.1.11
|
||||
github.com/modern-go/reflect2 v1.0.1
|
||||
go.etcd.io/etcd/api/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
)
|
||||
|
||||
replace (
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/client/v3/credentials"
|
||||
"go.etcd.io/etcd/client/v3/internal/endpoint"
|
||||
"go.etcd.io/etcd/client/v3/internal/resolver"
|
||||
@ -184,7 +185,9 @@ func (c *Client) Sync(ctx context.Context) error {
|
||||
}
|
||||
var eps []string
|
||||
for _, m := range mresp.Members {
|
||||
eps = append(eps, m.ClientURLs...)
|
||||
if len(m.Name) != 0 && !m.IsLearner {
|
||||
eps = append(eps, m.ClientURLs...)
|
||||
}
|
||||
}
|
||||
c.SetEndpoints(eps...)
|
||||
return nil
|
||||
@ -368,7 +371,10 @@ func newClient(cfg *Config) (*Client, error) {
|
||||
} else if cfg.LogConfig != nil {
|
||||
client.lg, err = cfg.LogConfig.Build()
|
||||
} else {
|
||||
client.lg, err = CreateDefaultZapLogger()
|
||||
client.lg, err = logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
|
||||
if client.lg != nil {
|
||||
client.lg = client.lg.Named("etcd-client")
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -17,13 +17,14 @@ package clientv3
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
@ -198,3 +199,49 @@ func TestZapWithLogger(t *testing.T) {
|
||||
t.Errorf("WithZapLogger should modify *zap.Logger")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncFiltersMembers(t *testing.T) {
|
||||
c, _ := NewClient(t, Config{Endpoints: []string{"http://254.0.0.1:12345"}})
|
||||
defer c.Close()
|
||||
c.Cluster = &mockCluster{
|
||||
[]*etcdserverpb.Member{
|
||||
{ID: 0, Name: "", ClientURLs: []string{"http://254.0.0.1:12345"}, IsLearner: false},
|
||||
{ID: 1, Name: "isStarted", ClientURLs: []string{"http://254.0.0.2:12345"}, IsLearner: true},
|
||||
{ID: 2, Name: "isStartedAndNotLearner", ClientURLs: []string{"http://254.0.0.3:12345"}, IsLearner: false},
|
||||
},
|
||||
}
|
||||
c.Sync(context.Background())
|
||||
|
||||
endpoints := c.Endpoints()
|
||||
if len(endpoints) != 1 || endpoints[0] != "http://254.0.0.3:12345" {
|
||||
t.Error("Client.Sync uses learner and/or non-started member client URLs")
|
||||
}
|
||||
}
|
||||
|
||||
type mockCluster struct {
|
||||
members []*etcdserverpb.Member
|
||||
}
|
||||
|
||||
func (mc *mockCluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
||||
return &MemberListResponse{Members: mc.members}, nil
|
||||
}
|
||||
|
||||
func (mc *mockCluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (mc *mockCluster) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (mc *mockCluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (mc *mockCluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (mc *mockCluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -5,9 +5,9 @@ go 1.16
|
||||
require (
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
go.etcd.io/etcd/api/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
github.com/prometheus/client_golang v1.11.1
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
go.uber.org/zap v1.17.0
|
||||
google.golang.org/grpc v1.38.0
|
||||
sigs.k8s.io/yaml v1.2.0
|
||||
|
@ -106,8 +106,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
||||
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
"os"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"go.uber.org/zap/zapgrpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
@ -29,10 +28,11 @@ func init() {
|
||||
// We override grpc logger only when the environment variable is set
|
||||
// in order to not interfere by default with user's code or other libraries.
|
||||
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
|
||||
lg, err := CreateDefaultZapLogger()
|
||||
lg, err := logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
lg = lg.Named("etcd-client")
|
||||
grpclog.SetLoggerV2(zapgrpc.NewLogger(lg))
|
||||
}
|
||||
}
|
||||
@ -57,21 +57,3 @@ func etcdClientDebugLevel() zapcore.Level {
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// CreateDefaultZapLoggerConfig creates a logger config that is configurable using env variable:
|
||||
// ETCD_CLIENT_DEBUG= debug|info|warn|error|dpanic|panic|fatal|true (true=info)
|
||||
func CreateDefaultZapLoggerConfig() zap.Config {
|
||||
lcfg := logutil.DefaultZapLoggerConfig
|
||||
lcfg.Level = zap.NewAtomicLevelAt(etcdClientDebugLevel())
|
||||
return lcfg
|
||||
}
|
||||
|
||||
// CreateDefaultZapLogger creates a logger that is configurable using env variable:
|
||||
// ETCD_CLIENT_DEBUG= debug|info|warn|error|dpanic|panic|fatal|true (true=info)
|
||||
func CreateDefaultZapLogger() (*zap.Logger, error) {
|
||||
c, err := CreateDefaultZapLoggerConfig().Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.Named("etcd-client"), nil
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ package mirror
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -52,7 +52,13 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
|
||||
|
||||
// if rev is not specified, we will choose the most recent revision.
|
||||
if s.rev == 0 {
|
||||
resp, err := s.c.Get(ctx, "foo")
|
||||
// If len(s.prefix) == 0, we will check a random key to fetch the most recent
|
||||
// revision (foo), otherwise we use the provided prefix.
|
||||
checkPath := "foo"
|
||||
if len(s.prefix) != 0 {
|
||||
checkPath = s.prefix
|
||||
}
|
||||
resp, err := s.c.Get(ctx, checkPath)
|
||||
if err != nil {
|
||||
errchan <- err
|
||||
close(respchan)
|
||||
|
@ -77,6 +77,9 @@ type Op struct {
|
||||
cmps []Cmp
|
||||
thenOps []Op
|
||||
elseOps []Op
|
||||
|
||||
isOptsWithFromKey bool
|
||||
isOptsWithPrefix bool
|
||||
}
|
||||
|
||||
// accessors / mutators
|
||||
@ -216,6 +219,10 @@ func (op Op) isWrite() bool {
|
||||
return op.t != tRange
|
||||
}
|
||||
|
||||
func NewOp() *Op {
|
||||
return &Op{key: []byte("")}
|
||||
}
|
||||
|
||||
// OpGet returns "get" operation based on given key and operation options.
|
||||
func OpGet(key string, opts ...OpOption) Op {
|
||||
// WithPrefix and WithFromKey are not supported together
|
||||
@ -387,6 +394,7 @@ func WithPrefix() OpOption {
|
||||
return
|
||||
}
|
||||
op.end = getPrefix(op.key)
|
||||
op.isOptsWithPrefix = true
|
||||
}
|
||||
}
|
||||
|
||||
@ -406,6 +414,7 @@ func WithFromKey() OpOption {
|
||||
op.key = []byte{0}
|
||||
}
|
||||
op.end = []byte("\x00")
|
||||
op.isOptsWithFromKey = true
|
||||
}
|
||||
}
|
||||
|
||||
@ -554,7 +563,21 @@ func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLi
|
||||
}
|
||||
|
||||
// IsOptsWithPrefix returns true if WithPrefix option is called in the given opts.
|
||||
func IsOptsWithPrefix(opts []OpOption) bool { return isOpFuncCalled("WithPrefix", opts) }
|
||||
func IsOptsWithPrefix(opts []OpOption) bool {
|
||||
ret := NewOp()
|
||||
for _, opt := range opts {
|
||||
opt(ret)
|
||||
}
|
||||
|
||||
return ret.isOptsWithPrefix
|
||||
}
|
||||
|
||||
// IsOptsWithFromKey returns true if WithFromKey option is called in the given opts.
|
||||
func IsOptsWithFromKey(opts []OpOption) bool { return isOpFuncCalled("WithFromKey", opts) }
|
||||
func IsOptsWithFromKey(opts []OpOption) bool {
|
||||
ret := NewOp()
|
||||
for _, opt := range opts {
|
||||
opt(ret)
|
||||
}
|
||||
|
||||
return ret.isOptsWithFromKey
|
||||
}
|
||||
|
@ -44,9 +44,6 @@ func hasChecksum(n int64) bool {
|
||||
// selected node, and saved snapshot is the point-in-time state of
|
||||
// the selected node.
|
||||
func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) error {
|
||||
if lg == nil {
|
||||
lg = zap.NewExample()
|
||||
}
|
||||
cfg.Logger = lg.Named("client")
|
||||
if len(cfg.Endpoints) != 1 {
|
||||
return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
|
||||
|
@ -16,9 +16,6 @@ package clientv3
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -32,18 +29,3 @@ func jitterUp(duration time.Duration, jitter float64) time.Duration {
|
||||
multiplier := jitter * (rand.Float64()*2 - 1)
|
||||
return time.Duration(float64(duration) * (1 + multiplier))
|
||||
}
|
||||
|
||||
// Check if the provided function is being called in the op options.
|
||||
func isOpFuncCalled(op string, opts []OpOption) bool {
|
||||
for _, opt := range opts {
|
||||
v := reflect.ValueOf(opt)
|
||||
if v.Kind() == reflect.Func {
|
||||
if opFunc := runtime.FuncForPC(v.Pointer()); opFunc != nil {
|
||||
if strings.Contains(opFunc.Name(), op) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -46,6 +46,7 @@ func defragCommandFunc(cmd *cobra.Command, args []string) {
|
||||
if err != nil {
|
||||
cobrautl.ExitWithError(cobrautl.ExitError, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
failures := 0
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
v3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/pkg/v3/cobrautl"
|
||||
"go.etcd.io/etcd/pkg/v3/flags"
|
||||
@ -88,7 +89,7 @@ type epHealth struct {
|
||||
|
||||
// epHealthCommandFunc executes the "endpoint-health" command.
|
||||
func epHealthCommandFunc(cmd *cobra.Command, args []string) {
|
||||
lg, err := zap.NewProduction()
|
||||
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
cobrautl.ExitWithError(cobrautl.ExitError, err)
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/bgentry/speakeasy"
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/srv"
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
@ -114,7 +115,7 @@ func (*discardValue) Set(string) error { return nil }
|
||||
func (*discardValue) Type() string { return "" }
|
||||
|
||||
func clientConfigFromCmd(cmd *cobra.Command) *clientConfig {
|
||||
lg, err := zap.NewProduction()
|
||||
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
cobrautl.ExitWithError(cobrautl.ExitError, err)
|
||||
}
|
||||
@ -193,7 +194,7 @@ func newClientCfg(endpoints []string, dialTimeout, keepAliveTime, keepAliveTimeo
|
||||
// set tls if any one tls option set
|
||||
var cfgtls *transport.TLSInfo
|
||||
tlsinfo := transport.TLSInfo{}
|
||||
tlsinfo.Logger, _ = zap.NewProduction()
|
||||
tlsinfo.Logger, _ = logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if scfg.cert != "" {
|
||||
tlsinfo.CertFile = scfg.cert
|
||||
cfgtls = &tlsinfo
|
||||
|
@ -67,7 +67,7 @@ func printMemberListWithHexJSON(r clientv3.MemberListResponse) {
|
||||
b = strconv.AppendUint(nil, r.Header.MemberId, 16)
|
||||
buffer.Write(b)
|
||||
buffer.WriteString("\",\"raft_term\":")
|
||||
b = strconv.AppendUint(nil, r.Header.RaftTerm, 16)
|
||||
b = strconv.AppendUint(nil, r.Header.RaftTerm, 10)
|
||||
buffer.Write(b)
|
||||
buffer.WriteByte('}')
|
||||
for i := 0; i < len(r.Members); i++ {
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
snapshot "go.etcd.io/etcd/client/v3/snapshot"
|
||||
"go.etcd.io/etcd/etcdutl/v3/etcdutl"
|
||||
"go.etcd.io/etcd/pkg/v3/cobrautl"
|
||||
@ -98,7 +99,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
|
||||
cobrautl.ExitWithError(cobrautl.ExitBadArgs, err)
|
||||
}
|
||||
|
||||
lg, err := zap.NewProduction()
|
||||
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
cobrautl.ExitWithError(cobrautl.ExitError, err)
|
||||
}
|
||||
|
@ -9,12 +9,12 @@ require (
|
||||
github.com/spf13/cobra v1.1.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/urfave/cli v1.22.4
|
||||
go.etcd.io/etcd/api/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/v2 v2.305.2
|
||||
go.etcd.io/etcd/client/v3 v3.5.2
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.2
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/v2 v2.305.4
|
||||
go.etcd.io/etcd/client/v3 v3.5.4
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.4
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.4
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
google.golang.org/grpc v1.38.0
|
||||
|
@ -236,8 +236,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
||||
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
@ -341,8 +341,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -387,8 +387,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@ -428,16 +429,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
|
||||
|
@ -319,10 +319,10 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
|
||||
|
||||
if !v3 {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
cindex.UnsafeCreateMetaBucket(tx)
|
||||
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
|
||||
cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
|
||||
} else {
|
||||
// Thanks to translateWAL not moving entries, but just replacing them with
|
||||
// 'empty', there is no need to update the consistency index.
|
||||
|
@ -15,13 +15,14 @@
|
||||
package etcdutl
|
||||
|
||||
import (
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/pkg/v3/cobrautl"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
func GetLogger() *zap.Logger {
|
||||
config := zap.NewProductionConfig()
|
||||
config := logutil.DefaultZapLoggerConfig
|
||||
config.Encoding = "console"
|
||||
config.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
|
||||
lg, err := config.Build()
|
||||
|
@ -25,11 +25,11 @@ require (
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/spf13/cobra v1.1.3
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/etcd/api/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/v3 v3.5.2
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||
go.etcd.io/etcd/server/v3 v3.5.2
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/v3 v3.5.4
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/raft/v3 v3.5.4
|
||||
go.etcd.io/etcd/server/v3 v3.5.4
|
||||
go.uber.org/zap v1.17.0
|
||||
)
|
||||
|
@ -230,8 +230,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
||||
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
@ -331,8 +331,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -377,8 +377,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@ -418,16 +419,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
|
||||
|
@ -69,9 +69,6 @@ type Manager interface {
|
||||
|
||||
// NewV3 returns a new snapshot Manager for v3.x snapshot.
|
||||
func NewV3(lg *zap.Logger) Manager {
|
||||
if lg == nil {
|
||||
lg = zap.NewExample()
|
||||
}
|
||||
return &v3Manager{lg: lg}
|
||||
}
|
||||
|
||||
@ -479,6 +476,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
|
||||
be := backend.NewDefaultBackend(s.outDbPath())
|
||||
defer be.Close()
|
||||
|
||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
|
||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
|
||||
return nil
|
||||
}
|
||||
|
20
go.mod
20
go.mod
@ -20,16 +20,16 @@ require (
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/spf13/cobra v1.1.3
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/etcd/api/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/v2 v2.305.2
|
||||
go.etcd.io/etcd/client/v3 v3.5.2
|
||||
go.etcd.io/etcd/etcdctl/v3 v3.5.2
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.2
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||
go.etcd.io/etcd/server/v3 v3.5.2
|
||||
go.etcd.io/etcd/tests/v3 v3.5.2
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/v2 v2.305.4
|
||||
go.etcd.io/etcd/client/v3 v3.5.4
|
||||
go.etcd.io/etcd/etcdctl/v3 v3.5.4
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.4
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/raft/v3 v3.5.4
|
||||
go.etcd.io/etcd/server/v3 v3.5.4
|
||||
go.etcd.io/etcd/tests/v3 v3.5.4
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
google.golang.org/grpc v1.38.0
|
||||
|
18
go.sum
18
go.sum
@ -244,8 +244,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
||||
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -400,8 +400,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@ -442,16 +443,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/creack/pty"
|
||||
)
|
||||
@ -36,7 +37,6 @@ type ExpectProcess struct {
|
||||
fpty *os.File
|
||||
wg sync.WaitGroup
|
||||
|
||||
cond *sync.Cond // for broadcasting updates are available
|
||||
mu sync.Mutex // protects lines and err
|
||||
lines []string
|
||||
count int // increment whenever new line gets added
|
||||
@ -60,7 +60,6 @@ func NewExpectWithEnv(name string, args []string, env []string) (ep *ExpectProce
|
||||
cmd: cmd,
|
||||
StopSignal: syscall.SIGKILL,
|
||||
}
|
||||
ep.cond = sync.NewCond(&ep.mu)
|
||||
ep.cmd.Stderr = ep.cmd.Stdout
|
||||
ep.cmd.Stdin = nil
|
||||
|
||||
@ -77,52 +76,56 @@ func (ep *ExpectProcess) read() {
|
||||
defer ep.wg.Done()
|
||||
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
|
||||
r := bufio.NewReader(ep.fpty)
|
||||
for ep.err == nil {
|
||||
l, rerr := r.ReadString('\n')
|
||||
for {
|
||||
l, err := r.ReadString('\n')
|
||||
ep.mu.Lock()
|
||||
ep.err = rerr
|
||||
if l != "" {
|
||||
if printDebugLines {
|
||||
fmt.Printf("%s-%d: %s", ep.cmd.Path, ep.cmd.Process.Pid, l)
|
||||
}
|
||||
ep.lines = append(ep.lines, l)
|
||||
ep.count++
|
||||
if len(ep.lines) == 1 {
|
||||
ep.cond.Signal()
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
ep.err = err
|
||||
ep.mu.Unlock()
|
||||
break
|
||||
}
|
||||
ep.mu.Unlock()
|
||||
}
|
||||
ep.cond.Signal()
|
||||
}
|
||||
|
||||
// ExpectFunc returns the first line satisfying the function f.
|
||||
func (ep *ExpectProcess) ExpectFunc(f func(string) bool) (string, error) {
|
||||
lastLinesBuffer := make([]string, 0)
|
||||
i := 0
|
||||
|
||||
ep.mu.Lock()
|
||||
for {
|
||||
for len(ep.lines) == 0 && ep.err == nil {
|
||||
ep.cond.Wait()
|
||||
ep.mu.Lock()
|
||||
for i < len(ep.lines) {
|
||||
line := ep.lines[i]
|
||||
i++
|
||||
if f(line) {
|
||||
ep.mu.Unlock()
|
||||
return line, nil
|
||||
}
|
||||
}
|
||||
if len(ep.lines) == 0 {
|
||||
if ep.err != nil {
|
||||
ep.mu.Unlock()
|
||||
break
|
||||
}
|
||||
l := ep.lines[0]
|
||||
ep.lines = ep.lines[1:]
|
||||
lastLinesBuffer = append(lastLinesBuffer, l)
|
||||
if l := len(lastLinesBuffer); l > DEBUG_LINES_TAIL {
|
||||
lastLinesBuffer = lastLinesBuffer[l-DEBUG_LINES_TAIL : l-1]
|
||||
}
|
||||
if f(l) {
|
||||
ep.mu.Unlock()
|
||||
return l, nil
|
||||
}
|
||||
ep.mu.Unlock()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
ep.mu.Lock()
|
||||
lastLinesIndex := len(ep.lines) - DEBUG_LINES_TAIL
|
||||
if lastLinesIndex < 0 {
|
||||
lastLinesIndex = 0
|
||||
}
|
||||
lastLines := strings.Join(ep.lines[lastLinesIndex:], "")
|
||||
ep.mu.Unlock()
|
||||
return "", fmt.Errorf("match not found."+
|
||||
" Set EXPECT_DEBUG for more info Err: %v, last lines:\n%s",
|
||||
ep.err, strings.Join(lastLinesBuffer, ""))
|
||||
ep.err, lastLines)
|
||||
}
|
||||
|
||||
// Expect returns the first line containing the given string.
|
||||
@ -189,3 +192,9 @@ func (ep *ExpectProcess) ProcessError() error {
|
||||
}
|
||||
return ep.err
|
||||
}
|
||||
|
||||
func (ep *ExpectProcess) Lines() []string {
|
||||
ep.mu.Lock()
|
||||
defer ep.mu.Unlock()
|
||||
return ep.lines
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ require (
|
||||
github.com/spf13/cobra v1.1.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
go.uber.org/zap v1.17.0
|
||||
google.golang.org/grpc v1.38.0
|
||||
)
|
||||
|
@ -190,12 +190,6 @@ func URLStringsEqual(ctx context.Context, lg *zap.Logger, a []string, b []string
|
||||
}
|
||||
urlsB = append(urlsB, *u)
|
||||
}
|
||||
if lg == nil {
|
||||
lg, _ = zap.NewProduction()
|
||||
if lg == nil {
|
||||
lg = zap.NewExample()
|
||||
}
|
||||
}
|
||||
return urlsEqual(ctx, lg, urlsA, urlsB)
|
||||
}
|
||||
|
||||
|
@ -37,17 +37,8 @@ var (
|
||||
defaultDialTimeout = 3 * time.Second
|
||||
defaultBufferSize = 48 * 1024
|
||||
defaultRetryInterval = 10 * time.Millisecond
|
||||
defaultLogger *zap.Logger
|
||||
)
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
defaultLogger, err = zap.NewProduction()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Server defines proxy server layer that simulates common network faults:
|
||||
// latency spikes and packet drop or corruption. The proxy overhead is very
|
||||
// small overhead (<500μs per request). Please run tests to compute actual
|
||||
@ -240,9 +231,6 @@ func NewServer(cfg ServerConfig) Server {
|
||||
if s.retryInterval == 0 {
|
||||
s.retryInterval = defaultRetryInterval
|
||||
}
|
||||
if s.lg == nil {
|
||||
s.lg = defaultLogger
|
||||
}
|
||||
|
||||
close(s.pauseAcceptc)
|
||||
close(s.pauseTxc)
|
||||
|
@ -8,7 +8,7 @@ require (
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
)
|
||||
|
||||
// Bad imports are sometimes causing attempts to pull that code.
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
|
||||
func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions {
|
||||
user := getUser(lg, tx, userName)
|
||||
if user == nil {
|
||||
return nil
|
||||
@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
|
||||
return false
|
||||
}
|
||||
|
||||
func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
|
||||
func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
|
||||
// assumption: tx is Lock()ed
|
||||
_, ok := as.rangePermCache[userName]
|
||||
if !ok {
|
||||
|
@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error {
|
||||
}
|
||||
b := as.be
|
||||
tx := b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer func() {
|
||||
tx.Unlock()
|
||||
b.ForceCommit()
|
||||
@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() {
|
||||
}
|
||||
b := as.be
|
||||
tx := b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
|
||||
as.commitRevision(tx)
|
||||
tx.Unlock()
|
||||
@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
user := getUser(as.lg, tx, username)
|
||||
@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
|
||||
// CompareHashAndPassword is very expensive, so we use closures
|
||||
// to avoid putting it in the critical section of the tx lock.
|
||||
revision, err := func() (uint64, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx := as.be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
|
||||
@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
|
||||
func (as *authStore) Recover(be backend.Backend) {
|
||||
enabled := false
|
||||
as.be = be
|
||||
tx := be.BatchTx()
|
||||
tx := be.ReadTx()
|
||||
tx.Lock()
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
|
||||
if len(vs) == 1 {
|
||||
@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
user := getUser(as.lg, tx, r.Name)
|
||||
@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
user := getUser(as.lg, tx, r.Name)
|
||||
@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
|
||||
|
||||
func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
user := getUser(as.lg, tx, r.Name)
|
||||
@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
|
||||
|
||||
func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
user := getUser(as.lg, tx, r.User)
|
||||
@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
|
||||
|
||||
func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
user := getUser(as.lg, tx, r.Name)
|
||||
tx.Unlock()
|
||||
|
||||
@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,
|
||||
|
||||
func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
users := getAllUsers(as.lg, tx)
|
||||
tx.Unlock()
|
||||
|
||||
@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
user := getUser(as.lg, tx, r.Name)
|
||||
@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
|
||||
|
||||
func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
var resp pb.AuthRoleGetResponse
|
||||
@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,
|
||||
|
||||
func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
roles := getAllRoles(as.lg, tx)
|
||||
tx.Unlock()
|
||||
|
||||
@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon
|
||||
|
||||
func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
role := getRole(as.lg, tx, r.Role)
|
||||
@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
role := getRole(as.lg, tx, r.Role)
|
||||
@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
role := getRole(as.lg, tx, r.Name)
|
||||
@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
|
||||
role := getRole(as.lg, tx, r.Name)
|
||||
@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
|
||||
return ErrAuthOldRevision
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx := as.be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
|
||||
@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
|
||||
return ErrUserEmpty
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx := as.be.ReadTx()
|
||||
tx.Lock()
|
||||
u := getUser(as.lg, tx, authInfo.Username)
|
||||
tx.Unlock()
|
||||
@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
|
||||
func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User {
|
||||
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
|
||||
return user
|
||||
}
|
||||
|
||||
func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
|
||||
func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User {
|
||||
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) {
|
||||
tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
|
||||
}
|
||||
|
||||
func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
|
||||
func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role {
|
||||
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
|
||||
return role
|
||||
}
|
||||
|
||||
func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
|
||||
func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role {
|
||||
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
|
||||
if len(vs) == 0 {
|
||||
return nil
|
||||
@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
|
||||
}
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
|
||||
tx.UnsafeCreateBucket(buckets.Auth)
|
||||
tx.UnsafeCreateBucket(buckets.AuthUsers)
|
||||
@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
|
||||
tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
|
||||
}
|
||||
|
||||
func getRevision(tx backend.BatchTx) uint64 {
|
||||
func getRevision(tx backend.ReadTx) uint64 {
|
||||
_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
|
||||
if len(vs) != 1 {
|
||||
// this can happen in the initialization phase
|
||||
@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {
|
||||
|
||||
func (as *authStore) HasRole(user, role string) bool {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
u := getUser(as.lg, tx, user)
|
||||
tx.Unlock()
|
||||
|
||||
|
@ -355,11 +355,10 @@ func (cfg *config) parse(arguments []string) error {
|
||||
|
||||
func (cfg *config) configFromCmdLine() error {
|
||||
// user-specified logger is not setup yet, use this logger during flag parsing
|
||||
lg, err := zap.NewProduction()
|
||||
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
verKey := "ETCD_VERSION"
|
||||
if verVal := os.Getenv(verKey); verVal != "" {
|
||||
// unset to avoid any possible side-effect.
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
pkgioutil "go.etcd.io/etcd/pkg/v3/ioutil"
|
||||
@ -63,7 +64,7 @@ func startEtcdOrProxyV2(args []string) {
|
||||
if lg == nil {
|
||||
var zapError error
|
||||
// use this logger
|
||||
lg, zapError = zap.NewProduction()
|
||||
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if zapError != nil {
|
||||
fmt.Printf("error creating zap logger %v", zapError)
|
||||
os.Exit(1)
|
||||
@ -463,6 +464,10 @@ func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType {
|
||||
}
|
||||
|
||||
func checkSupportArch() {
|
||||
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// to add a new platform, check https://github.com/etcd-io/website/blob/main/content/en/docs/next/op-guide/supported-platform.md
|
||||
if runtime.GOARCH == "amd64" ||
|
||||
runtime.GOARCH == "arm64" ||
|
||||
@ -474,10 +479,10 @@ func checkSupportArch() {
|
||||
// so unset here to not parse through flag
|
||||
defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
|
||||
if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
|
||||
fmt.Printf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set\n", env)
|
||||
lg.Info("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", env))
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("etcd on unsupported platform without ETCD_UNSUPPORTED_ARCH=%s set\n", runtime.GOARCH)
|
||||
lg.Error("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", runtime.GOARCH))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/server/v3/proxy/tcpproxy"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@ -92,8 +93,7 @@ func stripSchema(eps []string) []string {
|
||||
}
|
||||
|
||||
func startGateway(cmd *cobra.Command, args []string) {
|
||||
var lg *zap.Logger
|
||||
lg, err := zap.NewProduction()
|
||||
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
|
@ -164,16 +164,14 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
||||
|
||||
func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
checkArgs()
|
||||
|
||||
lcfg := logutil.DefaultZapLoggerConfig
|
||||
lvl := zap.InfoLevel
|
||||
if grpcProxyDebug {
|
||||
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||
lvl = zap.DebugLevel
|
||||
grpc.EnableTracing = true
|
||||
}
|
||||
|
||||
lg, err := lcfg.Build()
|
||||
lg, err := logutil.CreateDefaultZapLogger(lvl)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
panic(err)
|
||||
}
|
||||
defer lg.Sync()
|
||||
|
||||
|
@ -41,9 +41,6 @@ func Main(args []string) {
|
||||
}
|
||||
|
||||
func notifySystemd(lg *zap.Logger) {
|
||||
if lg == nil {
|
||||
lg = zap.NewExample()
|
||||
}
|
||||
lg.Info("notifying init daemon")
|
||||
_, err := daemon.SdNotify(false, daemon.SdNotifyReady)
|
||||
if err != nil {
|
||||
|
@ -40,14 +40,16 @@ const (
|
||||
// HandleMetricsHealth registers metrics and health handlers.
|
||||
func HandleMetricsHealth(lg *zap.Logger, mux *http.ServeMux, srv etcdserver.ServerV2) {
|
||||
mux.Handle(PathMetrics, promhttp.Handler())
|
||||
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet) Health { return checkV2Health(lg, srv, excludedAlarms) }))
|
||||
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health { return checkV2Health(lg, srv, excludedAlarms) }))
|
||||
}
|
||||
|
||||
// HandleMetricsHealthForV3 registers metrics and health handlers. it checks health by using v3 range request
|
||||
// and its corresponding timeout.
|
||||
func HandleMetricsHealthForV3(lg *zap.Logger, mux *http.ServeMux, srv *etcdserver.EtcdServer) {
|
||||
mux.Handle(PathMetrics, promhttp.Handler())
|
||||
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet) Health { return checkV3Health(lg, srv, excludedAlarms) }))
|
||||
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health {
|
||||
return checkV3Health(lg, srv, excludedAlarms, serializable)
|
||||
}))
|
||||
}
|
||||
|
||||
// HandlePrometheus registers prometheus handler on '/metrics'.
|
||||
@ -56,7 +58,7 @@ func HandlePrometheus(mux *http.ServeMux) {
|
||||
}
|
||||
|
||||
// NewHealthHandler handles '/health' requests.
|
||||
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet) Health) http.HandlerFunc {
|
||||
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, serializable bool) Health) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
w.Header().Set("Allow", http.MethodGet)
|
||||
@ -65,7 +67,12 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet) Health
|
||||
return
|
||||
}
|
||||
excludedAlarms := getExcludedAlarms(r)
|
||||
h := hfunc(excludedAlarms)
|
||||
// Passing the query parameter "serializable=true" ensures that the
|
||||
// health of the local etcd is checked vs the health of the cluster.
|
||||
// This is useful for probes attempting to validate the liveness of
|
||||
// the etcd process vs readiness of the cluster to serve requests.
|
||||
serializableFlag := getSerializableFlag(r)
|
||||
h := hfunc(excludedAlarms, serializableFlag)
|
||||
defer func() {
|
||||
if h.Health == "true" {
|
||||
healthSuccess.Inc()
|
||||
@ -128,9 +135,13 @@ func getExcludedAlarms(r *http.Request) (alarms AlarmSet) {
|
||||
return alarms
|
||||
}
|
||||
|
||||
func getSerializableFlag(r *http.Request) bool {
|
||||
return r.URL.Query().Get("serializable") == "true"
|
||||
}
|
||||
|
||||
// TODO: etcdserver.ErrNoLeader in health API
|
||||
|
||||
func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSet) Health {
|
||||
func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSet, serializable bool) Health {
|
||||
h := Health{}
|
||||
h.Health = "true"
|
||||
as := srv.Alarms()
|
||||
@ -156,7 +167,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
|
||||
}
|
||||
}
|
||||
|
||||
if uint64(srv.Leader()) == raft.None {
|
||||
if !serializable && uint64(srv.Leader()) == raft.None {
|
||||
h.Health = "false"
|
||||
h.Reason = "RAFT NO LEADER"
|
||||
lg.Warn("serving /health false; no leader")
|
||||
@ -166,7 +177,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
|
||||
}
|
||||
|
||||
func checkV2Health(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSet) (h Health) {
|
||||
if h = checkHealth(lg, srv, excludedAlarms); h.Health != "true" {
|
||||
if h = checkHealth(lg, srv, excludedAlarms, false); h.Health != "true" {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
@ -182,12 +193,12 @@ func checkV2Health(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms Alarm
|
||||
return
|
||||
}
|
||||
|
||||
func checkV3Health(lg *zap.Logger, srv *etcdserver.EtcdServer, excludedAlarms AlarmSet) (h Health) {
|
||||
if h = checkHealth(lg, srv, excludedAlarms); h.Health != "true" {
|
||||
func checkV3Health(lg *zap.Logger, srv *etcdserver.EtcdServer, excludedAlarms AlarmSet, serializable bool) (h Health) {
|
||||
if h = checkHealth(lg, srv, excludedAlarms, serializable); h.Health != "true" {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), srv.Cfg.ReqTimeout())
|
||||
_, err := srv.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1})
|
||||
_, err := srv.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
|
||||
cancel()
|
||||
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
|
||||
h.Health = "false"
|
||||
|
@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
|
||||
}
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
if unsafeMemberExists(tx, mkey) {
|
||||
return errMemberAlreadyExist
|
||||
@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
|
||||
// from the v3 backend.
|
||||
func TrimClusterFromBackend(be backend.Backend) error {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeDeleteBucket(buckets.Cluster)
|
||||
return nil
|
||||
@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
|
||||
mkey := backendMemberKey(id)
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
|
||||
if !unsafeMemberExists(tx, mkey) {
|
||||
@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
|
||||
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
|
||||
lg.Info("Trimming membership information from the backend...")
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
|
||||
tx.UnsafeDelete(buckets.Members, k)
|
||||
@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
|
||||
ckey := backendClusterVersionKey()
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
|
||||
}
|
||||
@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
|
||||
lg.Panic("failed to marshal downgrade information", zap.Error(err))
|
||||
}
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
|
||||
}
|
||||
@ -316,7 +316,7 @@ func backendDowngradeKey() []byte {
|
||||
|
||||
func mustCreateBackendBuckets(be backend.Backend) {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeCreateBucket(buckets.Members)
|
||||
tx.UnsafeCreateBucket(buckets.MembersRemoved)
|
||||
|
@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
|
||||
}
|
||||
|
||||
b := a.bg.Backend()
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().LockInsideApply()
|
||||
b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
|
||||
b.BatchTx().Unlock()
|
||||
|
||||
@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
|
||||
}
|
||||
|
||||
b := a.bg.Backend()
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().LockInsideApply()
|
||||
b.BatchTx().UnsafeDelete(buckets.Alarm, v)
|
||||
b.BatchTx().Unlock()
|
||||
|
||||
@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error {
|
||||
b := a.bg.Backend()
|
||||
tx := b.BatchTx()
|
||||
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
tx.UnsafeCreateBucket(buckets.Alarm)
|
||||
err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
|
||||
var m pb.AlarmMember
|
||||
|
@ -53,6 +53,7 @@ var toGRPCErrorMap = map[error]error{
|
||||
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
|
||||
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
|
||||
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
|
||||
etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
|
||||
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
|
||||
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
|
||||
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
|
||||
|
@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
|
||||
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
|
||||
consistentIndex := uint64(0)
|
||||
if beExist {
|
||||
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
|
||||
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
|
||||
}
|
||||
if snapshot.Metadata.Index <= consistentIndex {
|
||||
return oldbe, nil
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
type Backend interface {
|
||||
BatchTx() backend.BatchTx
|
||||
ReadTx() backend.ReadTx
|
||||
}
|
||||
|
||||
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
|
||||
@ -33,9 +34,18 @@ type ConsistentIndexer interface {
|
||||
// ConsistentIndex returns the consistent index of current executing entry.
|
||||
ConsistentIndex() uint64
|
||||
|
||||
// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
|
||||
ConsistentApplyingIndex() (uint64, uint64)
|
||||
|
||||
// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
|
||||
UnsafeConsistentIndex() uint64
|
||||
|
||||
// SetConsistentIndex set the consistent index of current executing entry.
|
||||
SetConsistentIndex(v uint64, term uint64)
|
||||
|
||||
// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
|
||||
SetConsistentApplyingIndex(v uint64, term uint64)
|
||||
|
||||
// UnsafeSave must be called holding the lock on the tx.
|
||||
// It saves consistentIndex to the underlying stable storage.
|
||||
UnsafeSave(tx backend.BatchTx)
|
||||
@ -55,6 +65,12 @@ type consistentIndex struct {
|
||||
// The value is being persisted in the backend since v3.5.
|
||||
term uint64
|
||||
|
||||
// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
|
||||
// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
|
||||
// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
|
||||
applyingIndex uint64
|
||||
applyingTerm uint64
|
||||
|
||||
// be is used for initial read consistentIndex
|
||||
be Backend
|
||||
// mutex is protecting be.
|
||||
@ -74,7 +90,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
|
||||
ci.mutex.Lock()
|
||||
defer ci.mutex.Unlock()
|
||||
|
||||
v, term := ReadConsistentIndex(ci.be.BatchTx())
|
||||
v, term := ReadConsistentIndex(ci.be.ReadTx())
|
||||
ci.SetConsistentIndex(v, term)
|
||||
return v
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
|
||||
if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
|
||||
return index
|
||||
}
|
||||
|
||||
v, term := unsafeReadConsistentIndex(ci.be.ReadTx())
|
||||
ci.SetConsistentIndex(v, term)
|
||||
return v
|
||||
}
|
||||
@ -87,7 +113,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
|
||||
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
|
||||
index := atomic.LoadUint64(&ci.consistentIndex)
|
||||
term := atomic.LoadUint64(&ci.term)
|
||||
UnsafeUpdateConsistentIndex(tx, index, term, true)
|
||||
UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) SetBackend(be Backend) {
|
||||
@ -98,6 +124,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
|
||||
ci.SetConsistentIndex(0, 0)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
|
||||
return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
|
||||
atomic.StoreUint64(&ci.applyingIndex, v)
|
||||
atomic.StoreUint64(&ci.applyingTerm, term)
|
||||
}
|
||||
|
||||
func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
|
||||
return &fakeConsistentIndex{index: index}
|
||||
}
|
||||
@ -107,12 +142,24 @@ type fakeConsistentIndex struct {
|
||||
term uint64
|
||||
}
|
||||
|
||||
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
|
||||
func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
|
||||
return atomic.LoadUint64(&f.index)
|
||||
}
|
||||
func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
|
||||
return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
|
||||
}
|
||||
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
|
||||
return atomic.LoadUint64(&f.index)
|
||||
}
|
||||
|
||||
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
|
||||
atomic.StoreUint64(&f.index, index)
|
||||
atomic.StoreUint64(&f.term, term)
|
||||
}
|
||||
func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
|
||||
atomic.StoreUint64(&f.index, index)
|
||||
atomic.StoreUint64(&f.term, term)
|
||||
}
|
||||
|
||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||
@ -124,7 +171,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {
|
||||
|
||||
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
|
||||
func CreateMetaBucket(tx backend.BatchTx) {
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeCreateBucket(buckets.Meta)
|
||||
}
|
||||
@ -154,22 +201,12 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
|
||||
return unsafeReadConsistentIndex(tx)
|
||||
}
|
||||
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
if index == 0 {
|
||||
// Never save 0 as it means that we didn't loaded the real index yet.
|
||||
return
|
||||
}
|
||||
|
||||
if onlyGrow {
|
||||
oldi, oldTerm := unsafeReadConsistentIndex(tx)
|
||||
if term < oldTerm {
|
||||
return
|
||||
}
|
||||
if term == oldTerm && index <= oldi {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
bs1 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs1, index)
|
||||
// put the index into the underlying backend
|
||||
@ -182,8 +219,8 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64,
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
|
||||
tx.Lock()
|
||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
|
||||
UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
}
|
||||
|
@ -63,6 +63,58 @@ func TestConsistentIndex(t *testing.T) {
|
||||
assert.Equal(t, r, index)
|
||||
}
|
||||
|
||||
func TestConsistentIndexDecrease(t *testing.T) {
|
||||
initIndex := uint64(100)
|
||||
initTerm := uint64(10)
|
||||
|
||||
tcs := []struct {
|
||||
name string
|
||||
index uint64
|
||||
term uint64
|
||||
}{
|
||||
{
|
||||
name: "Decrease term",
|
||||
index: initIndex + 1,
|
||||
term: initTerm - 1,
|
||||
},
|
||||
{
|
||||
name: "Decrease CI",
|
||||
index: initIndex - 1,
|
||||
term: initTerm + 1,
|
||||
},
|
||||
{
|
||||
name: "Decrease CI and term",
|
||||
index: initIndex - 1,
|
||||
term: initTerm - 1,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
UnsafeCreateMetaBucket(tx)
|
||||
UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
|
||||
tx.Unlock()
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
|
||||
be = backend.NewDefaultBackend(tmpPath)
|
||||
defer be.Close()
|
||||
ci := NewConsistentIndex(be)
|
||||
ci.SetConsistentIndex(tc.index, tc.term)
|
||||
tx = be.BatchTx()
|
||||
tx.Lock()
|
||||
ci.UnsafeSave(tx)
|
||||
tx.Unlock()
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
|
||||
ci = NewConsistentIndex(be)
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFakeConsistentIndex(t *testing.T) {
|
||||
|
||||
r := rand.Uint64()
|
||||
|
@ -27,6 +27,7 @@ var (
|
||||
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
|
||||
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
|
||||
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
|
||||
ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long")
|
||||
ErrLeaderChanged = errors.New("etcdserver: leader changed")
|
||||
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
|
||||
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
|
||||
|
@ -661,6 +661,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
})
|
||||
}
|
||||
|
||||
// Set the hook after EtcdServer finishes the initialization to avoid
|
||||
// the hook being called during the initialization process.
|
||||
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
|
||||
|
||||
// TODO: move transport initialization near the definition of remote
|
||||
tr := &rafthttp.Transport{
|
||||
Logger: cfg.Logger,
|
||||
@ -1243,6 +1247,13 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
lg.Panic("failed to open snapshot backend", zap.Error(err))
|
||||
}
|
||||
|
||||
// We need to set the backend to consistIndex before recovering the lessor,
|
||||
// because lessor.Recover will commit the boltDB transaction, accordingly it
|
||||
// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.
|
||||
// Eventually the new consistent_index value coming from snapshot is overwritten
|
||||
// by the old value.
|
||||
s.consistIndex.SetBackend(newbe)
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
if s.lessor != nil {
|
||||
@ -1259,7 +1270,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
lg.Panic("failed to restore mvcc store", zap.Error(err))
|
||||
}
|
||||
|
||||
s.consistIndex.SetBackend(newbe)
|
||||
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
|
||||
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
|
||||
|
||||
// Closing old backend might block until all the txns
|
||||
@ -2128,7 +2139,7 @@ func (s *EtcdServer) apply(
|
||||
|
||||
// set the consistent index of current executing entry
|
||||
if e.Index > s.consistIndex.ConsistentIndex() {
|
||||
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
|
||||
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
|
||||
shouldApplyV3 = membership.ApplyBoth
|
||||
}
|
||||
|
||||
@ -2155,11 +2166,20 @@ func (s *EtcdServer) apply(
|
||||
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
||||
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
shouldApplyV3 := membership.ApplyV2storeOnly
|
||||
applyV3Performed := false
|
||||
var ar *applyResult
|
||||
index := s.consistIndex.ConsistentIndex()
|
||||
if e.Index > index {
|
||||
// set the consistent index of current executing entry
|
||||
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
|
||||
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
|
||||
shouldApplyV3 = membership.ApplyBoth
|
||||
defer func() {
|
||||
// The txPostLockInsideApplyHook will not get called in some cases,
|
||||
// in which we should move the consistent index forward directly.
|
||||
if !applyV3Performed || (ar != nil && ar.err != nil) {
|
||||
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
|
||||
}
|
||||
}()
|
||||
}
|
||||
s.lg.Debug("apply entry normal",
|
||||
zap.Uint64("consistent-index", index),
|
||||
@ -2201,12 +2221,12 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
id = raftReq.Header.ID
|
||||
}
|
||||
|
||||
var ar *applyResult
|
||||
needResult := s.w.IsRegistered(id)
|
||||
if needResult || !noSideEffect(&raftReq) {
|
||||
if !needResult && raftReq.Txn != nil {
|
||||
removeNeedlessRangeReqs(raftReq.Txn)
|
||||
}
|
||||
applyV3Performed = true
|
||||
ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
|
||||
}
|
||||
|
||||
@ -2258,6 +2278,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
|
||||
cc.NodeID = raft.None
|
||||
s.r.ApplyConfChange(cc)
|
||||
|
||||
// The txPostLock callback will not get called in this case,
|
||||
// so we should set the consistent index directly.
|
||||
if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
|
||||
applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
|
||||
s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
@ -2683,6 +2710,15 @@ func (s *EtcdServer) raftStatus() raft.Status {
|
||||
return s.r.Node.Status()
|
||||
}
|
||||
|
||||
func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
|
||||
return func() {
|
||||
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
|
||||
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
|
||||
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
|
||||
size := be.Size()
|
||||
sizeInUse := be.SizeInUse()
|
||||
|
@ -686,9 +686,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
|
||||
|
||||
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
|
||||
consistIndex := srv.consistIndex.ConsistentIndex()
|
||||
if consistIndex != appliedi {
|
||||
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
|
||||
}
|
||||
assert.Equal(t, uint64(2), appliedi)
|
||||
|
||||
t.Run("verify-backend", func(t *testing.T) {
|
||||
tx := be.BatchTx()
|
||||
@ -697,9 +695,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
|
||||
srv.beHooks.OnPreCommitUnsafe(tx)
|
||||
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
|
||||
})
|
||||
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
|
||||
rindex, _ := cindex.ReadConsistentIndex(be.ReadTx())
|
||||
assert.Equal(t, consistIndex, rindex)
|
||||
assert.Equal(t, uint64(4), rterm)
|
||||
}
|
||||
|
||||
func realisticRaftNode(lg *zap.Logger) *raftNode {
|
||||
@ -2039,3 +2036,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) {
|
||||
}
|
||||
s.sendC <- send
|
||||
}
|
||||
|
||||
func TestWaitAppliedIndex(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
appliedIndex uint64
|
||||
committedIndex uint64
|
||||
action func(s *EtcdServer)
|
||||
ExpectedError error
|
||||
}{
|
||||
{
|
||||
name: "The applied Id is already equal to the commitId",
|
||||
appliedIndex: 10,
|
||||
committedIndex: 10,
|
||||
action: func(s *EtcdServer) {
|
||||
s.applyWait.Trigger(10)
|
||||
},
|
||||
ExpectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "The etcd server has already stopped",
|
||||
appliedIndex: 10,
|
||||
committedIndex: 12,
|
||||
action: func(s *EtcdServer) {
|
||||
s.stopping <- struct{}{}
|
||||
},
|
||||
ExpectedError: ErrStopped,
|
||||
},
|
||||
{
|
||||
name: "Timed out waiting for the applied index",
|
||||
appliedIndex: 10,
|
||||
committedIndex: 12,
|
||||
action: nil,
|
||||
ExpectedError: ErrTimeoutWaitAppliedIndex,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := &EtcdServer{
|
||||
appliedIndex: tc.appliedIndex,
|
||||
committedIndex: tc.committedIndex,
|
||||
stopping: make(chan struct{}, 1),
|
||||
applyWait: wait.NewTimeList(),
|
||||
}
|
||||
|
||||
if tc.action != nil {
|
||||
go tc.action(s)
|
||||
}
|
||||
|
||||
err := s.waitAppliedIndex()
|
||||
|
||||
if err != tc.ExpectedError {
|
||||
t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -45,6 +45,10 @@ const (
|
||||
maxGapBetweenApplyAndCommitIndex = 5000
|
||||
traceThreshold = 100 * time.Millisecond
|
||||
readIndexRetryTime = 500 * time.Millisecond
|
||||
|
||||
// The timeout for the node to catch up its applied index, and is used in
|
||||
// lease related operations, such as LeaseRenew and LeaseTimeToLive.
|
||||
applyTimeout = time.Second
|
||||
)
|
||||
|
||||
type RaftKV interface {
|
||||
@ -271,6 +275,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
|
||||
return resp.(*pb.LeaseGrantResponse), nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) waitAppliedIndex() error {
|
||||
select {
|
||||
case <-s.ApplyWait():
|
||||
case <-s.stopping:
|
||||
return ErrStopped
|
||||
case <-time.After(applyTimeout):
|
||||
return ErrTimeoutWaitAppliedIndex
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
|
||||
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
|
||||
if err != nil {
|
||||
@ -280,26 +296,32 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
|
||||
ttl, err := s.lessor.Renew(id)
|
||||
if err == nil { // already requested to primary lessor(leader)
|
||||
return ttl, nil
|
||||
}
|
||||
if err != lease.ErrNotPrimary {
|
||||
return -1, err
|
||||
if s.isLeader() {
|
||||
if err := s.waitAppliedIndex(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
ttl, err := s.lessor.Renew(id)
|
||||
if err == nil { // already requested to primary lessor(leader)
|
||||
return ttl, nil
|
||||
}
|
||||
if err != lease.ErrNotPrimary {
|
||||
return -1, err
|
||||
}
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
|
||||
defer cancel()
|
||||
|
||||
// renewals don't go through raft; forward to leader manually
|
||||
for cctx.Err() == nil && err != nil {
|
||||
for cctx.Err() == nil {
|
||||
leader, lerr := s.waitLeader(cctx)
|
||||
if lerr != nil {
|
||||
return -1, lerr
|
||||
}
|
||||
for _, url := range leader.PeerURLs {
|
||||
lurl := url + leasehttp.LeasePrefix
|
||||
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
|
||||
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
|
||||
if err == nil || err == lease.ErrLeaseNotFound {
|
||||
return ttl, err
|
||||
}
|
||||
@ -315,7 +337,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
|
||||
if s.Leader() == s.ID() {
|
||||
if s.isLeader() {
|
||||
if err := s.waitAppliedIndex(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// primary; timetolive directly from leader
|
||||
le := s.lessor.Lookup(lease.LeaseID(r.ID))
|
||||
if le == nil {
|
||||
|
@ -16,7 +16,7 @@ require (
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
||||
github.com/jonboulle/clockwork v0.2.2
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
github.com/prometheus/client_golang v1.11.1
|
||||
github.com/prometheus/client_model v0.2.0
|
||||
github.com/sirupsen/logrus v1.7.0 // indirect
|
||||
github.com/soheilhy/cmux v0.1.5
|
||||
@ -25,20 +25,20 @@ require (
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/etcd/api/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/v2 v2.305.2
|
||||
go.etcd.io/etcd/client/v3 v3.5.2
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/v2 v2.305.4
|
||||
go.etcd.io/etcd/client/v3 v3.5.4
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/raft/v3 v3.5.4
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
|
||||
go.opentelemetry.io/otel v0.20.0
|
||||
go.opentelemetry.io/otel/exporters/otlp v0.20.0
|
||||
go.opentelemetry.io/otel/sdk v0.20.0
|
||||
go.uber.org/multierr v1.6.0
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
|
||||
google.golang.org/grpc v1.38.0
|
||||
|
@ -234,8 +234,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
||||
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
@ -343,8 +343,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -389,8 +389,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@ -430,16 +431,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"sync"
|
||||
@ -796,18 +797,12 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
|
||||
|
||||
func (le *lessor) initAndRecover() {
|
||||
tx := le.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
|
||||
tx.UnsafeCreateBucket(buckets.Lease)
|
||||
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
|
||||
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
|
||||
for i := range vs {
|
||||
var lpb leasepb.Lease
|
||||
err := lpb.Unmarshal(vs[i])
|
||||
if err != nil {
|
||||
tx.Unlock()
|
||||
panic("failed to unmarshal lease proto item")
|
||||
}
|
||||
lpbs := unsafeGetAllLeases(tx)
|
||||
tx.Unlock()
|
||||
for _, lpb := range lpbs {
|
||||
ID := LeaseID(lpb.ID)
|
||||
if lpb.TTL < le.minLeaseTTL {
|
||||
lpb.TTL = le.minLeaseTTL
|
||||
@ -825,7 +820,6 @@ func (le *lessor) initAndRecover() {
|
||||
}
|
||||
le.leaseExpiredNotifier.Init()
|
||||
heap.Init(&le.leaseCheckpointHeap)
|
||||
tx.Unlock()
|
||||
|
||||
le.b.ForceCommit()
|
||||
}
|
||||
@ -858,7 +852,7 @@ func (l *Lease) persistTo(b backend.Backend) {
|
||||
panic("failed to marshal lease proto item")
|
||||
}
|
||||
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().LockInsideApply()
|
||||
b.BatchTx().UnsafePut(buckets.Lease, key, val)
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
@ -923,6 +917,30 @@ func int64ToBytes(n int64) []byte {
|
||||
return bytes
|
||||
}
|
||||
|
||||
func bytesToLeaseID(bytes []byte) int64 {
|
||||
if len(bytes) != 8 {
|
||||
panic(fmt.Errorf("lease ID must be 8-byte"))
|
||||
}
|
||||
return int64(binary.BigEndian.Uint64(bytes))
|
||||
}
|
||||
|
||||
func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
|
||||
ls := make([]*leasepb.Lease, 0)
|
||||
err := tx.UnsafeForEach(buckets.Lease, func(k, v []byte) error {
|
||||
var lpb leasepb.Lease
|
||||
err := lpb.Unmarshal(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k))
|
||||
}
|
||||
ls = append(ls, &lpb)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ls
|
||||
}
|
||||
|
||||
// FakeLessor is a fake implementation of Lessor interface.
|
||||
// Used for testing only.
|
||||
type FakeLessor struct{}
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
@ -27,9 +28,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/stretchr/testify/assert"
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/version"
|
||||
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -649,6 +653,93 @@ func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaseBackend(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
setup func(tx backend.BatchTx)
|
||||
want []*leasepb.Lease
|
||||
}{
|
||||
{
|
||||
name: "Empty by default",
|
||||
setup: func(tx backend.BatchTx) {},
|
||||
want: []*leasepb.Lease{},
|
||||
},
|
||||
{
|
||||
name: "Returns data put before",
|
||||
setup: func(tx backend.BatchTx) {
|
||||
mustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
})
|
||||
},
|
||||
want: []*leasepb.Lease{
|
||||
{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Skips deleted",
|
||||
setup: func(tx backend.BatchTx) {
|
||||
mustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
})
|
||||
mustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: math.MinInt64,
|
||||
TTL: 2,
|
||||
})
|
||||
mustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: math.MaxInt64,
|
||||
TTL: 3,
|
||||
})
|
||||
tx.UnsafeDelete(buckets.Lease, int64ToBytes(-1))
|
||||
},
|
||||
want: []*leasepb.Lease{
|
||||
{
|
||||
ID: math.MaxInt64,
|
||||
TTL: 3,
|
||||
},
|
||||
{
|
||||
ID: math.MinInt64, // bytes bigger than MaxInt64
|
||||
TTL: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(buckets.Lease)
|
||||
tc.setup(tx)
|
||||
tx.Unlock()
|
||||
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
|
||||
be2 := backend.NewDefaultBackend(tmpPath)
|
||||
defer be2.Close()
|
||||
leases := unsafeGetAllLeases(be2.ReadTx())
|
||||
|
||||
assert.Equal(t, tc.want, leases)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
|
||||
key := int64ToBytes(lpb.ID)
|
||||
|
||||
val, err := lpb.Marshal()
|
||||
if err != nil {
|
||||
panic("failed to marshal lease proto item")
|
||||
}
|
||||
tx.UnsafePut(buckets.Lease, key, val)
|
||||
}
|
||||
|
||||
type fakeDeleter struct {
|
||||
deleted []string
|
||||
tx backend.BatchTx
|
||||
|
@ -68,6 +68,9 @@ type Backend interface {
|
||||
Defrag() error
|
||||
ForceCommit()
|
||||
Close() error
|
||||
|
||||
// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
|
||||
SetTxPostLockInsideApplyHook(func())
|
||||
}
|
||||
|
||||
type Snapshot interface {
|
||||
@ -100,8 +103,9 @@ type backend struct {
|
||||
// mlock prevents backend database file to be swapped
|
||||
mlock bool
|
||||
|
||||
mu sync.RWMutex
|
||||
db *bolt.DB
|
||||
mu sync.RWMutex
|
||||
bopts *bolt.Options
|
||||
db *bolt.DB
|
||||
|
||||
batchInterval time.Duration
|
||||
batchLimit int
|
||||
@ -119,6 +123,9 @@ type backend struct {
|
||||
|
||||
hooks Hooks
|
||||
|
||||
// txPostLockInsideApplyHook is called each time right after locking the tx.
|
||||
txPostLockInsideApplyHook func()
|
||||
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
@ -185,7 +192,8 @@ func newBackend(bcfg BackendConfig) *backend {
|
||||
// In future, may want to make buffering optional for low-concurrency systems
|
||||
// or dynamically swap between buffered/non-buffered depending on workload.
|
||||
b := &backend{
|
||||
db: db,
|
||||
bopts: bopts,
|
||||
db: db,
|
||||
|
||||
batchInterval: bcfg.BatchInterval,
|
||||
batchLimit: bcfg.BatchLimit,
|
||||
@ -229,6 +237,14 @@ func (b *backend) BatchTx() BatchTx {
|
||||
return b.batchTx
|
||||
}
|
||||
|
||||
func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
|
||||
// It needs to lock the batchTx, because the periodic commit
|
||||
// may be accessing the txPostLockInsideApplyHook at the moment.
|
||||
b.batchTx.lock()
|
||||
defer b.batchTx.Unlock()
|
||||
b.txPostLockInsideApplyHook = hook
|
||||
}
|
||||
|
||||
func (b *backend) ReadTx() ReadTx { return b.readTx }
|
||||
|
||||
// ConcurrentReadTx creates and returns a new ReadTx, which:
|
||||
@ -438,7 +454,7 @@ func (b *backend) defrag() error {
|
||||
// TODO: make this non-blocking?
|
||||
// lock batchTx to ensure nobody is using previous tx, and then
|
||||
// close previous ongoing tx.
|
||||
b.batchTx.Lock()
|
||||
b.batchTx.LockOutsideApply()
|
||||
defer b.batchTx.Unlock()
|
||||
|
||||
// lock database after lock tx to avoid deadlock.
|
||||
@ -511,13 +527,7 @@ func (b *backend) defrag() error {
|
||||
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
|
||||
}
|
||||
|
||||
defragmentedBoltOptions := bolt.Options{}
|
||||
if boltOpenOptions != nil {
|
||||
defragmentedBoltOptions = *boltOpenOptions
|
||||
}
|
||||
defragmentedBoltOptions.Mlock = b.mlock
|
||||
|
||||
b.db, err = bolt.Open(dbp, 0600, &defragmentedBoltOptions)
|
||||
b.db, err = bolt.Open(dbp, 0600, b.bopts)
|
||||
if err != nil {
|
||||
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
|
||||
}
|
||||
|
@ -122,7 +122,17 @@ func TestBackendBatchIntervalCommit(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBackendDefrag(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
bcfg := backend.DefaultBackendConfig()
|
||||
// Make sure we change BackendFreelistType
|
||||
// The goal is to verify that we restore config option after defrag.
|
||||
if bcfg.BackendFreelistType == bolt.FreelistMapType {
|
||||
bcfg.BackendFreelistType = bolt.FreelistArrayType
|
||||
} else {
|
||||
bcfg.BackendFreelistType = bolt.FreelistMapType
|
||||
}
|
||||
|
||||
b, _ := betesting.NewTmpBackendFromCfg(t, bcfg)
|
||||
|
||||
defer betesting.Close(t, b)
|
||||
|
||||
tx := b.BatchTx()
|
||||
@ -168,6 +178,10 @@ func TestBackendDefrag(t *testing.T) {
|
||||
if nsize >= size {
|
||||
t.Errorf("new size = %v, want < %d", nsize, size)
|
||||
}
|
||||
db := backend.DbFromBackendForTest(b)
|
||||
if db.FreelistType != bcfg.BackendFreelistType {
|
||||
t.Errorf("db FreelistType = [%v], want [%v]", db.FreelistType, bcfg.BackendFreelistType)
|
||||
}
|
||||
|
||||
// try put more keys after shrink.
|
||||
tx = b.BatchTx()
|
||||
|
@ -53,6 +53,8 @@ type BatchTx interface {
|
||||
Commit()
|
||||
// CommitAndStop commits the previous tx and does not create a new one.
|
||||
CommitAndStop()
|
||||
LockInsideApply()
|
||||
LockOutsideApply()
|
||||
}
|
||||
|
||||
type batchTx struct {
|
||||
@ -63,10 +65,34 @@ type batchTx struct {
|
||||
pending int
|
||||
}
|
||||
|
||||
// Lock is supposed to be called only by the unit test.
|
||||
func (t *batchTx) Lock() {
|
||||
ValidateCalledInsideUnittest(t.backend.lg)
|
||||
t.lock()
|
||||
}
|
||||
|
||||
func (t *batchTx) lock() {
|
||||
t.Mutex.Lock()
|
||||
}
|
||||
|
||||
func (t *batchTx) LockInsideApply() {
|
||||
t.lock()
|
||||
if t.backend.txPostLockInsideApplyHook != nil {
|
||||
// The callers of some methods (i.e., (*RaftCluster).AddMember)
|
||||
// can be coming from both InsideApply and OutsideApply, but the
|
||||
// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
|
||||
// So we should check the txPostLockInsideApplyHook before validating
|
||||
// the callstack.
|
||||
ValidateCalledInsideApply(t.backend.lg)
|
||||
t.backend.txPostLockInsideApplyHook()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *batchTx) LockOutsideApply() {
|
||||
ValidateCalledOutSideApply(t.backend.lg)
|
||||
t.lock()
|
||||
}
|
||||
|
||||
func (t *batchTx) Unlock() {
|
||||
if t.pending >= t.backend.batchLimit {
|
||||
t.commit(false)
|
||||
@ -214,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)
|
||||
|
||||
// Commit commits a previous tx and begins a new writable one.
|
||||
func (t *batchTx) Commit() {
|
||||
t.Lock()
|
||||
t.lock()
|
||||
t.commit(false)
|
||||
t.Unlock()
|
||||
}
|
||||
|
||||
// CommitAndStop commits the previous tx and does not create a new one.
|
||||
func (t *batchTx) CommitAndStop() {
|
||||
t.Lock()
|
||||
t.lock()
|
||||
t.commit(true)
|
||||
t.Unlock()
|
||||
}
|
||||
@ -291,13 +317,13 @@ func (t *batchTxBuffered) Unlock() {
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) Commit() {
|
||||
t.Lock()
|
||||
t.lock()
|
||||
t.commit(false)
|
||||
t.Unlock()
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) CommitAndStop() {
|
||||
t.Lock()
|
||||
t.lock()
|
||||
t.commit(true)
|
||||
t.Unlock()
|
||||
}
|
||||
|
@ -40,8 +40,6 @@ func TestBackendPreCommitHook(t *testing.T) {
|
||||
// Empty commit.
|
||||
tx.Commit()
|
||||
|
||||
write(tx, []byte("foo"), []byte("bar"))
|
||||
|
||||
assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits")
|
||||
tx.Commit()
|
||||
assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits")
|
||||
|
70
server/mvcc/backend/verify.go
Normal file
70
server/mvcc/backend/verify.go
Normal file
@ -0,0 +1,70 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package backend
|
||||
|
||||
import (
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
ENV_VERIFY = "ETCD_VERIFY"
|
||||
ENV_VERIFY_ALL_VALUE = "all"
|
||||
ENV_VERIFY_LOCK = "lock"
|
||||
)
|
||||
|
||||
func ValidateCalledInsideApply(lg *zap.Logger) {
|
||||
if !verifyLockEnabled() {
|
||||
return
|
||||
}
|
||||
if !insideApply() {
|
||||
lg.Panic("Called outside of APPLY!", zap.Stack("stacktrace"))
|
||||
}
|
||||
}
|
||||
|
||||
func ValidateCalledOutSideApply(lg *zap.Logger) {
|
||||
if !verifyLockEnabled() {
|
||||
return
|
||||
}
|
||||
if insideApply() {
|
||||
lg.Panic("Called inside of APPLY!", zap.Stack("stacktrace"))
|
||||
}
|
||||
}
|
||||
|
||||
func ValidateCalledInsideUnittest(lg *zap.Logger) {
|
||||
if !verifyLockEnabled() {
|
||||
return
|
||||
}
|
||||
if !insideUnittest() {
|
||||
lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace"))
|
||||
}
|
||||
}
|
||||
|
||||
func verifyLockEnabled() bool {
|
||||
return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
|
||||
}
|
||||
|
||||
func insideApply() bool {
|
||||
stackTraceStr := string(debug.Stack())
|
||||
return strings.Contains(stackTraceStr, ".applyEntries")
|
||||
}
|
||||
|
||||
func insideUnittest() bool {
|
||||
stackTraceStr := string(debug.Stack())
|
||||
return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
|
||||
}
|
111
server/mvcc/backend/verify_test.go
Normal file
111
server/mvcc/backend/verify_test.go
Normal file
@ -0,0 +1,111 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package backend_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
)
|
||||
|
||||
func TestLockVerify(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
insideApply bool
|
||||
lock func(tx backend.BatchTx)
|
||||
txPostLockInsideApplyHook func()
|
||||
expectPanic bool
|
||||
}{
|
||||
{
|
||||
name: "call lockInsideApply from inside apply",
|
||||
insideApply: true,
|
||||
lock: lockInsideApply,
|
||||
expectPanic: false,
|
||||
},
|
||||
{
|
||||
name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)",
|
||||
insideApply: false,
|
||||
lock: lockInsideApply,
|
||||
expectPanic: false,
|
||||
},
|
||||
{
|
||||
name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)",
|
||||
insideApply: false,
|
||||
lock: lockInsideApply,
|
||||
txPostLockInsideApplyHook: func() {},
|
||||
expectPanic: true,
|
||||
},
|
||||
{
|
||||
name: "call lockOutsideApply from outside apply",
|
||||
insideApply: false,
|
||||
lock: lockOutsideApply,
|
||||
expectPanic: false,
|
||||
},
|
||||
{
|
||||
name: "call lockOutsideApply from inside apply",
|
||||
insideApply: true,
|
||||
lock: lockOutsideApply,
|
||||
expectPanic: true,
|
||||
},
|
||||
{
|
||||
name: "call Lock from unit test",
|
||||
insideApply: false,
|
||||
lock: lockFromUT,
|
||||
expectPanic: false,
|
||||
},
|
||||
}
|
||||
env := os.Getenv("ETCD_VERIFY")
|
||||
os.Setenv("ETCD_VERIFY", "lock")
|
||||
defer func() {
|
||||
os.Setenv("ETCD_VERIFY", env)
|
||||
}()
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
||||
be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook)
|
||||
|
||||
hasPaniced := handlePanic(func() {
|
||||
if tc.insideApply {
|
||||
applyEntries(be, tc.lock)
|
||||
} else {
|
||||
tc.lock(be.BatchTx())
|
||||
}
|
||||
}) != nil
|
||||
if hasPaniced != tc.expectPanic {
|
||||
t.Errorf("%v != %v", hasPaniced, tc.expectPanic)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func handlePanic(f func()) (result interface{}) {
|
||||
defer func() {
|
||||
result = recover()
|
||||
}()
|
||||
f()
|
||||
return result
|
||||
}
|
||||
|
||||
func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) {
|
||||
f(be.BatchTx())
|
||||
}
|
||||
|
||||
func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() }
|
||||
func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() }
|
||||
func lockFromUT(tx backend.BatchTx) { tx.Lock() }
|
@ -119,7 +119,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
|
||||
}
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
tx.UnsafeCreateBucket(buckets.Key)
|
||||
tx.UnsafeCreateBucket(buckets.Meta)
|
||||
tx.Unlock()
|
||||
@ -238,7 +238,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
|
||||
revToBytes(revision{main: rev}, rbytes)
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
|
||||
tx.Unlock()
|
||||
// ensure that desired compaction is persisted
|
||||
@ -334,7 +334,7 @@ func (s *store) restore() error {
|
||||
keyToLease := make(map[string]lease.LeaseID)
|
||||
|
||||
// restore index
|
||||
tx := s.b.BatchTx()
|
||||
tx := s.b.ReadTx()
|
||||
tx.Lock()
|
||||
|
||||
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
|
||||
|
@ -39,7 +39,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
||||
start := time.Now()
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockOutsideApply()
|
||||
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
|
||||
for _, key := range keys {
|
||||
rev = bytesToRev(key)
|
||||
|
@ -871,6 +871,8 @@ type fakeBatchTx struct {
|
||||
rangeRespc chan rangeResp
|
||||
}
|
||||
|
||||
func (b *fakeBatchTx) LockInsideApply() {}
|
||||
func (b *fakeBatchTx) LockOutsideApply() {}
|
||||
func (b *fakeBatchTx) Lock() {}
|
||||
func (b *fakeBatchTx) Unlock() {}
|
||||
func (b *fakeBatchTx) RLock() {}
|
||||
@ -912,6 +914,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
|
||||
func (b *fakeBackend) ForceCommit() {}
|
||||
func (b *fakeBackend) Defrag() error { return nil }
|
||||
func (b *fakeBackend) Close() error { return nil }
|
||||
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
|
||||
|
||||
type indexGetResp struct {
|
||||
rev revision
|
||||
|
@ -78,7 +78,7 @@ type storeTxnWrite struct {
|
||||
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
|
||||
s.mu.RLock()
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
tx.LockInsideApply()
|
||||
tw := &storeTxnWrite{
|
||||
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
|
||||
tx: tx,
|
||||
|
@ -31,7 +31,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
|
||||
panic(fmt.Errorf("cannot marshal event: %v", err))
|
||||
}
|
||||
|
||||
be.BatchTx().Lock()
|
||||
be.BatchTx().LockOutsideApply()
|
||||
be.BatchTx().UnsafePut(buckets.Key, ibytes, d)
|
||||
be.BatchTx().Unlock()
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet) etcdhttp.Health { return checkHealth(c) }))
|
||||
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkHealth(c) }))
|
||||
}
|
||||
|
||||
// HandleProxyHealth registers health handler on '/proxy/health'.
|
||||
@ -39,7 +39,7 @@ func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet) etcdhttp.Health { return checkProxyHealth(c) }))
|
||||
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkProxyHealth(c) }))
|
||||
}
|
||||
|
||||
func checkHealth(c *clientv3.Client) etcdhttp.Health {
|
||||
|
@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) {
|
||||
}
|
||||
|
||||
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
|
||||
tx := be.BatchTx()
|
||||
index, term := cindex.ReadConsistentIndex(tx)
|
||||
index, term := cindex.ReadConsistentIndex(be.ReadTx())
|
||||
if cfg.ExactIndex && index != hardstate.Commit {
|
||||
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
|
||||
}
|
||||
|
1
test.sh
1
test.sh
@ -43,6 +43,7 @@ set -o pipefail
|
||||
# e.g. add/update missing dependencies. Such divergences should be
|
||||
# detected and trigger a failure that needs explicit developer's action.
|
||||
export GOFLAGS=-mod=readonly
|
||||
export ETCD_VERIFY=all
|
||||
|
||||
source ./scripts/test_lib.sh
|
||||
source ./build.sh
|
||||
|
@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct {
|
||||
v2deprecation string
|
||||
|
||||
rollingStart bool
|
||||
logLevel string
|
||||
}
|
||||
|
||||
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
|
||||
@ -315,6 +316,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
|
||||
args = append(args, "--v2-deprecation", cfg.v2deprecation)
|
||||
}
|
||||
|
||||
if cfg.logLevel != "" {
|
||||
args = append(args, "--log-level", cfg.logLevel)
|
||||
}
|
||||
|
||||
etcdCfgs[i] = &etcdServerProcessConfig{
|
||||
lg: lg,
|
||||
execPath: cfg.execPath,
|
||||
|
@ -155,15 +155,20 @@ func memberListWithHexTest(cx ctlCtx) {
|
||||
if num == 0 {
|
||||
cx.t.Fatal("member number is 0")
|
||||
}
|
||||
|
||||
if resp.Header.RaftTerm != hexResp.Header.RaftTerm {
|
||||
cx.t.Fatalf("Unexpected raft_term, expected %d, got %d", resp.Header.RaftTerm, hexResp.Header.RaftTerm)
|
||||
}
|
||||
|
||||
for i := 0; i < num; i++ {
|
||||
if resp.Members[i].Name != hexResp.Members[i].Name {
|
||||
cx.t.Fatalf("member name,expected %v,got %v", resp.Members[i].Name, hexResp.Members[i].Name)
|
||||
cx.t.Fatalf("Unexpected member name,expected %v, got %v", resp.Members[i].Name, hexResp.Members[i].Name)
|
||||
}
|
||||
if !reflect.DeepEqual(resp.Members[i].PeerURLs, hexResp.Members[i].PeerURLs) {
|
||||
cx.t.Fatalf("member peerURLs,expected %v,got %v", resp.Members[i].PeerURLs, hexResp.Members[i].PeerURLs)
|
||||
cx.t.Fatalf("Unexpected member peerURLs, expected %v, got %v", resp.Members[i].PeerURLs, hexResp.Members[i].PeerURLs)
|
||||
}
|
||||
if !reflect.DeepEqual(resp.Members[i].ClientURLs, hexResp.Members[i].ClientURLs) {
|
||||
cx.t.Fatalf("member clientURLS,expected %v,got %v", resp.Members[i].ClientURLs, hexResp.Members[i].ClientURLs)
|
||||
cx.t.Fatalf("Unexpected member clientURLS, expected %v, got %v", resp.Members[i].ClientURLs, hexResp.Members[i].ClientURLs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -48,6 +48,8 @@ type etcdProcess interface {
|
||||
|
||||
type logsExpect interface {
|
||||
Expect(string) (string, error)
|
||||
Lines() []string
|
||||
LineCount() int
|
||||
}
|
||||
|
||||
type etcdServerProcess struct {
|
||||
|
@ -60,21 +60,15 @@ func spawnWithExpectLines(args []string, envVars map[string]string, xs ...string
|
||||
// process until either stdout or stderr contains
|
||||
// the expected string
|
||||
var (
|
||||
lines []string
|
||||
lineFunc = func(txt string) bool { return true }
|
||||
lines []string
|
||||
)
|
||||
for _, txt := range xs {
|
||||
for {
|
||||
l, lerr := proc.ExpectFunc(lineFunc)
|
||||
if lerr != nil {
|
||||
proc.Close()
|
||||
return nil, fmt.Errorf("%v %v (expected %q, got %q). Try EXPECT_DEBUG=TRUE", args, lerr, txt, lines)
|
||||
}
|
||||
lines = append(lines, l)
|
||||
if strings.Contains(l, txt) {
|
||||
break
|
||||
}
|
||||
l, lerr := proc.Expect(txt)
|
||||
if lerr != nil {
|
||||
proc.Close()
|
||||
return nil, fmt.Errorf("%v %v (expected %q, got %q). Try EXPECT_DEBUG=TRUE", args, lerr, txt, lines)
|
||||
}
|
||||
lines = append(lines, l)
|
||||
}
|
||||
perr := proc.Close()
|
||||
l := proc.LineCount()
|
||||
|
76
tests/e2e/zap_logging_test.go
Normal file
76
tests/e2e/zap_logging_test.go
Normal file
@ -0,0 +1,76 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !cov
|
||||
// +build !cov
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestServerJsonLogging(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
|
||||
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
|
||||
clusterSize: 1,
|
||||
initialToken: "new",
|
||||
logLevel: "debug",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
logs := epc.procs[0].Logs()
|
||||
time.Sleep(time.Second)
|
||||
if err = epc.Close(); err != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", err)
|
||||
}
|
||||
var entry logEntry
|
||||
lines := logs.Lines()
|
||||
if len(lines) == 0 {
|
||||
t.Errorf("Expected at least one log line")
|
||||
}
|
||||
for _, line := range lines {
|
||||
err := json.Unmarshal([]byte(line), &entry)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to parse log line as json, err: %q, line: %s", err, line)
|
||||
continue
|
||||
}
|
||||
if entry.Level == "" {
|
||||
t.Errorf(`Missing "level" key, line: %s`, line)
|
||||
}
|
||||
if entry.Timestamp == "" {
|
||||
t.Errorf(`Missing "ts" key, line: %s`, line)
|
||||
}
|
||||
if _, err := time.Parse("2006-01-02T15:04:05.000Z0700", entry.Timestamp); entry.Timestamp != "" && err != nil {
|
||||
t.Errorf(`Unexpected "ts" key format, err: %s`, err)
|
||||
}
|
||||
if entry.Caller == "" {
|
||||
t.Errorf(`Missing "caller" key, line: %s`, line)
|
||||
}
|
||||
if entry.Message == "" {
|
||||
t.Errorf(`Missing "message" key, line: %s`, line)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type logEntry struct {
|
||||
Level string `json:"level"`
|
||||
Timestamp string `json:"ts"`
|
||||
Caller string `json:"caller"`
|
||||
Message string `json:"msg"`
|
||||
}
|
@ -18,6 +18,7 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/tests/v3/functional/agent"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -27,7 +28,7 @@ var logger *zap.Logger
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
logger, err = zap.NewProduction()
|
||||
logger, err = logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/pkg/v3/proxy"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -76,7 +77,11 @@ $ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
|
||||
To: url.URL{Scheme: "tcp", Host: to},
|
||||
}
|
||||
if verbose {
|
||||
cfg.Logger = zap.NewExample()
|
||||
var err error
|
||||
cfg.Logger, err = logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
p := proxy.NewServer(cfg)
|
||||
<-p.Ready()
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"flag"
|
||||
|
||||
_ "github.com/etcd-io/gofail/runtime"
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/tests/v3/functional/tester"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -27,7 +28,7 @@ var logger *zap.Logger
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
logger, err = zap.NewProduction()
|
||||
logger, err = logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/tests/v3/functional/rpcpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -256,7 +257,7 @@ func Test_read(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
logger, err := zap.NewProduction()
|
||||
logger, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
20
tests/go.mod
20
tests/go.mod
@ -22,22 +22,22 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
github.com/prometheus/client_golang v1.11.1
|
||||
github.com/soheilhy/cmux v0.1.5
|
||||
github.com/spf13/cobra v1.1.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/etcd/api/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/client/v2 v2.305.2
|
||||
go.etcd.io/etcd/client/v3 v3.5.2
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.2
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||
go.etcd.io/etcd/server/v3 v3.5.2
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/v2 v2.305.4
|
||||
go.etcd.io/etcd/client/v3 v3.5.4
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.4
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.4
|
||||
go.etcd.io/etcd/raft/v3 v3.5.4
|
||||
go.etcd.io/etcd/server/v3 v3.5.4
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
google.golang.org/grpc v1.38.0
|
||||
|
18
tests/go.sum
18
tests/go.sum
@ -239,8 +239,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
||||
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
@ -348,8 +348,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
|
||||
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -394,8 +394,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@ -436,16 +437,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
|
||||
|
103
tests/integration/clientv3/mirror_auth_test.go
Normal file
103
tests/integration/clientv3/mirror_auth_test.go
Normal file
@ -0,0 +1,103 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !cluster_proxy
|
||||
// +build !cluster_proxy
|
||||
|
||||
package clientv3test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/mirror"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func TestMirrorSync_Authenticated(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
initialClient := clus.Client(0)
|
||||
|
||||
// Create a user to run the mirror process that only has access to /syncpath
|
||||
initialClient.RoleAdd(context.Background(), "syncer")
|
||||
initialClient.RoleGrantPermission(context.Background(), "syncer", "/syncpath", clientv3.GetPrefixRangeEnd("/syncpath"), clientv3.PermissionType(clientv3.PermReadWrite))
|
||||
initialClient.UserAdd(context.Background(), "syncer", "syncfoo")
|
||||
initialClient.UserGrantRole(context.Background(), "syncer", "syncer")
|
||||
|
||||
// Seed /syncpath with some initial data
|
||||
_, err := initialClient.KV.Put(context.TODO(), "/syncpath/foo", "bar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Require authentication
|
||||
authSetupRoot(t, initialClient.Auth)
|
||||
|
||||
// Create a client as the `syncer` user.
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: initialClient.Endpoints(),
|
||||
DialTimeout: 5 * time.Second,
|
||||
DialOptions: []grpc.DialOption{grpc.WithBlock()},
|
||||
Username: "syncer",
|
||||
Password: "syncfoo",
|
||||
}
|
||||
syncClient, err := integration.NewClient(t, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer syncClient.Close()
|
||||
|
||||
// Now run the sync process, create changes, and get the initial sync state
|
||||
syncer := mirror.NewSyncer(syncClient, "/syncpath", 0)
|
||||
gch, ech := syncer.SyncBase(context.TODO())
|
||||
wkvs := []*mvccpb.KeyValue{{Key: []byte("/syncpath/foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}
|
||||
|
||||
for g := range gch {
|
||||
if !reflect.DeepEqual(g.Kvs, wkvs) {
|
||||
t.Fatalf("kv = %v, want %v", g.Kvs, wkvs)
|
||||
}
|
||||
}
|
||||
|
||||
for e := range ech {
|
||||
t.Fatalf("unexpected error %v", e)
|
||||
}
|
||||
|
||||
// Start a continuous sync
|
||||
wch := syncer.SyncUpdates(context.TODO())
|
||||
|
||||
// Update state
|
||||
_, err = syncClient.KV.Put(context.TODO(), "/syncpath/foo", "baz")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Wait for the updated state to sync
|
||||
select {
|
||||
case r := <-wch:
|
||||
wkv := &mvccpb.KeyValue{Key: []byte("/syncpath/foo"), Value: []byte("baz"), CreateRevision: 2, ModRevision: 3, Version: 2}
|
||||
if !reflect.DeepEqual(r.Events[0].Kv, wkv) {
|
||||
t.Fatalf("kv = %v, want %v", r.Events[0].Kv, wkv)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive update in one second")
|
||||
}
|
||||
}
|
@ -170,6 +170,7 @@ type ClusterConfig struct {
|
||||
LeaseCheckpointPersist bool
|
||||
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
CorruptCheckTime time.Duration
|
||||
}
|
||||
|
||||
type cluster struct {
|
||||
@ -332,6 +333,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
|
||||
leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
|
||||
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
|
||||
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
|
||||
CorruptCheckTime: c.cfg.CorruptCheckTime,
|
||||
})
|
||||
m.DiscoveryURL = c.cfg.DiscoveryURL
|
||||
if c.cfg.UseGRPC {
|
||||
@ -635,6 +637,7 @@ type memberConfig struct {
|
||||
leaseCheckpointInterval time.Duration
|
||||
leaseCheckpointPersist bool
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
CorruptCheckTime time.Duration
|
||||
}
|
||||
|
||||
// mustNewMember return an inited member with the given name. If peerTLS is
|
||||
@ -737,6 +740,9 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
|
||||
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
|
||||
|
||||
m.InitialCorruptCheck = true
|
||||
if mcfg.CorruptCheckTime > time.Duration(0) {
|
||||
m.CorruptCheckTime = mcfg.CorruptCheckTime
|
||||
}
|
||||
m.WarningApplyDuration = embed.DefaultWarningApplyDuration
|
||||
|
||||
m.V2Deprecation = config.V2_DEPR_DEFAULT
|
||||
@ -1448,7 +1454,7 @@ func (c *ClusterV3) Client(i int) *clientv3.Client {
|
||||
|
||||
func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) {
|
||||
if c.clusterClient == nil {
|
||||
endpoints := []string{}
|
||||
var endpoints []string
|
||||
for _, m := range c.Members {
|
||||
endpoints = append(endpoints, m.grpcURL)
|
||||
}
|
||||
|
@ -15,14 +15,14 @@
|
||||
package integration_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBeforeTestWithoutLeakDetection(t *testing.T) {
|
||||
integration.BeforeTest(t, integration.WithoutGoLeakDetection(), integration.WithoutSkipInShort())
|
||||
// Intentional leak that should get ignored
|
||||
go time.Sleep(2 * time.Second)
|
||||
go func() {
|
||||
|
||||
}()
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
@ -25,8 +26,10 @@ import (
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
||||
"go.etcd.io/etcd/server/v3/mvcc"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
@ -228,3 +231,119 @@ func TestV3CorruptAlarm(t *testing.T) {
|
||||
}
|
||||
t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
|
||||
}
|
||||
|
||||
func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{
|
||||
CorruptCheckTime: time.Second,
|
||||
Size: 3,
|
||||
SnapshotCount: 10,
|
||||
SnapshotCatchUpEntries: 5,
|
||||
})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60})
|
||||
if err != nil {
|
||||
t.Errorf("could not create lease 1 (%v)", err)
|
||||
}
|
||||
if lresp.ID != 1 {
|
||||
t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
|
||||
}
|
||||
|
||||
putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}
|
||||
// Trigger snapshot from the leader to new member
|
||||
for i := 0; i < 15; i++ {
|
||||
_, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
|
||||
if err != nil {
|
||||
t.Errorf("#%d: couldn't put key (%v)", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
clus.RemoveMember(t, uint64(clus.Members[2].ID()))
|
||||
oldMemberClient := clus.Client(2)
|
||||
if err := oldMemberClient.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clus.AddMember(t)
|
||||
// Wait for new member to catch up
|
||||
newMemberClient, err := clus.NewClientV3(2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
WaitClientV3(t, newMemberClient)
|
||||
clus.clients[2] = newMemberClient
|
||||
|
||||
// Corrupt member 2 by modifying backend lease bucket offline.
|
||||
clus.Members[2].Stop(t)
|
||||
fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db")
|
||||
bcfg := backend.DefaultBackendConfig()
|
||||
bcfg.Path = fp
|
||||
bcfg.Logger = zaptest.NewLogger(t)
|
||||
be := backend.New(bcfg)
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.UnsafeDelete(buckets.Lease, leaseIdToBytes(1))
|
||||
lpb := leasepb.Lease{ID: int64(2), TTL: 60}
|
||||
mustUnsafePutLease(tx, &lpb)
|
||||
tx.Commit()
|
||||
|
||||
if err := be.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := clus.Members[2].Restart(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clus.Members[1].WaitOK(t)
|
||||
clus.Members[2].WaitOK(t)
|
||||
|
||||
// Revoke lease should remove key except the member with corruption
|
||||
_, err = toGRPC(clus.Client(0)).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp0, err0 := clus.Client(1).KV.Get(context.TODO(), "foo")
|
||||
if err0 != nil {
|
||||
t.Fatal(err0)
|
||||
}
|
||||
resp1, err1 := clus.Client(2).KV.Get(context.TODO(), "foo")
|
||||
if err1 != nil {
|
||||
t.Fatal(err1)
|
||||
}
|
||||
|
||||
if resp0.Header.Revision == resp1.Header.Revision {
|
||||
t.Fatalf("matching Revision values")
|
||||
}
|
||||
|
||||
// Wait for CorruptCheckTime
|
||||
time.Sleep(time.Second)
|
||||
presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
|
||||
if perr != nil {
|
||||
if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
|
||||
t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func leaseIdToBytes(n int64) []byte {
|
||||
bytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bytes, uint64(n))
|
||||
return bytes
|
||||
}
|
||||
|
||||
func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
|
||||
key := leaseIdToBytes(lpb.ID)
|
||||
|
||||
val, err := lpb.Marshal()
|
||||
if err != nil {
|
||||
panic("failed to marshal lease proto item")
|
||||
}
|
||||
tx.UnsafePut(buckets.Lease, key, val)
|
||||
}
|
||||
|
@ -46,6 +46,33 @@ func TestV3AuthEmptyUserGet(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3AuthEmptyUserPut ensures that a put with an empty user will return an empty user error,
|
||||
// and the consistent_index should be moved forward even the apply-->Put fails.
|
||||
func TestV3AuthEmptyUserPut(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{
|
||||
Size: 1,
|
||||
SnapshotCount: 3,
|
||||
})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
api := toGRPC(clus.Client(0))
|
||||
authSetupRoot(t, api.Auth)
|
||||
|
||||
// The SnapshotCount is 3, so there must be at least 3 new snapshot files being created.
|
||||
// The VERIFY logic will check whether the consistent_index >= last snapshot index on
|
||||
// cluster terminating.
|
||||
for i := 0; i < 10; i++ {
|
||||
_, err := api.KV.Put(ctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
|
||||
if !eqErrGRPC(err, rpctypes.ErrUserEmpty) {
|
||||
t.Fatalf("got %v, expected %v", err, rpctypes.ErrUserEmpty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3AuthTokenWithDisable tests that auth won't crash if
|
||||
// given a valid token when authentication is disabled
|
||||
func TestV3AuthTokenWithDisable(t *testing.T) {
|
||||
|
@ -16,7 +16,9 @@ package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -141,6 +143,91 @@ func TestV3LeaseGrantByID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3LeaseNegativeID ensures restarted member lessor can recover negative leaseID from backend.
|
||||
//
|
||||
// When the negative leaseID is used for lease revoke, all etcd nodes will remove the lease
|
||||
// and delete associated keys to ensure kv store data consistency
|
||||
//
|
||||
// It ensures issue 12535 is fixed by PR 13676
|
||||
func TestV3LeaseNegativeID(t *testing.T) {
|
||||
tcs := []struct {
|
||||
leaseID int64
|
||||
k []byte
|
||||
v []byte
|
||||
}{
|
||||
{
|
||||
leaseID: -1, // int64 -1 is 2^64 -1 in uint64
|
||||
k: []byte("foo"),
|
||||
v: []byte("bar"),
|
||||
},
|
||||
{
|
||||
leaseID: math.MaxInt64,
|
||||
k: []byte("bar"),
|
||||
v: []byte("foo"),
|
||||
},
|
||||
{
|
||||
leaseID: math.MinInt64,
|
||||
k: []byte("hello"),
|
||||
v: []byte("world"),
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(fmt.Sprintf("test with lease ID %16x", tc.leaseID), func(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
cc := clus.RandClient()
|
||||
lresp, err := toGRPC(cc).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: tc.leaseID, TTL: 300})
|
||||
if err != nil {
|
||||
t.Errorf("could not create lease %d (%v)", tc.leaseID, err)
|
||||
}
|
||||
if lresp.ID != tc.leaseID {
|
||||
t.Errorf("got id %v, wanted id %v", lresp.ID, tc.leaseID)
|
||||
}
|
||||
putr := &pb.PutRequest{Key: tc.k, Value: tc.v, Lease: tc.leaseID}
|
||||
_, err = toGRPC(cc).KV.Put(ctx, putr)
|
||||
if err != nil {
|
||||
t.Errorf("couldn't put key (%v)", err)
|
||||
}
|
||||
|
||||
// wait for backend Commit
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// restore lessor from db file
|
||||
clus.Members[2].Stop(t)
|
||||
if err := clus.Members[2].Restart(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// revoke lease should remove key
|
||||
WaitClientV3(t, clus.Client(2))
|
||||
_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: tc.leaseID})
|
||||
if err != nil {
|
||||
t.Errorf("could not revoke lease %d (%v)", tc.leaseID, err)
|
||||
}
|
||||
var revision int64
|
||||
for i := range clus.Members {
|
||||
getr := &pb.RangeRequest{Key: tc.k}
|
||||
getresp, err := toGRPC(clus.Client(i)).KV.Range(ctx, getr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if revision == 0 {
|
||||
revision = getresp.Header.Revision
|
||||
}
|
||||
if revision != getresp.Header.Revision {
|
||||
t.Errorf("expect revision %d, but got %d", revision, getresp.Header.Revision)
|
||||
}
|
||||
if len(getresp.Kvs) != 0 {
|
||||
t.Errorf("lease removed but key remains")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3LeaseExpire ensures a key is deleted once a key expires.
|
||||
func TestV3LeaseExpire(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
@ -412,17 +499,31 @@ func TestV3LeaseLeases(t *testing.T) {
|
||||
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
|
||||
// related issue https://github.com/etcd-io/etcd/issues/6978
|
||||
func TestV3LeaseRenewStress(t *testing.T) {
|
||||
testLeaseStress(t, stressLeaseRenew)
|
||||
testLeaseStress(t, stressLeaseRenew, false)
|
||||
}
|
||||
|
||||
// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress,
|
||||
// but it uses a cluster client instead of a specific member's client.
|
||||
// The related issue is https://github.com/etcd-io/etcd/issues/13675.
|
||||
func TestV3LeaseRenewStressWithClusterClient(t *testing.T) {
|
||||
testLeaseStress(t, stressLeaseRenew, true)
|
||||
}
|
||||
|
||||
// TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
|
||||
// it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found.
|
||||
// related issue https://github.com/etcd-io/etcd/issues/6978
|
||||
func TestV3LeaseTimeToLiveStress(t *testing.T) {
|
||||
testLeaseStress(t, stressLeaseTimeToLive)
|
||||
testLeaseStress(t, stressLeaseTimeToLive, false)
|
||||
}
|
||||
|
||||
func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
|
||||
// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress,
|
||||
// but it uses a cluster client instead of a specific member's client.
|
||||
// The related issue is https://github.com/etcd-io/etcd/issues/13675.
|
||||
func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) {
|
||||
testLeaseStress(t, stressLeaseTimeToLive, true)
|
||||
}
|
||||
|
||||
func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) {
|
||||
BeforeTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
@ -431,13 +532,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient
|
||||
defer cancel()
|
||||
errc := make(chan error)
|
||||
|
||||
for i := 0; i < 30; i++ {
|
||||
for j := 0; j < 3; j++ {
|
||||
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
|
||||
if useClusterClient {
|
||||
for i := 0; i < 300; i++ {
|
||||
clusterClient, err := clus.ClusterClient()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
go func(i int) { errc <- stresser(ctx, toGRPC(clusterClient).Lease) }(i)
|
||||
}
|
||||
} else {
|
||||
for i := 0; i < 100; i++ {
|
||||
for j := 0; j < 3; j++ {
|
||||
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 90; i++ {
|
||||
for i := 0; i < 300; i++ {
|
||||
if err := <-errc; err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -468,7 +579,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
|
||||
continue
|
||||
}
|
||||
if rresp.TTL == 0 {
|
||||
return fmt.Errorf("TTL shouldn't be 0 so soon")
|
||||
return errors.New("TTL shouldn't be 0 so soon")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
Reference in New Issue
Block a user