Compare commits

...

50 Commits

Author SHA1 Message Date
08407ff760 version: bump up to 3.5.4 2022-04-24 12:44:36 +02:00
c3c908e39a Merge pull request #13946 from ahrtr/move_cindex_on_apply_fail_353
[3.5] Update consitent_index when applying fails
2022-04-21 16:21:24 +02:00
5c68f2e510 Update conssitent_index when applying fails
When clients have no permission to perform whatever operation, then
the applying may fail. We should also move consistent_index forward
in this case, otherwise the consitent_index may smaller than the
snapshot index.
2022-04-20 22:17:49 +08:00
b872757492 Merge pull request #13950 from liggitt/revert-srv-dot-3.5
Revert #13714
2022-04-19 12:23:02 +02:00
081b4e2a0f Add unit test for canonical SRV records 2022-04-15 15:33:31 -04:00
76564778a9 Revert "trim the suffix dot from the srv.Target for etcd-client DNS lookup"
This reverts commit 4f51cc1d9a.
2022-04-15 15:33:16 -04:00
0452feec71 version: bump up to 3.5.3 2022-04-13 17:17:51 +02:00
842cb4b4fc Merge pull request #13938 from endocrimes/dani/backport
[backport]  PR 13923 to release-3.5
2022-04-13 15:15:59 +02:00
50978d5b25 clientv3: disable mirror auth test with proxy 2022-04-13 12:41:24 +00:00
5d44f2242f cv3/mirror: Fetch the most recent prefix revision
When a user sets up a Mirror with a restricted user that doesn't have
access to the `foo` path, we will fail to get the most recent revision
due to permissions issues.

With this change, when a prefix is provided we will get the initial
revision from the prefix rather than /foo. This allows restricted users
to setup sync.
2022-04-13 12:39:06 +00:00
cd750e4542 Merge pull request #13862 from mrueg/update-baseimage
[release-3.5] Dockerfile*: Switch baseimage to k8s hosted one
2022-04-12 11:55:46 +02:00
003a310489 Merge pull request #13933 from ahrtr/fix_snapshot_recover_cindex_3.5
[3.5]Set backend to cindex before recovering the lessor in applySnapshot
2022-04-12 10:46:55 +02:00
6095cf810a Dockerfile-release.*: Update base image snapshot
Signed-off-by: Manuel Rüger <manuel@rueg.eu>
2022-04-12 10:39:55 +02:00
719082e4fc Merge pull request #13932 from ahrtr/lease_renew_linearizable_3.5
[3.5] Support linearizable renew lease
2022-04-12 10:03:20 +02:00
4002aa51bd set backend to cindex before recovering the lessor in applySnapshot 2022-04-12 15:56:14 +08:00
bc5307de95 support linearizable renew lease
When etcdserver receives a LeaseRenew request, it may be still in
progress of processing the LeaseGrantRequest on exact the same
leaseID. Accordingly it may return a TTL=0 to client due to the
leaseID not found error. So the leader should wait for the appliedID
to be available before processing client requests.
2022-04-12 14:12:45 +08:00
b9cbff151c Merge pull request #13917 from chrisayoub/release-3.5
[release-3.5] clientv3: filter learners members during autosync
2022-04-10 15:43:28 +02:00
232fb980a7 clientv3: filter learners members during autosync
This change is to ensure that all members returned during the client's
AutoSync are started and are not learners, which are not valid
etcd members to make requests to.
2022-04-09 21:25:42 -04:00
383eceb885 Merge pull request #13669 from maxsokolovsky/upgrade-server-dependency-golang.org/x/crypto
etcdserver: upgrade the golang.org/x/crypto dependency
2022-04-09 09:44:05 +02:00
bf22ef3b03 Merge pull request #13908 from ahrtr/data_corruption_3.5
[3.5] Fix the data inconsistency issue by adding a txPostLockHook into the backend
2022-04-08 19:30:17 +02:00
66c7aab4d3 fix the data inconsistency issue by adding a txPostLockHook into the backend
Previously the SetConsistentIndex() is called during the apply workflow,
but it's outside the db transaction. If a commit happens between SetConsistentIndex
and the following apply workflow, and etcd crashes for whatever reason right
after the commit, then etcd commits an incomplete transaction to db.
Eventually etcd runs into the data inconsistency issue.

In this commit, we move the SetConsistentIndex into a txPostLockHook, so
it will be executed inside the transaction lock.
2022-04-08 20:37:34 +08:00
3ace622792 Merge pull request #13904 from serathius/term-v3.5
[release-3.5] server: Save consistency index and term to backend even when they decrease
2022-04-08 14:03:32 +02:00
780ec338f0 server: Save consistency index and term to backend even when they decrease
Reason to store CI and term in backend was to make db fully independent
snapshot, it was never meant to interfere with apply logic. Skip of CI
was introduced for v2->v3 migration where we wanted to prevent it from
decreasing when replaying wal in
https://github.com/etcd-io/etcd/pull/5391. By mistake it was added to
apply flow during refactor in
https://github.com/etcd-io/etcd/pull/12855#commitcomment-70713670.

Consistency index and term should only be negotiated and used by raft to make
decisions. Their values should only driven by raft state machine and
backend should only be responsible for storing them.
2022-04-07 21:22:18 +02:00
238b18c110 Merge pull request #13895 from mrueg/rel3.5-client_golang
[release-3.5] go.mod: Upgrade to prometheus/client_golang v1.11.1
2022-04-07 09:38:43 +02:00
5f1968887c Merge pull request #13887 from serathius/verify-v3.5
[release-v3.5] server: Add verification of whether lock was called within out outsid…
2022-04-06 14:32:03 +02:00
83538f342d server: Add verification of whether lock was called within out outside of apply 2022-04-06 11:22:51 +02:00
3b8c6512df go.mod: Upgrade to prometheus/client_golang v1.11.1 2022-04-06 00:35:48 +02:00
8b9ce3e150 Merge pull request #13866 from serathius/logs-v3.5
[release-3.5] Fix inconsistent log format
2022-04-04 13:04:16 +02:00
a060b42e47 server: Use default logging configuration instead of zap production one
This fixes problem where logs json changes format of timestamp.
2022-04-01 12:23:44 +02:00
25556a08a8 tests: Keeps log in expect to allow their analysis 2022-04-01 12:23:14 +02:00
12ceac6fdd Merge pull request #13858 from mrueg/release-3.5-fix-make
[Release-3.5] Makefile: Fix wrong target
2022-03-31 11:07:38 +02:00
462aefdfe1 Makefile: Fix wrong target
Signed-off-by: Manuel Rüger <manuel@rueg.eu>
2022-03-31 10:01:15 +02:00
fed325a95a Merge pull request #13832 from mrueg/rel-3.5-go-1.16.15
[release-3.5] Update go to 1.16.15
2022-03-23 11:30:20 +01:00
c51c8d24e1 Build locally if docker-test container image does not exist
This should make it easier to bump the golang version
2022-03-22 00:03:57 +01:00
1801ef8d71 Update to go 1.16.15 2022-03-22 00:00:22 +01:00
d5161347da Merge pull request #13792 from kkkkun/fix-offline-defrag
Fix offline defrag in etcdctl
2022-03-17 11:22:13 +01:00
58374b83a1 Fix offline defrag 2022-03-13 21:17:59 +08:00
39baf36ca3 Merge pull request #13726 from chaochn47/backport_13676_to_3_5
backport 3.5: #13676 load all leases from backend
2022-03-08 10:40:28 +01:00
541635e36a Merge pull request #13701 from lavacat/defrag-bopts-fix-3.5
server/storage/backend: restore original bolt db options after defrag
2022-03-08 10:38:31 +01:00
3c2ef71358 Merge pull request #13727 from ahrtr/3.5_print_raft_term_in_decimal
[3.5] Always print raft term in decimal when displaying member list in json
2022-02-25 08:59:52 +01:00
1eb8b6a75c Merge pull request #13736 from vivekpatani/release-3.5
*: fix IsOptsWithFromKey
2022-02-24 22:50:27 +01:00
7cec92a281 *: fix IsOptsWithFromKey
porting back from 3.5 from main
PR #13334
2022-02-23 16:42:00 -08:00
f634b44046 backport 3.5: #13676 load all leases from backend 2022-02-22 10:21:04 -08:00
7345d4211b always print raft_term in decimal when displaying member list in json 2022-02-22 17:09:21 +08:00
fa191c64bd Merge pull request #13706 from ahrtr/3.5-serializable_health_check
[3.5] enhance health check endpoint to support serializable request
2022-02-21 21:59:58 +01:00
31c8e3c7a5 Merge pull request #13714 from ahrtr/3.5_correct_dns_etcd_client
[3.5] Trim the suffix dot from the srv.Target for etcd-client DNS lookup
2022-02-20 13:34:09 +01:00
4f51cc1d9a trim the suffix dot from the srv.Target for etcd-client DNS lookup 2022-02-19 05:55:19 +08:00
7db1051774 enhance health check endpoint to support serializable request 2022-02-17 15:03:22 +08:00
631fa6fd65 server/storage/backend: restore original bolt db options after defrag
Problem: Defrag was implemented before custom bolt options were added.
Currently defrag doesn't restore backend options.
For example BackendFreelistType will be unset after defrag.

Solution: save bolt db options and use them in defrag.
2022-02-15 10:56:07 -08:00
f4708ae3d4 etcdserver: upgrade the golang.org/x/crypto dependency
To rectify the vulnerability found in a version of golang.org/x/crypto
(https://avd.aquasec.com/nvd/cve-2020-29652), upgrade the dependency to
its latest version.
Alternatively, version v0.0.0-20201216223049-8b5274cf687f could be used,
where the fixed was introduced, but the latest is preferable.
2022-02-07 10:11:46 -05:00
96 changed files with 1462 additions and 378 deletions

View File

@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.16.3"
go-version: "1.16.15"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.16.3"
go-version: "1.16.15"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.16.3"
go-version: "1.16.15"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -18,7 +18,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.16.3"
go-version: "1.16.15"
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@ -6,7 +6,7 @@ sudo: required
services: docker
go:
- "1.16.3"
- "1.16.15"
- tip
notifications:
@ -21,14 +21,14 @@ env:
matrix:
fast_finish: true
allow_failures:
- go: "1.16.3"
- go: "1.16.15"
env: TARGET=linux-amd64-coverage
- go: tip
env: TARGET=linux-amd64-fmt-unit-go-tip-2-cpu
exclude:
- go: tip
env: TARGET=linux-amd64-coverage
- go: "1.16.3"
- go: "1.16.15"
env: TARGET=linux-amd64-fmt-unit-go-tip-2-cpu
before_install:

View File

@ -1,5 +1,5 @@
# TODO: move to k8s.gcr.io/build-image/debian-base:bullseye-v1.y.z when patched
FROM debian:bullseye-20210927
FROM debian:bullseye-20220328
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -1,5 +1,5 @@
# TODO: move to k8s.gcr.io/build-image/debian-base-arm64:bullseye-1.y.z when patched
FROM arm64v8/debian:bullseye-20210927
FROM arm64v8/debian:bullseye-20220328
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -1,5 +1,5 @@
# TODO: move to k8s.gcr.io/build-image/debian-base-ppc64le:bullseye-1.y.z when patched
FROM ppc64le/debian:bullseye-20210927
FROM ppc64le/debian:bullseye-20220328
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -1,5 +1,5 @@
# TODO: move to k8s.gcr.io/build-image/debian-base-s390x:bullseye-1.y.z when patched
FROM s390x/debian:bullseye-20210927
FROM s390x/debian:bullseye-20220328
ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/

View File

@ -55,7 +55,7 @@ docker-remove:
GO_VERSION ?= 1.16.3
GO_VERSION ?= 1.16.15
ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")
TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
@ -161,7 +161,10 @@ test-full:
$(info log-file: test-$(TEST_SUFFIX).log)
PASSES="fmt build release unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
docker-test:
ensure-docker-test-image-exists:
make pull-docker-test || echo "WARNING: Container Image not found in registry, building locally"; make build-docker-test
docker-test: ensure-docker-test-image-exists
$(info GO_VERSION: $(GO_VERSION))
$(info ETCD_VERSION: $(ETCD_VERSION))
$(info TEST_OPTS: $(TEST_OPTS))

View File

@ -75,6 +75,7 @@ var (
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err()
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
@ -208,6 +209,7 @@ var (
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.5.2"
Version = "3.5.4"
APIVersion = "unknown"
// Git SHA Value will be set during build

View File

@ -21,6 +21,17 @@ import (
"go.uber.org/zap/zapcore"
)
// CreateDefaultZapLogger creates a logger with default zap configuration
func CreateDefaultZapLogger(level zapcore.Level) (*zap.Logger, error) {
lcfg := DefaultZapLoggerConfig
lcfg.Level = zap.NewAtomicLevelAt(level)
c, err := lcfg.Build()
if err != nil {
return nil, err
}
return c, nil
}
// DefaultZapLoggerConfig defines default zap logger configuration.
var DefaultZapLoggerConfig = zap.Config{
Level: zap.NewAtomicLevelAt(ConvertToZapLevel(DefaultLogLevel)),

View File

@ -228,10 +228,10 @@ func TestSRVDiscover(t *testing.T) {
[]*net.SRV{
{Target: "a.example.com", Port: 2480},
{Target: "b.example.com", Port: 2480},
{Target: "c.example.com", Port: 2480},
{Target: "c.example.com.", Port: 2480},
},
[]*net.SRV{},
[]string{"https://a.example.com:2480", "https://b.example.com:2480", "https://c.example.com:2480"},
[]string{"https://a.example.com:2480", "https://b.example.com:2480", "https://c.example.com.:2480"},
false,
},
}

View File

@ -5,8 +5,8 @@ go 1.16
require (
github.com/json-iterator/go v1.1.11
github.com/modern-go/reflect2 v1.0.1
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4
)
replace (

View File

@ -24,6 +24,7 @@ import (
"time"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/v3/credentials"
"go.etcd.io/etcd/client/v3/internal/endpoint"
"go.etcd.io/etcd/client/v3/internal/resolver"
@ -184,7 +185,9 @@ func (c *Client) Sync(ctx context.Context) error {
}
var eps []string
for _, m := range mresp.Members {
eps = append(eps, m.ClientURLs...)
if len(m.Name) != 0 && !m.IsLearner {
eps = append(eps, m.ClientURLs...)
}
}
c.SetEndpoints(eps...)
return nil
@ -368,7 +371,10 @@ func newClient(cfg *Config) (*Client, error) {
} else if cfg.LogConfig != nil {
client.lg, err = cfg.LogConfig.Build()
} else {
client.lg, err = CreateDefaultZapLogger()
client.lg, err = logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
if client.lg != nil {
client.lg = client.lg.Named("etcd-client")
}
}
if err != nil {
return nil, err

View File

@ -17,13 +17,14 @@ package clientv3
import (
"context"
"fmt"
"go.uber.org/zap"
"net"
"testing"
"time"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
@ -198,3 +199,49 @@ func TestZapWithLogger(t *testing.T) {
t.Errorf("WithZapLogger should modify *zap.Logger")
}
}
func TestSyncFiltersMembers(t *testing.T) {
c, _ := NewClient(t, Config{Endpoints: []string{"http://254.0.0.1:12345"}})
defer c.Close()
c.Cluster = &mockCluster{
[]*etcdserverpb.Member{
{ID: 0, Name: "", ClientURLs: []string{"http://254.0.0.1:12345"}, IsLearner: false},
{ID: 1, Name: "isStarted", ClientURLs: []string{"http://254.0.0.2:12345"}, IsLearner: true},
{ID: 2, Name: "isStartedAndNotLearner", ClientURLs: []string{"http://254.0.0.3:12345"}, IsLearner: false},
},
}
c.Sync(context.Background())
endpoints := c.Endpoints()
if len(endpoints) != 1 || endpoints[0] != "http://254.0.0.3:12345" {
t.Error("Client.Sync uses learner and/or non-started member client URLs")
}
}
type mockCluster struct {
members []*etcdserverpb.Member
}
func (mc *mockCluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
return &MemberListResponse{Members: mc.members}, nil
}
func (mc *mockCluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
return nil, nil
}
func (mc *mockCluster) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
return nil, nil
}
func (mc *mockCluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
return nil, nil
}
func (mc *mockCluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
return nil, nil
}
func (mc *mockCluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
return nil, nil
}

View File

@ -5,9 +5,9 @@ go 1.16
require (
github.com/dustin/go-humanize v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/prometheus/client_golang v1.11.0
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
github.com/prometheus/client_golang v1.11.1
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.38.0
sigs.k8s.io/yaml v1.2.0

View File

@ -106,8 +106,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=

View File

@ -19,7 +19,6 @@ import (
"os"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc/grpclog"
@ -29,10 +28,11 @@ func init() {
// We override grpc logger only when the environment variable is set
// in order to not interfere by default with user's code or other libraries.
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
lg, err := CreateDefaultZapLogger()
lg, err := logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
if err != nil {
panic(err)
}
lg = lg.Named("etcd-client")
grpclog.SetLoggerV2(zapgrpc.NewLogger(lg))
}
}
@ -57,21 +57,3 @@ func etcdClientDebugLevel() zapcore.Level {
}
return l
}
// CreateDefaultZapLoggerConfig creates a logger config that is configurable using env variable:
// ETCD_CLIENT_DEBUG= debug|info|warn|error|dpanic|panic|fatal|true (true=info)
func CreateDefaultZapLoggerConfig() zap.Config {
lcfg := logutil.DefaultZapLoggerConfig
lcfg.Level = zap.NewAtomicLevelAt(etcdClientDebugLevel())
return lcfg
}
// CreateDefaultZapLogger creates a logger that is configurable using env variable:
// ETCD_CLIENT_DEBUG= debug|info|warn|error|dpanic|panic|fatal|true (true=info)
func CreateDefaultZapLogger() (*zap.Logger, error) {
c, err := CreateDefaultZapLoggerConfig().Build()
if err != nil {
return nil, err
}
return c.Named("etcd-client"), nil
}

View File

@ -18,7 +18,7 @@ package mirror
import (
"context"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
@ -52,7 +52,13 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
// if rev is not specified, we will choose the most recent revision.
if s.rev == 0 {
resp, err := s.c.Get(ctx, "foo")
// If len(s.prefix) == 0, we will check a random key to fetch the most recent
// revision (foo), otherwise we use the provided prefix.
checkPath := "foo"
if len(s.prefix) != 0 {
checkPath = s.prefix
}
resp, err := s.c.Get(ctx, checkPath)
if err != nil {
errchan <- err
close(respchan)

View File

@ -77,6 +77,9 @@ type Op struct {
cmps []Cmp
thenOps []Op
elseOps []Op
isOptsWithFromKey bool
isOptsWithPrefix bool
}
// accessors / mutators
@ -216,6 +219,10 @@ func (op Op) isWrite() bool {
return op.t != tRange
}
func NewOp() *Op {
return &Op{key: []byte("")}
}
// OpGet returns "get" operation based on given key and operation options.
func OpGet(key string, opts ...OpOption) Op {
// WithPrefix and WithFromKey are not supported together
@ -387,6 +394,7 @@ func WithPrefix() OpOption {
return
}
op.end = getPrefix(op.key)
op.isOptsWithPrefix = true
}
}
@ -406,6 +414,7 @@ func WithFromKey() OpOption {
op.key = []byte{0}
}
op.end = []byte("\x00")
op.isOptsWithFromKey = true
}
}
@ -554,7 +563,21 @@ func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLi
}
// IsOptsWithPrefix returns true if WithPrefix option is called in the given opts.
func IsOptsWithPrefix(opts []OpOption) bool { return isOpFuncCalled("WithPrefix", opts) }
func IsOptsWithPrefix(opts []OpOption) bool {
ret := NewOp()
for _, opt := range opts {
opt(ret)
}
return ret.isOptsWithPrefix
}
// IsOptsWithFromKey returns true if WithFromKey option is called in the given opts.
func IsOptsWithFromKey(opts []OpOption) bool { return isOpFuncCalled("WithFromKey", opts) }
func IsOptsWithFromKey(opts []OpOption) bool {
ret := NewOp()
for _, opt := range opts {
opt(ret)
}
return ret.isOptsWithFromKey
}

View File

@ -44,9 +44,6 @@ func hasChecksum(n int64) bool {
// selected node, and saved snapshot is the point-in-time state of
// the selected node.
func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) error {
if lg == nil {
lg = zap.NewExample()
}
cfg.Logger = lg.Named("client")
if len(cfg.Endpoints) != 1 {
return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)

View File

@ -16,9 +16,6 @@ package clientv3
import (
"math/rand"
"reflect"
"runtime"
"strings"
"time"
)
@ -32,18 +29,3 @@ func jitterUp(duration time.Duration, jitter float64) time.Duration {
multiplier := jitter * (rand.Float64()*2 - 1)
return time.Duration(float64(duration) * (1 + multiplier))
}
// Check if the provided function is being called in the op options.
func isOpFuncCalled(op string, opts []OpOption) bool {
for _, opt := range opts {
v := reflect.ValueOf(opt)
if v.Kind() == reflect.Func {
if opFunc := runtime.FuncForPC(v.Pointer()); opFunc != nil {
if strings.Contains(opFunc.Name(), op) {
return true
}
}
}
}
return false
}

View File

@ -46,6 +46,7 @@ func defragCommandFunc(cmd *cobra.Command, args []string) {
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
return
}
failures := 0

View File

@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/logutil"
v3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/pkg/v3/flags"
@ -88,7 +89,7 @@ type epHealth struct {
// epHealthCommandFunc executes the "endpoint-health" command.
func epHealthCommandFunc(cmd *cobra.Command, args []string) {
lg, err := zap.NewProduction()
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/bgentry/speakeasy"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/srv"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/v3"
@ -114,7 +115,7 @@ func (*discardValue) Set(string) error { return nil }
func (*discardValue) Type() string { return "" }
func clientConfigFromCmd(cmd *cobra.Command) *clientConfig {
lg, err := zap.NewProduction()
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
@ -193,7 +194,7 @@ func newClientCfg(endpoints []string, dialTimeout, keepAliveTime, keepAliveTimeo
// set tls if any one tls option set
var cfgtls *transport.TLSInfo
tlsinfo := transport.TLSInfo{}
tlsinfo.Logger, _ = zap.NewProduction()
tlsinfo.Logger, _ = logutil.CreateDefaultZapLogger(zap.InfoLevel)
if scfg.cert != "" {
tlsinfo.CertFile = scfg.cert
cfgtls = &tlsinfo

View File

@ -67,7 +67,7 @@ func printMemberListWithHexJSON(r clientv3.MemberListResponse) {
b = strconv.AppendUint(nil, r.Header.MemberId, 16)
buffer.Write(b)
buffer.WriteString("\",\"raft_term\":")
b = strconv.AppendUint(nil, r.Header.RaftTerm, 16)
b = strconv.AppendUint(nil, r.Header.RaftTerm, 10)
buffer.Write(b)
buffer.WriteByte('}')
for i := 0; i < len(r.Members); i++ {

View File

@ -20,6 +20,7 @@ import (
"os"
"github.com/spf13/cobra"
"go.etcd.io/etcd/client/pkg/v3/logutil"
snapshot "go.etcd.io/etcd/client/v3/snapshot"
"go.etcd.io/etcd/etcdutl/v3/etcdutl"
"go.etcd.io/etcd/pkg/v3/cobrautl"
@ -98,7 +99,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
cobrautl.ExitWithError(cobrautl.ExitBadArgs, err)
}
lg, err := zap.NewProduction()
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}

View File

@ -9,12 +9,12 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/urfave/cli v1.22.4
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.etcd.io/etcd/client/v2 v2.305.4
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/etcdutl/v3 v3.5.4
go.etcd.io/etcd/pkg/v3 v3.5.4
go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.38.0

View File

@ -236,8 +236,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -341,8 +341,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -387,8 +387,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -428,16 +429,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=

View File

@ -319,10 +319,10 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
if !v3 {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.

View File

@ -15,13 +15,14 @@
package etcdutl
import (
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func GetLogger() *zap.Logger {
config := zap.NewProductionConfig()
config := logutil.DefaultZapLoggerConfig
config.Encoding = "console"
config.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
lg, err := config.Build()

View File

@ -25,11 +25,11 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/pkg/v3 v3.5.4
go.etcd.io/etcd/raft/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/zap v1.17.0
)

View File

@ -230,8 +230,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -331,8 +331,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -377,8 +377,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -418,16 +419,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=

View File

@ -69,9 +69,6 @@ type Manager interface {
// NewV3 returns a new snapshot Manager for v3.x snapshot.
func NewV3(lg *zap.Logger) Manager {
if lg == nil {
lg = zap.NewExample()
}
return &v3Manager{lg: lg}
}
@ -479,6 +476,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
return nil
}

20
go.mod
View File

@ -20,16 +20,16 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdctl/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/tests/v3 v3.5.2
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.etcd.io/etcd/client/v2 v2.305.4
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/etcdctl/v3 v3.5.4
go.etcd.io/etcd/etcdutl/v3 v3.5.4
go.etcd.io/etcd/pkg/v3 v3.5.4
go.etcd.io/etcd/raft/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.etcd.io/etcd/tests/v3 v3.5.4
go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.38.0

18
go.sum
View File

@ -244,8 +244,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -354,8 +354,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -400,8 +400,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -442,16 +443,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=

View File

@ -25,6 +25,7 @@ import (
"strings"
"sync"
"syscall"
"time"
"github.com/creack/pty"
)
@ -36,7 +37,6 @@ type ExpectProcess struct {
fpty *os.File
wg sync.WaitGroup
cond *sync.Cond // for broadcasting updates are available
mu sync.Mutex // protects lines and err
lines []string
count int // increment whenever new line gets added
@ -60,7 +60,6 @@ func NewExpectWithEnv(name string, args []string, env []string) (ep *ExpectProce
cmd: cmd,
StopSignal: syscall.SIGKILL,
}
ep.cond = sync.NewCond(&ep.mu)
ep.cmd.Stderr = ep.cmd.Stdout
ep.cmd.Stdin = nil
@ -77,52 +76,56 @@ func (ep *ExpectProcess) read() {
defer ep.wg.Done()
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
r := bufio.NewReader(ep.fpty)
for ep.err == nil {
l, rerr := r.ReadString('\n')
for {
l, err := r.ReadString('\n')
ep.mu.Lock()
ep.err = rerr
if l != "" {
if printDebugLines {
fmt.Printf("%s-%d: %s", ep.cmd.Path, ep.cmd.Process.Pid, l)
}
ep.lines = append(ep.lines, l)
ep.count++
if len(ep.lines) == 1 {
ep.cond.Signal()
}
}
if err != nil {
ep.err = err
ep.mu.Unlock()
break
}
ep.mu.Unlock()
}
ep.cond.Signal()
}
// ExpectFunc returns the first line satisfying the function f.
func (ep *ExpectProcess) ExpectFunc(f func(string) bool) (string, error) {
lastLinesBuffer := make([]string, 0)
i := 0
ep.mu.Lock()
for {
for len(ep.lines) == 0 && ep.err == nil {
ep.cond.Wait()
ep.mu.Lock()
for i < len(ep.lines) {
line := ep.lines[i]
i++
if f(line) {
ep.mu.Unlock()
return line, nil
}
}
if len(ep.lines) == 0 {
if ep.err != nil {
ep.mu.Unlock()
break
}
l := ep.lines[0]
ep.lines = ep.lines[1:]
lastLinesBuffer = append(lastLinesBuffer, l)
if l := len(lastLinesBuffer); l > DEBUG_LINES_TAIL {
lastLinesBuffer = lastLinesBuffer[l-DEBUG_LINES_TAIL : l-1]
}
if f(l) {
ep.mu.Unlock()
return l, nil
}
ep.mu.Unlock()
time.Sleep(time.Millisecond * 100)
}
ep.mu.Lock()
lastLinesIndex := len(ep.lines) - DEBUG_LINES_TAIL
if lastLinesIndex < 0 {
lastLinesIndex = 0
}
lastLines := strings.Join(ep.lines[lastLinesIndex:], "")
ep.mu.Unlock()
return "", fmt.Errorf("match not found."+
" Set EXPECT_DEBUG for more info Err: %v, last lines:\n%s",
ep.err, strings.Join(lastLinesBuffer, ""))
ep.err, lastLines)
}
// Expect returns the first line containing the given string.
@ -189,3 +192,9 @@ func (ep *ExpectProcess) ProcessError() error {
}
return ep.err
}
func (ep *ExpectProcess) Lines() []string {
ep.mu.Lock()
defer ep.mu.Unlock()
return ep.lines
}

View File

@ -9,7 +9,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.38.0
)

View File

@ -190,12 +190,6 @@ func URLStringsEqual(ctx context.Context, lg *zap.Logger, a []string, b []string
}
urlsB = append(urlsB, *u)
}
if lg == nil {
lg, _ = zap.NewProduction()
if lg == nil {
lg = zap.NewExample()
}
}
return urlsEqual(ctx, lg, urlsA, urlsB)
}

View File

@ -37,17 +37,8 @@ var (
defaultDialTimeout = 3 * time.Second
defaultBufferSize = 48 * 1024
defaultRetryInterval = 10 * time.Millisecond
defaultLogger *zap.Logger
)
func init() {
var err error
defaultLogger, err = zap.NewProduction()
if err != nil {
panic(err)
}
}
// Server defines proxy server layer that simulates common network faults:
// latency spikes and packet drop or corruption. The proxy overhead is very
// small overhead (<500μs per request). Please run tests to compute actual
@ -240,9 +231,6 @@ func NewServer(cfg ServerConfig) Server {
if s.retryInterval == 0 {
s.retryInterval = defaultRetryInterval
}
if s.lg == nil {
s.lg = defaultLogger
}
close(s.pauseAcceptc)
close(s.pauseTxc)

View File

@ -8,7 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.4
)
// Bad imports are sometimes causing attempts to pull that code.

View File

@ -22,7 +22,7 @@ import (
"go.uber.org/zap"
)
func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions {
user := getUser(lg, tx, userName)
if user == nil {
return nil
@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
return false
}
func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
if !ok {

View File

@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error {
}
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer func() {
tx.Unlock()
b.ForceCommit()
@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() {
}
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
as.commitRevision(tx)
tx.Unlock()
@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
}
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
user := getUser(as.lg, tx, username)
@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
// CompareHashAndPassword is very expensive, so we use closures
// to avoid putting it in the critical section of the tx lock.
revision, err := func() (uint64, error) {
tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()
@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
func (as *authStore) Recover(be backend.Backend) {
enabled := false
as.be = be
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
if len(vs) == 1 {
@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
}
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
user := getUser(as.lg, tx, r.Name)
@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
}
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
user := getUser(as.lg, tx, r.Name)
@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
user := getUser(as.lg, tx, r.Name)
@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
user := getUser(as.lg, tx, r.User)
@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
user := getUser(as.lg, tx, r.Name)
tx.Unlock()
@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,
func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
users := getAllUsers(as.lg, tx)
tx.Unlock()
@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
}
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
user := getUser(as.lg, tx, r.Name)
@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
var resp pb.AuthRoleGetResponse
@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,
func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
roles := getAllRoles(as.lg, tx)
tx.Unlock()
@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon
func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
role := getRole(as.lg, tx, r.Role)
@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
role := getRole(as.lg, tx, r.Role)
@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
}
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
role := getRole(as.lg, tx, r.Name)
@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
}
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
role := getRole(as.lg, tx, r.Name)
@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
return ErrAuthOldRevision
}
tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()
@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return ErrUserEmpty
}
tx := as.be.BatchTx()
tx := as.be.ReadTx()
tx.Lock()
u := getUser(as.lg, tx, authInfo.Username)
tx.Unlock()
@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
return nil
}
func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User {
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
if len(vs) == 0 {
return nil
@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
return user
}
func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User {
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) {
tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
}
func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role {
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
if len(vs) == 0 {
return nil
@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
return role
}
func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role {
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
}
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(buckets.Auth)
tx.UnsafeCreateBucket(buckets.AuthUsers)
@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
}
func getRevision(tx backend.BatchTx) uint64 {
func getRevision(tx backend.ReadTx) uint64 {
_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {
func (as *authStore) HasRole(user, role string) bool {
tx := as.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
u := getUser(as.lg, tx, user)
tx.Unlock()

View File

@ -355,11 +355,10 @@ func (cfg *config) parse(arguments []string) error {
func (cfg *config) configFromCmdLine() error {
// user-specified logger is not setup yet, use this logger during flag parsing
lg, err := zap.NewProduction()
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
return err
}
verKey := "ETCD_VERSION"
if verVal := os.Getenv(verKey); verVal != "" {
// unset to avoid any possible side-effect.

View File

@ -27,6 +27,7 @@ import (
"time"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
pkgioutil "go.etcd.io/etcd/pkg/v3/ioutil"
@ -63,7 +64,7 @@ func startEtcdOrProxyV2(args []string) {
if lg == nil {
var zapError error
// use this logger
lg, zapError = zap.NewProduction()
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
if zapError != nil {
fmt.Printf("error creating zap logger %v", zapError)
os.Exit(1)
@ -463,6 +464,10 @@ func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType {
}
func checkSupportArch() {
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
panic(err)
}
// to add a new platform, check https://github.com/etcd-io/website/blob/main/content/en/docs/next/op-guide/supported-platform.md
if runtime.GOARCH == "amd64" ||
runtime.GOARCH == "arm64" ||
@ -474,10 +479,10 @@ func checkSupportArch() {
// so unset here to not parse through flag
defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
fmt.Printf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set\n", env)
lg.Info("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", env))
return
}
fmt.Printf("etcd on unsupported platform without ETCD_UNSUPPORTED_ARCH=%s set\n", runtime.GOARCH)
lg.Error("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", runtime.GOARCH))
os.Exit(1)
}

View File

@ -21,6 +21,7 @@ import (
"os"
"time"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/server/v3/proxy/tcpproxy"
"github.com/spf13/cobra"
@ -92,8 +93,7 @@ func stripSchema(eps []string) []string {
}
func startGateway(cmd *cobra.Command, args []string) {
var lg *zap.Logger
lg, err := zap.NewProduction()
lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)

View File

@ -164,16 +164,14 @@ func newGRPCProxyStartCommand() *cobra.Command {
func startGRPCProxy(cmd *cobra.Command, args []string) {
checkArgs()
lcfg := logutil.DefaultZapLoggerConfig
lvl := zap.InfoLevel
if grpcProxyDebug {
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
lvl = zap.DebugLevel
grpc.EnableTracing = true
}
lg, err := lcfg.Build()
lg, err := logutil.CreateDefaultZapLogger(lvl)
if err != nil {
log.Fatal(err)
panic(err)
}
defer lg.Sync()

View File

@ -41,9 +41,6 @@ func Main(args []string) {
}
func notifySystemd(lg *zap.Logger) {
if lg == nil {
lg = zap.NewExample()
}
lg.Info("notifying init daemon")
_, err := daemon.SdNotify(false, daemon.SdNotifyReady)
if err != nil {

View File

@ -40,14 +40,16 @@ const (
// HandleMetricsHealth registers metrics and health handlers.
func HandleMetricsHealth(lg *zap.Logger, mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(PathMetrics, promhttp.Handler())
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet) Health { return checkV2Health(lg, srv, excludedAlarms) }))
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health { return checkV2Health(lg, srv, excludedAlarms) }))
}
// HandleMetricsHealthForV3 registers metrics and health handlers. it checks health by using v3 range request
// and its corresponding timeout.
func HandleMetricsHealthForV3(lg *zap.Logger, mux *http.ServeMux, srv *etcdserver.EtcdServer) {
mux.Handle(PathMetrics, promhttp.Handler())
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet) Health { return checkV3Health(lg, srv, excludedAlarms) }))
mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health {
return checkV3Health(lg, srv, excludedAlarms, serializable)
}))
}
// HandlePrometheus registers prometheus handler on '/metrics'.
@ -56,7 +58,7 @@ func HandlePrometheus(mux *http.ServeMux) {
}
// NewHealthHandler handles '/health' requests.
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet) Health) http.HandlerFunc {
func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, serializable bool) Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
@ -65,7 +67,12 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet) Health
return
}
excludedAlarms := getExcludedAlarms(r)
h := hfunc(excludedAlarms)
// Passing the query parameter "serializable=true" ensures that the
// health of the local etcd is checked vs the health of the cluster.
// This is useful for probes attempting to validate the liveness of
// the etcd process vs readiness of the cluster to serve requests.
serializableFlag := getSerializableFlag(r)
h := hfunc(excludedAlarms, serializableFlag)
defer func() {
if h.Health == "true" {
healthSuccess.Inc()
@ -128,9 +135,13 @@ func getExcludedAlarms(r *http.Request) (alarms AlarmSet) {
return alarms
}
func getSerializableFlag(r *http.Request) bool {
return r.URL.Query().Get("serializable") == "true"
}
// TODO: etcdserver.ErrNoLeader in health API
func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSet) Health {
func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSet, serializable bool) Health {
h := Health{}
h.Health = "true"
as := srv.Alarms()
@ -156,7 +167,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
}
}
if uint64(srv.Leader()) == raft.None {
if !serializable && uint64(srv.Leader()) == raft.None {
h.Health = "false"
h.Reason = "RAFT NO LEADER"
lg.Warn("serving /health false; no leader")
@ -166,7 +177,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
}
func checkV2Health(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSet) (h Health) {
if h = checkHealth(lg, srv, excludedAlarms); h.Health != "true" {
if h = checkHealth(lg, srv, excludedAlarms, false); h.Health != "true" {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@ -182,12 +193,12 @@ func checkV2Health(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms Alarm
return
}
func checkV3Health(lg *zap.Logger, srv *etcdserver.EtcdServer, excludedAlarms AlarmSet) (h Health) {
if h = checkHealth(lg, srv, excludedAlarms); h.Health != "true" {
func checkV3Health(lg *zap.Logger, srv *etcdserver.EtcdServer, excludedAlarms AlarmSet, serializable bool) (h Health) {
if h = checkHealth(lg, srv, excludedAlarms, serializable); h.Health != "true" {
return
}
ctx, cancel := context.WithTimeout(context.Background(), srv.Cfg.ReqTimeout())
_, err := srv.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1})
_, err := srv.Range(ctx, &etcdserverpb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable})
cancel()
if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied {
h.Health = "false"

View File

@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
}
tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
if unsafeMemberExists(tx, mkey) {
return errMemberAlreadyExist
@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
// from the v3 backend.
func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeDeleteBucket(buckets.Cluster)
return nil
@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
mkey := backendMemberKey(id)
tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
if !unsafeMemberExists(tx, mkey) {
@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
lg.Info("Trimming membership information from the backend...")
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
tx.UnsafeDelete(buckets.Members, k)
@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
ckey := backendClusterVersionKey()
tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
}
@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
lg.Panic("failed to marshal downgrade information", zap.Error(err))
}
tx := be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
}
@ -316,7 +316,7 @@ func backendDowngradeKey() []byte {
func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Members)
tx.UnsafeCreateBucket(buckets.MembersRemoved)

View File

@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
}
b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().LockInsideApply()
b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
b.BatchTx().Unlock()
@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
}
b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().LockInsideApply()
b.BatchTx().UnsafeDelete(buckets.Alarm, v)
b.BatchTx().Unlock()
@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error {
b := a.bg.Backend()
tx := b.BatchTx()
tx.Lock()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(buckets.Alarm)
err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
var m pb.AlarmMember

View File

@ -53,6 +53,7 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,

View File

@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil

View File

@ -25,6 +25,7 @@ import (
type Backend interface {
BatchTx() backend.BatchTx
ReadTx() backend.ReadTx
}
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
@ -33,9 +34,18 @@ type ConsistentIndexer interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64
// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
ConsistentApplyingIndex() (uint64, uint64)
// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
UnsafeConsistentIndex() uint64
// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64, term uint64)
// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
SetConsistentApplyingIndex(v uint64, term uint64)
// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)
@ -55,6 +65,12 @@ type consistentIndex struct {
// The value is being persisted in the backend since v3.5.
term uint64
// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
applyingIndex uint64
applyingTerm uint64
// be is used for initial read consistentIndex
be Backend
// mutex is protecting be.
@ -74,7 +90,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()
v, term := ReadConsistentIndex(ci.be.BatchTx())
v, term := ReadConsistentIndex(ci.be.ReadTx())
ci.SetConsistentIndex(v, term)
return v
}
func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}
v, term := unsafeReadConsistentIndex(ci.be.ReadTx())
ci.SetConsistentIndex(v, term)
return v
}
@ -87,7 +113,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
UnsafeUpdateConsistentIndex(tx, index, term)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@ -98,6 +124,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
ci.SetConsistentIndex(0, 0)
}
func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
}
func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.applyingIndex, v)
atomic.StoreUint64(&ci.applyingTerm, term)
}
func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}
@ -107,12 +142,24 @@ type fakeConsistentIndex struct {
term uint64
}
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
return atomic.LoadUint64(&f.index)
}
func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
}
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
return atomic.LoadUint64(&f.index)
}
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}
func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
@ -124,7 +171,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Meta)
}
@ -154,22 +201,12 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
return unsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
@ -182,8 +219,8 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64,
}
}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
tx.LockOutsideApply()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
UnsafeUpdateConsistentIndex(tx, index, term)
}

View File

@ -63,6 +63,58 @@ func TestConsistentIndex(t *testing.T) {
assert.Equal(t, r, index)
}
func TestConsistentIndexDecrease(t *testing.T) {
initIndex := uint64(100)
initTerm := uint64(10)
tcs := []struct {
name string
index uint64
term uint64
}{
{
name: "Decrease term",
index: initIndex + 1,
term: initTerm - 1,
},
{
name: "Decrease CI",
index: initIndex - 1,
term: initTerm + 1,
},
{
name: "Decrease CI and term",
index: initIndex - 1,
term: initTerm - 1,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
UnsafeCreateMetaBucket(tx)
UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
tx.Unlock()
be.ForceCommit()
be.Close()
be = backend.NewDefaultBackend(tmpPath)
defer be.Close()
ci := NewConsistentIndex(be)
ci.SetConsistentIndex(tc.index, tc.term)
tx = be.BatchTx()
tx.Lock()
ci.UnsafeSave(tx)
tx.Unlock()
assert.Equal(t, tc.index, ci.ConsistentIndex())
ci = NewConsistentIndex(be)
assert.Equal(t, tc.index, ci.ConsistentIndex())
})
}
}
func TestFakeConsistentIndex(t *testing.T) {
r := rand.Uint64()

View File

@ -27,6 +27,7 @@ var (
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")

View File

@ -661,6 +661,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
})
}
// Set the hook after EtcdServer finishes the initialization to avoid
// the hook being called during the initialization process.
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
Logger: cfg.Logger,
@ -1243,6 +1247,13 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Panic("failed to open snapshot backend", zap.Error(err))
}
// We need to set the backend to consistIndex before recovering the lessor,
// because lessor.Recover will commit the boltDB transaction, accordingly it
// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.
// Eventually the new consistent_index value coming from snapshot is overwritten
// by the old value.
s.consistIndex.SetBackend(newbe)
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
if s.lessor != nil {
@ -1259,7 +1270,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}
s.consistIndex.SetBackend(newbe)
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
// Closing old backend might block until all the txns
@ -2128,7 +2139,7 @@ func (s *EtcdServer) apply(
// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
@ -2155,11 +2166,20 @@ func (s *EtcdServer) apply(
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false
var ar *applyResult
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
defer func() {
// The txPostLockInsideApplyHook will not get called in some cases,
// in which we should move the consistent index forward directly.
if !applyV3Performed || (ar != nil && ar.err != nil) {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
}
s.lg.Debug("apply entry normal",
zap.Uint64("consistent-index", index),
@ -2201,12 +2221,12 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
id = raftReq.Header.ID
}
var ar *applyResult
needResult := s.w.IsRegistered(id)
if needResult || !noSideEffect(&raftReq) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
applyV3Performed = true
ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
}
@ -2258,6 +2278,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)
// The txPostLock callback will not get called in this case,
// so we should set the consistent index directly.
if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
}
return false, err
}
@ -2683,6 +2710,15 @@ func (s *EtcdServer) raftStatus() raft.Status {
return s.r.Node.Status()
}
func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
return func() {
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
}
}
}
func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
size := be.Size()
sizeInUse := be.SizeInUse()

View File

@ -686,9 +686,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
consistIndex := srv.consistIndex.ConsistentIndex()
if consistIndex != appliedi {
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
}
assert.Equal(t, uint64(2), appliedi)
t.Run("verify-backend", func(t *testing.T) {
tx := be.BatchTx()
@ -697,9 +695,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
rindex, _ := cindex.ReadConsistentIndex(be.ReadTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}
func realisticRaftNode(lg *zap.Logger) *raftNode {
@ -2039,3 +2036,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) {
}
s.sendC <- send
}
func TestWaitAppliedIndex(t *testing.T) {
cases := []struct {
name string
appliedIndex uint64
committedIndex uint64
action func(s *EtcdServer)
ExpectedError error
}{
{
name: "The applied Id is already equal to the commitId",
appliedIndex: 10,
committedIndex: 10,
action: func(s *EtcdServer) {
s.applyWait.Trigger(10)
},
ExpectedError: nil,
},
{
name: "The etcd server has already stopped",
appliedIndex: 10,
committedIndex: 12,
action: func(s *EtcdServer) {
s.stopping <- struct{}{}
},
ExpectedError: ErrStopped,
},
{
name: "Timed out waiting for the applied index",
appliedIndex: 10,
committedIndex: 12,
action: nil,
ExpectedError: ErrTimeoutWaitAppliedIndex,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
s := &EtcdServer{
appliedIndex: tc.appliedIndex,
committedIndex: tc.committedIndex,
stopping: make(chan struct{}, 1),
applyWait: wait.NewTimeList(),
}
if tc.action != nil {
go tc.action(s)
}
err := s.waitAppliedIndex()
if err != tc.ExpectedError {
t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err)
}
})
}
}

View File

@ -45,6 +45,10 @@ const (
maxGapBetweenApplyAndCommitIndex = 5000
traceThreshold = 100 * time.Millisecond
readIndexRetryTime = 500 * time.Millisecond
// The timeout for the node to catch up its applied index, and is used in
// lease related operations, such as LeaseRenew and LeaseTimeToLive.
applyTimeout = time.Second
)
type RaftKV interface {
@ -271,6 +275,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
return resp.(*pb.LeaseGrantResponse), nil
}
func (s *EtcdServer) waitAppliedIndex() error {
select {
case <-s.ApplyWait():
case <-s.stopping:
return ErrStopped
case <-time.After(applyTimeout):
return ErrTimeoutWaitAppliedIndex
}
return nil
}
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
if err != nil {
@ -280,26 +296,32 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
}
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil && err != nil {
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
@ -315,7 +337,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
}
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
if s.Leader() == s.ID() {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}
// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {

View File

@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/jonboulle/clockwork v0.2.2
github.com/kr/text v0.2.0 // indirect
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_model v0.2.0
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/soheilhy/cmux v0.1.5
@ -25,20 +25,20 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.etcd.io/etcd/client/v2 v2.305.4
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/pkg/v3 v3.5.4
go.etcd.io/etcd/raft/v3 v3.5.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/otlp v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
google.golang.org/grpc v1.38.0

View File

@ -234,8 +234,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -343,8 +343,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -389,8 +389,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -430,16 +431,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=

View File

@ -19,6 +19,7 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"sort"
"sync"
@ -796,18 +797,12 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(buckets.Lease)
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
lpbs := unsafeGetAllLeases(tx)
tx.Unlock()
for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
@ -825,7 +820,6 @@ func (le *lessor) initAndRecover() {
}
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()
le.b.ForceCommit()
}
@ -858,7 +852,7 @@ func (l *Lease) persistTo(b backend.Backend) {
panic("failed to marshal lease proto item")
}
b.BatchTx().Lock()
b.BatchTx().LockInsideApply()
b.BatchTx().UnsafePut(buckets.Lease, key, val)
b.BatchTx().Unlock()
}
@ -923,6 +917,30 @@ func int64ToBytes(n int64) []byte {
return bytes
}
func bytesToLeaseID(bytes []byte) int64 {
if len(bytes) != 8 {
panic(fmt.Errorf("lease ID must be 8-byte"))
}
return int64(binary.BigEndian.Uint64(bytes))
}
func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
ls := make([]*leasepb.Lease, 0)
err := tx.UnsafeForEach(buckets.Lease, func(k, v []byte) error {
var lpb leasepb.Lease
err := lpb.Unmarshal(v)
if err != nil {
return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k))
}
ls = append(ls, &lpb)
return nil
})
if err != nil {
panic(err)
}
return ls
}
// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only.
type FakeLessor struct{}

View File

@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"reflect"
@ -27,9 +28,12 @@ import (
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -649,6 +653,93 @@ func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
}
}
func TestLeaseBackend(t *testing.T) {
tcs := []struct {
name string
setup func(tx backend.BatchTx)
want []*leasepb.Lease
}{
{
name: "Empty by default",
setup: func(tx backend.BatchTx) {},
want: []*leasepb.Lease{},
},
{
name: "Returns data put before",
setup: func(tx backend.BatchTx) {
mustUnsafePutLease(tx, &leasepb.Lease{
ID: -1,
TTL: 2,
})
},
want: []*leasepb.Lease{
{
ID: -1,
TTL: 2,
},
},
},
{
name: "Skips deleted",
setup: func(tx backend.BatchTx) {
mustUnsafePutLease(tx, &leasepb.Lease{
ID: -1,
TTL: 2,
})
mustUnsafePutLease(tx, &leasepb.Lease{
ID: math.MinInt64,
TTL: 2,
})
mustUnsafePutLease(tx, &leasepb.Lease{
ID: math.MaxInt64,
TTL: 3,
})
tx.UnsafeDelete(buckets.Lease, int64ToBytes(-1))
},
want: []*leasepb.Lease{
{
ID: math.MaxInt64,
TTL: 3,
},
{
ID: math.MinInt64, // bytes bigger than MaxInt64
TTL: 2,
},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Lease)
tc.setup(tx)
tx.Unlock()
be.ForceCommit()
be.Close()
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
leases := unsafeGetAllLeases(be2.ReadTx())
assert.Equal(t, tc.want, leases)
})
}
}
func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
key := int64ToBytes(lpb.ID)
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
tx.UnsafePut(buckets.Lease, key, val)
}
type fakeDeleter struct {
deleted []string
tx backend.BatchTx

View File

@ -68,6 +68,9 @@ type Backend interface {
Defrag() error
ForceCommit()
Close() error
// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())
}
type Snapshot interface {
@ -100,8 +103,9 @@ type backend struct {
// mlock prevents backend database file to be swapped
mlock bool
mu sync.RWMutex
db *bolt.DB
mu sync.RWMutex
bopts *bolt.Options
db *bolt.DB
batchInterval time.Duration
batchLimit int
@ -119,6 +123,9 @@ type backend struct {
hooks Hooks
// txPostLockInsideApplyHook is called each time right after locking the tx.
txPostLockInsideApplyHook func()
lg *zap.Logger
}
@ -185,7 +192,8 @@ func newBackend(bcfg BackendConfig) *backend {
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend{
db: db,
bopts: bopts,
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
@ -229,6 +237,14 @@ func (b *backend) BatchTx() BatchTx {
return b.batchTx
}
func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
// It needs to lock the batchTx, because the periodic commit
// may be accessing the txPostLockInsideApplyHook at the moment.
b.batchTx.lock()
defer b.batchTx.Unlock()
b.txPostLockInsideApplyHook = hook
}
func (b *backend) ReadTx() ReadTx { return b.readTx }
// ConcurrentReadTx creates and returns a new ReadTx, which:
@ -438,7 +454,7 @@ func (b *backend) defrag() error {
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.Lock()
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()
// lock database after lock tx to avoid deadlock.
@ -511,13 +527,7 @@ func (b *backend) defrag() error {
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
}
defragmentedBoltOptions := bolt.Options{}
if boltOpenOptions != nil {
defragmentedBoltOptions = *boltOpenOptions
}
defragmentedBoltOptions.Mlock = b.mlock
b.db, err = bolt.Open(dbp, 0600, &defragmentedBoltOptions)
b.db, err = bolt.Open(dbp, 0600, b.bopts)
if err != nil {
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
}

View File

@ -122,7 +122,17 @@ func TestBackendBatchIntervalCommit(t *testing.T) {
}
func TestBackendDefrag(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
bcfg := backend.DefaultBackendConfig()
// Make sure we change BackendFreelistType
// The goal is to verify that we restore config option after defrag.
if bcfg.BackendFreelistType == bolt.FreelistMapType {
bcfg.BackendFreelistType = bolt.FreelistArrayType
} else {
bcfg.BackendFreelistType = bolt.FreelistMapType
}
b, _ := betesting.NewTmpBackendFromCfg(t, bcfg)
defer betesting.Close(t, b)
tx := b.BatchTx()
@ -168,6 +178,10 @@ func TestBackendDefrag(t *testing.T) {
if nsize >= size {
t.Errorf("new size = %v, want < %d", nsize, size)
}
db := backend.DbFromBackendForTest(b)
if db.FreelistType != bcfg.BackendFreelistType {
t.Errorf("db FreelistType = [%v], want [%v]", db.FreelistType, bcfg.BackendFreelistType)
}
// try put more keys after shrink.
tx = b.BatchTx()

View File

@ -53,6 +53,8 @@ type BatchTx interface {
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
LockInsideApply()
LockOutsideApply()
}
type batchTx struct {
@ -63,10 +65,34 @@ type batchTx struct {
pending int
}
// Lock is supposed to be called only by the unit test.
func (t *batchTx) Lock() {
ValidateCalledInsideUnittest(t.backend.lg)
t.lock()
}
func (t *batchTx) lock() {
t.Mutex.Lock()
}
func (t *batchTx) LockInsideApply() {
t.lock()
if t.backend.txPostLockInsideApplyHook != nil {
// The callers of some methods (i.e., (*RaftCluster).AddMember)
// can be coming from both InsideApply and OutsideApply, but the
// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
// So we should check the txPostLockInsideApplyHook before validating
// the callstack.
ValidateCalledInsideApply(t.backend.lg)
t.backend.txPostLockInsideApplyHook()
}
}
func (t *batchTx) LockOutsideApply() {
ValidateCalledOutSideApply(t.backend.lg)
t.lock()
}
func (t *batchTx) Unlock() {
if t.pending >= t.backend.batchLimit {
t.commit(false)
@ -214,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.Lock()
t.lock()
t.commit(false)
t.Unlock()
}
// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
t.lock()
t.commit(true)
t.Unlock()
}
@ -291,13 +317,13 @@ func (t *batchTxBuffered) Unlock() {
}
func (t *batchTxBuffered) Commit() {
t.Lock()
t.lock()
t.commit(false)
t.Unlock()
}
func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
t.lock()
t.commit(true)
t.Unlock()
}

View File

@ -40,8 +40,6 @@ func TestBackendPreCommitHook(t *testing.T) {
// Empty commit.
tx.Commit()
write(tx, []byte("foo"), []byte("bar"))
assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits")
tx.Commit()
assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits")

View File

@ -0,0 +1,70 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"os"
"runtime/debug"
"strings"
"go.uber.org/zap"
)
const (
ENV_VERIFY = "ETCD_VERIFY"
ENV_VERIFY_ALL_VALUE = "all"
ENV_VERIFY_LOCK = "lock"
)
func ValidateCalledInsideApply(lg *zap.Logger) {
if !verifyLockEnabled() {
return
}
if !insideApply() {
lg.Panic("Called outside of APPLY!", zap.Stack("stacktrace"))
}
}
func ValidateCalledOutSideApply(lg *zap.Logger) {
if !verifyLockEnabled() {
return
}
if insideApply() {
lg.Panic("Called inside of APPLY!", zap.Stack("stacktrace"))
}
}
func ValidateCalledInsideUnittest(lg *zap.Logger) {
if !verifyLockEnabled() {
return
}
if !insideUnittest() {
lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace"))
}
}
func verifyLockEnabled() bool {
return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
}
func insideApply() bool {
stackTraceStr := string(debug.Stack())
return strings.Contains(stackTraceStr, ".applyEntries")
}
func insideUnittest() bool {
stackTraceStr := string(debug.Stack())
return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
}

View File

@ -0,0 +1,111 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend_test
import (
"os"
"testing"
"time"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
)
func TestLockVerify(t *testing.T) {
tcs := []struct {
name string
insideApply bool
lock func(tx backend.BatchTx)
txPostLockInsideApplyHook func()
expectPanic bool
}{
{
name: "call lockInsideApply from inside apply",
insideApply: true,
lock: lockInsideApply,
expectPanic: false,
},
{
name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)",
insideApply: false,
lock: lockInsideApply,
expectPanic: false,
},
{
name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)",
insideApply: false,
lock: lockInsideApply,
txPostLockInsideApplyHook: func() {},
expectPanic: true,
},
{
name: "call lockOutsideApply from outside apply",
insideApply: false,
lock: lockOutsideApply,
expectPanic: false,
},
{
name: "call lockOutsideApply from inside apply",
insideApply: true,
lock: lockOutsideApply,
expectPanic: true,
},
{
name: "call Lock from unit test",
insideApply: false,
lock: lockFromUT,
expectPanic: false,
},
}
env := os.Getenv("ETCD_VERIFY")
os.Setenv("ETCD_VERIFY", "lock")
defer func() {
os.Setenv("ETCD_VERIFY", env)
}()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook)
hasPaniced := handlePanic(func() {
if tc.insideApply {
applyEntries(be, tc.lock)
} else {
tc.lock(be.BatchTx())
}
}) != nil
if hasPaniced != tc.expectPanic {
t.Errorf("%v != %v", hasPaniced, tc.expectPanic)
}
})
}
}
func handlePanic(f func()) (result interface{}) {
defer func() {
result = recover()
}()
f()
return result
}
func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) {
f(be.BatchTx())
}
func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() }
func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() }
func lockFromUT(tx backend.BatchTx) { tx.Lock() }

View File

@ -119,7 +119,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
}
tx := s.b.BatchTx()
tx.Lock()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafeCreateBucket(buckets.Meta)
tx.Unlock()
@ -238,7 +238,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
revToBytes(revision{main: rev}, rbytes)
tx := s.b.BatchTx()
tx.Lock()
tx.LockInsideApply()
tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
tx.Unlock()
// ensure that desired compaction is persisted
@ -334,7 +334,7 @@ func (s *store) restore() error {
keyToLease := make(map[string]lease.LeaseID)
// restore index
tx := s.b.BatchTx()
tx := s.b.ReadTx()
tx.Lock()
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)

View File

@ -39,7 +39,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
start := time.Now()
tx := s.b.BatchTx()
tx.Lock()
tx.LockOutsideApply()
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
for _, key := range keys {
rev = bytesToRev(key)

View File

@ -871,6 +871,8 @@ type fakeBatchTx struct {
rangeRespc chan rangeResp
}
func (b *fakeBatchTx) LockInsideApply() {}
func (b *fakeBatchTx) LockOutsideApply() {}
func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {}
@ -912,6 +914,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
type indexGetResp struct {
rev revision

View File

@ -78,7 +78,7 @@ type storeTxnWrite struct {
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.Lock()
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,

View File

@ -31,7 +31,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
panic(fmt.Errorf("cannot marshal event: %v", err))
}
be.BatchTx().Lock()
be.BatchTx().LockOutsideApply()
be.BatchTx().UnsafePut(buckets.Key, ibytes, d)
be.BatchTx().Unlock()
}

View File

@ -31,7 +31,7 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
if lg == nil {
lg = zap.NewNop()
}
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet) etcdhttp.Health { return checkHealth(c) }))
mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkHealth(c) }))
}
// HandleProxyHealth registers health handler on '/proxy/health'.
@ -39,7 +39,7 @@ func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) {
if lg == nil {
lg = zap.NewNop()
}
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet) etcdhttp.Health { return checkProxyHealth(c) }))
mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkProxyHealth(c) }))
}
func checkHealth(c *clientv3.Client) etcdhttp.Health {

View File

@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) {
}
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
tx := be.BatchTx()
index, term := cindex.ReadConsistentIndex(tx)
index, term := cindex.ReadConsistentIndex(be.ReadTx())
if cfg.ExactIndex && index != hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
}

View File

@ -43,6 +43,7 @@ set -o pipefail
# e.g. add/update missing dependencies. Such divergences should be
# detected and trigger a failure that needs explicit developer's action.
export GOFLAGS=-mod=readonly
export ETCD_VERIFY=all
source ./scripts/test_lib.sh
source ./build.sh

View File

@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct {
v2deprecation string
rollingStart bool
logLevel string
}
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
@ -315,6 +316,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
args = append(args, "--v2-deprecation", cfg.v2deprecation)
}
if cfg.logLevel != "" {
args = append(args, "--log-level", cfg.logLevel)
}
etcdCfgs[i] = &etcdServerProcessConfig{
lg: lg,
execPath: cfg.execPath,

View File

@ -155,15 +155,20 @@ func memberListWithHexTest(cx ctlCtx) {
if num == 0 {
cx.t.Fatal("member number is 0")
}
if resp.Header.RaftTerm != hexResp.Header.RaftTerm {
cx.t.Fatalf("Unexpected raft_term, expected %d, got %d", resp.Header.RaftTerm, hexResp.Header.RaftTerm)
}
for i := 0; i < num; i++ {
if resp.Members[i].Name != hexResp.Members[i].Name {
cx.t.Fatalf("member name,expected %v,got %v", resp.Members[i].Name, hexResp.Members[i].Name)
cx.t.Fatalf("Unexpected member name,expected %v, got %v", resp.Members[i].Name, hexResp.Members[i].Name)
}
if !reflect.DeepEqual(resp.Members[i].PeerURLs, hexResp.Members[i].PeerURLs) {
cx.t.Fatalf("member peerURLs,expected %v,got %v", resp.Members[i].PeerURLs, hexResp.Members[i].PeerURLs)
cx.t.Fatalf("Unexpected member peerURLs, expected %v, got %v", resp.Members[i].PeerURLs, hexResp.Members[i].PeerURLs)
}
if !reflect.DeepEqual(resp.Members[i].ClientURLs, hexResp.Members[i].ClientURLs) {
cx.t.Fatalf("member clientURLS,expected %v,got %v", resp.Members[i].ClientURLs, hexResp.Members[i].ClientURLs)
cx.t.Fatalf("Unexpected member clientURLS, expected %v, got %v", resp.Members[i].ClientURLs, hexResp.Members[i].ClientURLs)
}
}
}

View File

@ -48,6 +48,8 @@ type etcdProcess interface {
type logsExpect interface {
Expect(string) (string, error)
Lines() []string
LineCount() int
}
type etcdServerProcess struct {

View File

@ -60,21 +60,15 @@ func spawnWithExpectLines(args []string, envVars map[string]string, xs ...string
// process until either stdout or stderr contains
// the expected string
var (
lines []string
lineFunc = func(txt string) bool { return true }
lines []string
)
for _, txt := range xs {
for {
l, lerr := proc.ExpectFunc(lineFunc)
if lerr != nil {
proc.Close()
return nil, fmt.Errorf("%v %v (expected %q, got %q). Try EXPECT_DEBUG=TRUE", args, lerr, txt, lines)
}
lines = append(lines, l)
if strings.Contains(l, txt) {
break
}
l, lerr := proc.Expect(txt)
if lerr != nil {
proc.Close()
return nil, fmt.Errorf("%v %v (expected %q, got %q). Try EXPECT_DEBUG=TRUE", args, lerr, txt, lines)
}
lines = append(lines, l)
}
perr := proc.Close()
l := proc.LineCount()

View File

@ -0,0 +1,76 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !cov
// +build !cov
package e2e
import (
"encoding/json"
"testing"
"time"
)
func TestServerJsonLogging(t *testing.T) {
BeforeTest(t)
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 1,
initialToken: "new",
logLevel: "debug",
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
logs := epc.procs[0].Logs()
time.Sleep(time.Second)
if err = epc.Close(); err != nil {
t.Fatalf("error closing etcd processes (%v)", err)
}
var entry logEntry
lines := logs.Lines()
if len(lines) == 0 {
t.Errorf("Expected at least one log line")
}
for _, line := range lines {
err := json.Unmarshal([]byte(line), &entry)
if err != nil {
t.Errorf("Failed to parse log line as json, err: %q, line: %s", err, line)
continue
}
if entry.Level == "" {
t.Errorf(`Missing "level" key, line: %s`, line)
}
if entry.Timestamp == "" {
t.Errorf(`Missing "ts" key, line: %s`, line)
}
if _, err := time.Parse("2006-01-02T15:04:05.000Z0700", entry.Timestamp); entry.Timestamp != "" && err != nil {
t.Errorf(`Unexpected "ts" key format, err: %s`, err)
}
if entry.Caller == "" {
t.Errorf(`Missing "caller" key, line: %s`, line)
}
if entry.Message == "" {
t.Errorf(`Missing "message" key, line: %s`, line)
}
}
}
type logEntry struct {
Level string `json:"level"`
Timestamp string `json:"ts"`
Caller string `json:"caller"`
Message string `json:"msg"`
}

View File

@ -18,6 +18,7 @@ package main
import (
"flag"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/tests/v3/functional/agent"
"go.uber.org/zap"
@ -27,7 +28,7 @@ var logger *zap.Logger
func init() {
var err error
logger, err = zap.NewProduction()
logger, err = logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
panic(err)
}

View File

@ -28,6 +28,7 @@ import (
"syscall"
"time"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/proxy"
"go.uber.org/zap"
@ -76,7 +77,11 @@ $ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
To: url.URL{Scheme: "tcp", Host: to},
}
if verbose {
cfg.Logger = zap.NewExample()
var err error
cfg.Logger, err = logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
panic(err)
}
}
p := proxy.NewServer(cfg)
<-p.Ready()

View File

@ -19,6 +19,7 @@ import (
"flag"
_ "github.com/etcd-io/gofail/runtime"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/tests/v3/functional/tester"
"go.uber.org/zap"
)
@ -27,7 +28,7 @@ var logger *zap.Logger
func init() {
var err error
logger, err = zap.NewProduction()
logger, err = logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
panic(err)
}

View File

@ -19,6 +19,7 @@ import (
"sort"
"testing"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/tests/v3/functional/rpcpb"
"go.uber.org/zap"
@ -256,7 +257,7 @@ func Test_read(t *testing.T) {
},
}
logger, err := zap.NewProduction()
logger, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
t.Fatal(err)
}

View File

@ -22,22 +22,22 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_golang v1.11.1
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.2
go.etcd.io/etcd/client/pkg/v3 v3.5.2
go.etcd.io/etcd/client/v2 v2.305.2
go.etcd.io/etcd/client/v3 v3.5.2
go.etcd.io/etcd/etcdutl/v3 v3.5.2
go.etcd.io/etcd/pkg/v3 v3.5.2
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/pkg/v3 v3.5.4
go.etcd.io/etcd/client/v2 v2.305.4
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/etcdutl/v3 v3.5.4
go.etcd.io/etcd/pkg/v3 v3.5.4
go.etcd.io/etcd/raft/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.38.0

View File

@ -239,8 +239,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -348,8 +348,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -394,8 +394,9 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -436,16 +437,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=

View File

@ -0,0 +1,103 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !cluster_proxy
// +build !cluster_proxy
package clientv3test
import (
"context"
"reflect"
"testing"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/mirror"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc"
)
func TestMirrorSync_Authenticated(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
initialClient := clus.Client(0)
// Create a user to run the mirror process that only has access to /syncpath
initialClient.RoleAdd(context.Background(), "syncer")
initialClient.RoleGrantPermission(context.Background(), "syncer", "/syncpath", clientv3.GetPrefixRangeEnd("/syncpath"), clientv3.PermissionType(clientv3.PermReadWrite))
initialClient.UserAdd(context.Background(), "syncer", "syncfoo")
initialClient.UserGrantRole(context.Background(), "syncer", "syncer")
// Seed /syncpath with some initial data
_, err := initialClient.KV.Put(context.TODO(), "/syncpath/foo", "bar")
if err != nil {
t.Fatal(err)
}
// Require authentication
authSetupRoot(t, initialClient.Auth)
// Create a client as the `syncer` user.
cfg := clientv3.Config{
Endpoints: initialClient.Endpoints(),
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
Username: "syncer",
Password: "syncfoo",
}
syncClient, err := integration.NewClient(t, cfg)
if err != nil {
t.Fatal(err)
}
defer syncClient.Close()
// Now run the sync process, create changes, and get the initial sync state
syncer := mirror.NewSyncer(syncClient, "/syncpath", 0)
gch, ech := syncer.SyncBase(context.TODO())
wkvs := []*mvccpb.KeyValue{{Key: []byte("/syncpath/foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}
for g := range gch {
if !reflect.DeepEqual(g.Kvs, wkvs) {
t.Fatalf("kv = %v, want %v", g.Kvs, wkvs)
}
}
for e := range ech {
t.Fatalf("unexpected error %v", e)
}
// Start a continuous sync
wch := syncer.SyncUpdates(context.TODO())
// Update state
_, err = syncClient.KV.Put(context.TODO(), "/syncpath/foo", "baz")
if err != nil {
t.Fatal(err)
}
// Wait for the updated state to sync
select {
case r := <-wch:
wkv := &mvccpb.KeyValue{Key: []byte("/syncpath/foo"), Value: []byte("baz"), CreateRevision: 2, ModRevision: 3, Version: 2}
if !reflect.DeepEqual(r.Events[0].Kv, wkv) {
t.Fatalf("kv = %v, want %v", r.Events[0].Kv, wkv)
}
case <-time.After(time.Second):
t.Fatal("failed to receive update in one second")
}
}

View File

@ -170,6 +170,7 @@ type ClusterConfig struct {
LeaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
}
type cluster struct {
@ -332,6 +333,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
CorruptCheckTime: c.cfg.CorruptCheckTime,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
@ -635,6 +637,7 @@ type memberConfig struct {
leaseCheckpointInterval time.Duration
leaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
}
// mustNewMember return an inited member with the given name. If peerTLS is
@ -737,6 +740,9 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
m.InitialCorruptCheck = true
if mcfg.CorruptCheckTime > time.Duration(0) {
m.CorruptCheckTime = mcfg.CorruptCheckTime
}
m.WarningApplyDuration = embed.DefaultWarningApplyDuration
m.V2Deprecation = config.V2_DEPR_DEFAULT
@ -1448,7 +1454,7 @@ func (c *ClusterV3) Client(i int) *clientv3.Client {
func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) {
if c.clusterClient == nil {
endpoints := []string{}
var endpoints []string
for _, m := range c.Members {
endpoints = append(endpoints, m.grpcURL)
}

View File

@ -15,14 +15,14 @@
package integration_test
import (
"testing"
"time"
"go.etcd.io/etcd/tests/v3/integration"
"testing"
)
func TestBeforeTestWithoutLeakDetection(t *testing.T) {
integration.BeforeTest(t, integration.WithoutGoLeakDetection(), integration.WithoutSkipInShort())
// Intentional leak that should get ignored
go time.Sleep(2 * time.Second)
go func() {
}()
}

View File

@ -16,6 +16,7 @@ package integration
import (
"context"
"encoding/binary"
"os"
"path/filepath"
"sync"
@ -25,8 +26,10 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap/zaptest"
)
@ -228,3 +231,119 @@ func TestV3CorruptAlarm(t *testing.T) {
}
t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
}
func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{
CorruptCheckTime: time.Second,
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
})
defer clus.Terminate(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60})
if err != nil {
t.Errorf("could not create lease 1 (%v)", err)
}
if lresp.ID != 1 {
t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
}
putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}
// Trigger snapshot from the leader to new member
for i := 0; i < 15; i++ {
_, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", i, err)
}
}
clus.RemoveMember(t, uint64(clus.Members[2].ID()))
oldMemberClient := clus.Client(2)
if err := oldMemberClient.Close(); err != nil {
t.Fatal(err)
}
clus.AddMember(t)
// Wait for new member to catch up
newMemberClient, err := clus.NewClientV3(2)
if err != nil {
t.Fatal(err)
}
WaitClientV3(t, newMemberClient)
clus.clients[2] = newMemberClient
// Corrupt member 2 by modifying backend lease bucket offline.
clus.Members[2].Stop(t)
fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db")
bcfg := backend.DefaultBackendConfig()
bcfg.Path = fp
bcfg.Logger = zaptest.NewLogger(t)
be := backend.New(bcfg)
tx := be.BatchTx()
tx.UnsafeDelete(buckets.Lease, leaseIdToBytes(1))
lpb := leasepb.Lease{ID: int64(2), TTL: 60}
mustUnsafePutLease(tx, &lpb)
tx.Commit()
if err := be.Close(); err != nil {
t.Fatal(err)
}
if err := clus.Members[2].Restart(t); err != nil {
t.Fatal(err)
}
clus.Members[1].WaitOK(t)
clus.Members[2].WaitOK(t)
// Revoke lease should remove key except the member with corruption
_, err = toGRPC(clus.Client(0)).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID})
if err != nil {
t.Fatal(err)
}
resp0, err0 := clus.Client(1).KV.Get(context.TODO(), "foo")
if err0 != nil {
t.Fatal(err0)
}
resp1, err1 := clus.Client(2).KV.Get(context.TODO(), "foo")
if err1 != nil {
t.Fatal(err1)
}
if resp0.Header.Revision == resp1.Header.Revision {
t.Fatalf("matching Revision values")
}
// Wait for CorruptCheckTime
time.Sleep(time.Second)
presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
if perr != nil {
if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
} else {
return
}
}
}
func leaseIdToBytes(n int64) []byte {
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, uint64(n))
return bytes
}
func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
key := leaseIdToBytes(lpb.ID)
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
tx.UnsafePut(buckets.Lease, key, val)
}

View File

@ -46,6 +46,33 @@ func TestV3AuthEmptyUserGet(t *testing.T) {
}
}
// TestV3AuthEmptyUserPut ensures that a put with an empty user will return an empty user error,
// and the consistent_index should be moved forward even the apply-->Put fails.
func TestV3AuthEmptyUserPut(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{
Size: 1,
SnapshotCount: 3,
})
defer clus.Terminate(t)
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
api := toGRPC(clus.Client(0))
authSetupRoot(t, api.Auth)
// The SnapshotCount is 3, so there must be at least 3 new snapshot files being created.
// The VERIFY logic will check whether the consistent_index >= last snapshot index on
// cluster terminating.
for i := 0; i < 10; i++ {
_, err := api.KV.Put(ctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if !eqErrGRPC(err, rpctypes.ErrUserEmpty) {
t.Fatalf("got %v, expected %v", err, rpctypes.ErrUserEmpty)
}
}
}
// TestV3AuthTokenWithDisable tests that auth won't crash if
// given a valid token when authentication is disabled
func TestV3AuthTokenWithDisable(t *testing.T) {

View File

@ -16,7 +16,9 @@ package integration
import (
"context"
"errors"
"fmt"
"math"
"testing"
"time"
@ -141,6 +143,91 @@ func TestV3LeaseGrantByID(t *testing.T) {
}
}
// TestV3LeaseNegativeID ensures restarted member lessor can recover negative leaseID from backend.
//
// When the negative leaseID is used for lease revoke, all etcd nodes will remove the lease
// and delete associated keys to ensure kv store data consistency
//
// It ensures issue 12535 is fixed by PR 13676
func TestV3LeaseNegativeID(t *testing.T) {
tcs := []struct {
leaseID int64
k []byte
v []byte
}{
{
leaseID: -1, // int64 -1 is 2^64 -1 in uint64
k: []byte("foo"),
v: []byte("bar"),
},
{
leaseID: math.MaxInt64,
k: []byte("bar"),
v: []byte("foo"),
},
{
leaseID: math.MinInt64,
k: []byte("hello"),
v: []byte("world"),
},
}
for _, tc := range tcs {
t.Run(fmt.Sprintf("test with lease ID %16x", tc.leaseID), func(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc := clus.RandClient()
lresp, err := toGRPC(cc).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: tc.leaseID, TTL: 300})
if err != nil {
t.Errorf("could not create lease %d (%v)", tc.leaseID, err)
}
if lresp.ID != tc.leaseID {
t.Errorf("got id %v, wanted id %v", lresp.ID, tc.leaseID)
}
putr := &pb.PutRequest{Key: tc.k, Value: tc.v, Lease: tc.leaseID}
_, err = toGRPC(cc).KV.Put(ctx, putr)
if err != nil {
t.Errorf("couldn't put key (%v)", err)
}
// wait for backend Commit
time.Sleep(100 * time.Millisecond)
// restore lessor from db file
clus.Members[2].Stop(t)
if err := clus.Members[2].Restart(t); err != nil {
t.Fatal(err)
}
// revoke lease should remove key
WaitClientV3(t, clus.Client(2))
_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: tc.leaseID})
if err != nil {
t.Errorf("could not revoke lease %d (%v)", tc.leaseID, err)
}
var revision int64
for i := range clus.Members {
getr := &pb.RangeRequest{Key: tc.k}
getresp, err := toGRPC(clus.Client(i)).KV.Range(ctx, getr)
if err != nil {
t.Fatal(err)
}
if revision == 0 {
revision = getresp.Header.Revision
}
if revision != getresp.Header.Revision {
t.Errorf("expect revision %d, but got %d", revision, getresp.Header.Revision)
}
if len(getresp.Kvs) != 0 {
t.Errorf("lease removed but key remains")
}
}
})
}
}
// TestV3LeaseExpire ensures a key is deleted once a key expires.
func TestV3LeaseExpire(t *testing.T) {
BeforeTest(t)
@ -412,17 +499,31 @@ func TestV3LeaseLeases(t *testing.T) {
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
// related issue https://github.com/etcd-io/etcd/issues/6978
func TestV3LeaseRenewStress(t *testing.T) {
testLeaseStress(t, stressLeaseRenew)
testLeaseStress(t, stressLeaseRenew, false)
}
// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress,
// but it uses a cluster client instead of a specific member's client.
// The related issue is https://github.com/etcd-io/etcd/issues/13675.
func TestV3LeaseRenewStressWithClusterClient(t *testing.T) {
testLeaseStress(t, stressLeaseRenew, true)
}
// TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
// it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found.
// related issue https://github.com/etcd-io/etcd/issues/6978
func TestV3LeaseTimeToLiveStress(t *testing.T) {
testLeaseStress(t, stressLeaseTimeToLive)
testLeaseStress(t, stressLeaseTimeToLive, false)
}
func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress,
// but it uses a cluster client instead of a specific member's client.
// The related issue is https://github.com/etcd-io/etcd/issues/13675.
func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) {
testLeaseStress(t, stressLeaseTimeToLive, true)
}
func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
@ -431,13 +532,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient
defer cancel()
errc := make(chan error)
for i := 0; i < 30; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
if useClusterClient {
for i := 0; i < 300; i++ {
clusterClient, err := clus.ClusterClient()
if err != nil {
t.Fatal(err)
}
go func(i int) { errc <- stresser(ctx, toGRPC(clusterClient).Lease) }(i)
}
} else {
for i := 0; i < 100; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
}
}
}
for i := 0; i < 90; i++ {
for i := 0; i < 300; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
@ -468,7 +579,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
continue
}
if rresp.TTL == 0 {
return fmt.Errorf("TTL shouldn't be 0 so soon")
return errors.New("TTL shouldn't be 0 so soon")
}
}
return nil