Compare commits

...

46 Commits

Author SHA1 Message Date
cecbe35ce0 version: bump up to 3.5.6 2022-11-21 15:54:14 +01:00
d0424a7bf1 Merge pull request #14816 from serathius/trim-v3.5
[3.5] trim build path
2022-11-21 15:46:06 +01:00
1a9742c9c4 release: build with consistent paths
This changes the builds to always add -trimpath which removes specific
build time paths from the binary (like current directories etc).

Improves build reproducability to make the final binary independent from
the specific build path.

Lastly, when stripping debug symbols, also add -w to strip DWARF symbols
as well which aren't needed in that case either.

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
2022-11-21 15:19:51 +01:00
7ccca083eb Merge pull request #14799 from serathius/fix-client-fileutil-log
[3.5] client/pkg/fileutil: add missing logger to {Create,Touch}DirAll
2022-11-17 15:39:42 +01:00
c91978077b client/pkg/fileutil: add missing logger to {Create,Touch}DirAll
Also populate it to every invocation.

Signed-off-by: WangXiaoxiao <1141195807@qq.com>
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
2022-11-17 14:08:30 +01:00
b2821631aa Merge pull request #14790 from ahrtr/auth_3.5_20221117
[3.5] clientv3: do not refresh token when users use CommonName based authentication
2022-11-17 10:16:42 +01:00
4097c24783 test: add test case to cover the CommonName based authentication
Refer to https://github.com/etcd-io/etcd/issues/14764

Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-11-17 09:10:49 +08:00
9849fa7c66 test: add certificate with root CommonName
Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-11-17 05:56:43 +08:00
69aace20c8 clientv3: do not refresh token when using TLS CommonName based authentication
When users use the TLS CommonName based authentication, the
authTokenBundle is always nil. But it's possible for the clients
to get `rpctypes.ErrAuthOldRevision` response when the clients
concurrently modify auth data (e.g, addUser, deleteUser etc.).
In this case, there is no need to refresh the token; instead the
clients just need to retry the operations (e.g. Put, Delete etc).

Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-11-17 05:54:52 +08:00
5f387e6b7d Merge pull request #14733 from ahrtr/rev_inconsistency_3.5
[3.5] etcdserver: call the OnPreCommitUnsafe in unsafeCommit
2022-11-14 17:54:22 +08:00
563713e128 etcdserver: call the OnPreCommitUnsafe in unsafeCommit
`unsafeCommit` is called by both `(*batchTxBuffered) commit` and
`(*backend) defrag`. When users perform the defragmentation
operation, etcd doesn't update the consistent index. If etcd
crashes(e.g. panicking) in the process for whatever reason, then
etcd replays the WAL entries starting from the latest snapshot,
accordingly it may re-apply entries which might have already been
applied, eventually the revision isn't consistent with other members.

Refer to discussion in https://github.com/etcd-io/etcd/pull/14685

Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-11-11 17:35:26 +08:00
c2378be1b5 Merge pull request #13748 from kkkkun/add-warning-for-del
add warning message when delete to release-3.5
2022-11-05 16:51:01 +08:00
6797856841 add range flag for delete in etcdctl
Signed-off-by: kkkkun <scuzk373x@gmail.com>
2022-11-05 14:33:37 +08:00
cc6a082f9e Merge pull request #14658 from ahrtr/double_barrier_3.5
[3.5] clientv3: fix the implementation of double barrier
2022-11-02 23:16:11 +09:00
27707209ae Merge pull request #14676 from cenkalti/release-3.5
server: add more context to panic message
2022-11-02 07:56:06 +08:00
be4adc0c55 server: add more context to panic message
Signed-off-by: Cenk Alti <cenkalti@gmail.com>
2022-11-01 19:02:32 -04:00
8902fe9246 Merge pull request #14662 from falser101/release-3.5
[3.5] fix: close maintenance conn
2022-10-31 17:49:19 +08:00
45e31f6c80 fix:close conn
Signed-off-by: jianfei.zhang <jianfei.zhang@daocloud.io>
2022-10-31 16:00:58 +08:00
8e26a1fff1 clientv3: fix the design & implementation of double barrier
Check the client count before creating the ephemeral key, do not
create the key if there are already too many clients. Check the
count after creating the key again, if the total kvs is bigger
than the expected count, then check the rev of the current key,
and take action accordingly based on its rev. If its rev is in
the first ${count}, then it's valid client, otherwise, it should
fail.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-10-31 08:33:27 +08:00
0a0f0e3617 Merge pull request #14656 from ahrtr/test_dynamical_add_member
test: added e2e test case for issue 14571: etcd doesn't load auth info when recovering from a snapshot
2022-10-30 17:54:47 +09:00
bd7405a52e test: added e2e test case for issue 14571: etcd doesn't load auth info when recovering from a snapshot
Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-10-30 14:49:13 +08:00
17cb291f15 Merge pull request #14648 from mitake/test-authrecover-3.5
[3.5] server: refresh auth info when etcd recovers from a snapshot
2022-10-29 13:43:42 +08:00
1e96e0be38 etcdserver: call refreshRangePermCache on Recover() in AuthStore. #14574
Signed-off-by: Oleg Guba <oleg@dropbox.com>
Signed-off-by: Hitoshi Mitake <h.mitake@gmail.com>
2022-10-29 13:56:08 +09:00
efb9480b96 server: add a unit test case for authStore.Reocver() with empty rangePermCache
Signed-off-by: Hitoshi Mitake <h.mitake@gmail.com>
2022-10-29 13:26:31 +09:00
7cd9e5a338 Merge pull request #14593 from ZoeShaw101/fix-watch-test-panic-3.5
Backport #14591 to 3.5.
2022-10-16 19:33:26 +08:00
d78f6f7f14 Backport #14591 to 3.5.
Signed-off-by: 王霄霄 <1141195807@qq.com>
2022-10-16 18:52:57 +08:00
ec6f0a74ba Merge pull request #14500 from dusk125/release-3.5
Release-3.5: server/etcdmain: add configurable cipher list to gRPC proxy listener
2022-10-16 06:35:17 +08:00
62169d12eb Merge pull request #14582 from tomari/tomari/watch-backoff-for-3.5
[3.5] client/v3: Add backoff before retry when watch stream returns unavailable
2022-10-13 07:02:22 +08:00
d3da22fb1f client/v3: Add backoff before retry when watch stream returns unavailable
The client retries connection without backoff when the server is gone
after the watch stream is established. This results in high CPU usage
in the client process. This change introduces backoff when the stream is
failed and unavailable.

Signed-off-by: Hisanobu Tomari <posco.grubb@gmail.com>
2022-10-13 05:26:02 +09:00
acc7463fb2 Merge pull request #13861 from mrueg/rel3.5-fix-make2
[Release-3.5] Makefile: additional logic fix / Update Ubuntu base
2022-10-13 02:17:42 +08:00
2fb9be6f7d Merge pull request #14578 from ahrtr/wal_log_3.5
[3.5] etcdserver: added more debug log for the purgeFile goroutine
2022-10-13 02:07:24 +08:00
f6c4c84da3 etcdserver: added more debug log for the purgeFile goroutine
Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-10-12 19:28:32 +08:00
3afd0735e0 Merge pull request #14573 from pchan/automated-cherry-pick
Automated cherry pick of #13224 #14572
2022-10-12 09:39:05 +08:00
e712234a1a netutil: make a raw URL comparison part of the urlsEqual function
Signed-off-by: Prasad Chandrasekaran <prasadc@vmware.com>
2022-10-11 16:58:56 +05:30
3e195ba473 Apply suggestions from code review
Co-authored-by: Lili Cosic <cosiclili@gmail.com>
Signed-off-by: Prasad Chandrasekaran <prasadc@vmware.com>
2022-10-11 16:58:56 +05:30
25ef9b6f46 netutil: add url comparison without resolver to URLStringsEqual
If one of the nodes in the cluster has lost a dns record,
restarting the second node will break it.
This PR makes an attempt to add a comparison without using a resolver,
which allows to protect cluster from dns errors and does not break
the current logic of comparing urls in the URLStringsEqual function.
You can read more in the issue #7798

Fixes #7798

Signed-off-by: Prasad Chandrasekaran <prasadc@vmware.com>
2022-10-11 16:58:56 +05:30
5ff0d7fe26 tests/Dockerfile: Switch to ubuntu 22.04 base
ubuntu 20.10 is EOL and fails with
E: The repository 'http://security.ubuntu.com/ubuntu groovy-security Release' does not have a Release file.

Signed-off-by: Manuel Rüger <manuel@rueg.eu>
2022-10-10 22:34:56 +02:00
dce3fdbeb1 Makefile: Additional logic fix
Signed-off-by: Manuel Rüger <manuel@rueg.eu>
2022-10-10 22:30:49 +02:00
07c7a98371 Merge pull request #14563 from kafuu-chino/3.5-backport-14296
*: avoid closing a watch with ID 0 incorrectly
2022-10-09 23:59:36 +09:00
dd983c662b *: avoid closing a watch with ID 0 incorrectly
Signed-off-by: Kafuu Chino <KafuuChinoQ@gmail.com>

add test

1

1

1

1

1

1
2022-10-08 20:06:19 +08:00
5daf35bb4a Merge pull request #14547 from mitake/3.5-backport-14322
Backport PR 14322 to release-3.5
2022-10-04 06:37:46 +08:00
528dd82be9 tests: a test case for watch with auth token expiration
Signed-off-by: Hitoshi Mitake <h.mitake@gmail.com>
2022-10-03 23:08:23 +09:00
7b568f23ab *: handle auth invalid token and old revision errors in watch
Signed-off-by: Hitoshi Mitake <h.mitake@gmail.com>
2022-10-03 23:00:13 +09:00
db55011d7c server/etcdmain: add configurable cipher list to gRPC proxy listener
Signed-off-by: Allen Ray <alray@redhat.com>
2022-09-29 11:41:57 -04:00
89d0fc49fc Merge pull request #14489 from dims/Haimantika-replacejwtgo-in-release-3.5
Replace github.com/form3tech-oss/jwt-go with https://github.com/golang-jwt/jwt
2022-09-20 10:19:02 +08:00
653d6e18c3 Replace github.com/form3tech-oss/jwt-go with https://github.com/golang-jwt/jwt/v4
Signed-off-by: haimantika mitra <haimantikamitra@gmail.com>
Signed-off-by: Davanum Srinivas <davanum@gmail.com>
2022-09-19 15:35:08 -04:00
65 changed files with 1054 additions and 223 deletions

View File

@ -162,7 +162,7 @@ test-full:
PASSES="fmt build release unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
ensure-docker-test-image-exists:
make pull-docker-test || echo "WARNING: Container Image not found in registry, building locally"; make build-docker-test
make pull-docker-test || ( echo "WARNING: Container Image not found in registry, building locally"; make build-docker-test )
docker-test: ensure-docker-test-image-exists
$(info GO_VERSION: $(GO_VERSION))

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.5.5"
Version = "3.5.6"
APIVersion = "unknown"
// Git SHA Value will be set during build

View File

@ -134,15 +134,6 @@
}
]
},
{
"project": "github.com/form3tech-oss/jwt-go",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "github.com/getsentry/raven-go",
"licenses": [
@ -161,6 +152,15 @@
}
]
},
{
"project": "github.com/golang-jwt/jwt/v4",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "github.com/golang/groupcache/lru",
"licenses": [

View File

@ -42,6 +42,7 @@ etcd_build() {
# Static compilation is useful when etcd is run in a container. $GO_BUILD_FLAGS is OK
# shellcheck disable=SC2086
run env "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
-trimpath \
-installsuffix=cgo \
"-ldflags=${GO_LDFLAGS[*]}" \
-o="../${out}/etcd" . || return 2
@ -52,6 +53,7 @@ etcd_build() {
(
cd ./etcdutl
run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
-trimpath \
-installsuffix=cgo \
"-ldflags=${GO_LDFLAGS[*]}" \
-o="../${out}/etcdutl" . || return 2
@ -62,6 +64,7 @@ etcd_build() {
(
cd ./etcdctl
run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
-trimpath \
-installsuffix=cgo \
"-ldflags=${GO_LDFLAGS[*]}" \
-o="../${out}/etcdctl" . || return 2
@ -92,6 +95,7 @@ tools_build() {
run rm -f "${out}/${tool}"
# shellcheck disable=SC2086
run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} \
-trimpath \
-installsuffix=cgo \
"-ldflags='${GO_LDFLAGS[*]}'" \
-o="${out}/${tool}" "./${tool}" || return 2

View File

@ -44,16 +44,12 @@ func IsDirWriteable(dir string) error {
// TouchDirAll is similar to os.MkdirAll. It creates directories with 0700 permission if any directory
// does not exists. TouchDirAll also ensures the given directory is writable.
func TouchDirAll(dir string) error {
func TouchDirAll(lg *zap.Logger, dir string) error {
// If path is already a directory, MkdirAll does nothing and returns nil, so,
// first check if dir exist with an expected permission mode.
if Exist(dir) {
err := CheckDirPermission(dir, PrivateDirMode)
if err != nil {
lg, _ := zap.NewProduction()
if lg == nil {
lg = zap.NewExample()
}
lg.Warn("check file permission", zap.Error(err))
}
} else {
@ -70,8 +66,8 @@ func TouchDirAll(dir string) error {
// CreateDirAll is similar to TouchDirAll but returns error
// if the deepest directory was not empty.
func CreateDirAll(dir string) error {
err := TouchDirAll(dir)
func CreateDirAll(lg *zap.Logger, dir string) error {
err := TouchDirAll(lg, dir)
if err == nil {
var ns []string
ns, err = ReadDir(dir)

View File

@ -67,7 +67,7 @@ func TestCreateDirAll(t *testing.T) {
defer os.RemoveAll(tmpdir)
tmpdir2 := filepath.Join(tmpdir, "testdir")
if err = CreateDirAll(tmpdir2); err != nil {
if err = CreateDirAll(zaptest.NewLogger(t), tmpdir2); err != nil {
t.Fatal(err)
}
@ -75,7 +75,7 @@ func TestCreateDirAll(t *testing.T) {
t.Fatal(err)
}
if err = CreateDirAll(tmpdir2); err == nil || !strings.Contains(err.Error(), "to be empty, got") {
if err = CreateDirAll(zaptest.NewLogger(t), tmpdir2); err == nil || !strings.Contains(err.Error(), "to be empty, got") {
t.Fatalf("unexpected error %v", err)
}
}
@ -186,7 +186,7 @@ func TestDirPermission(t *testing.T) {
tmpdir2 := filepath.Join(tmpdir, "testpermission")
// create a new dir with 0700
if err = CreateDirAll(tmpdir2); err != nil {
if err = CreateDirAll(zaptest.NewLogger(t), tmpdir2); err != nil {
t.Fatal(err)
}
// check dir permission with mode different than created dir

View File

@ -41,6 +41,12 @@ func purgeFile(lg *zap.Logger, dirname string, suffix string, max uint, interval
lg = zap.NewNop()
}
errC := make(chan error, 1)
lg.Info("started to purge file",
zap.String("dir", dirname),
zap.String("suffix", suffix),
zap.Uint("max", max),
zap.Duration("interval", interval))
go func() {
if donec != nil {
defer close(donec)
@ -63,14 +69,16 @@ func purgeFile(lg *zap.Logger, dirname string, suffix string, max uint, interval
f := filepath.Join(dirname, newfnames[0])
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
if err != nil {
lg.Warn("failed to lock file", zap.String("path", f), zap.Error(err))
break
}
if err = os.Remove(f); err != nil {
lg.Error("failed to remove file", zap.String("path", f), zap.Error(err))
errC <- err
return
}
if err = l.Close(); err != nil {
lg.Warn("failed to unlock/close", zap.String("path", l.Name()), zap.Error(err))
lg.Error("failed to unlock/close", zap.String("path", l.Name()), zap.Error(err))
errC <- err
return
}

View File

@ -14,7 +14,10 @@
package tlsutil
import "crypto/tls"
import (
"crypto/tls"
"fmt"
)
// GetCipherSuite returns the corresponding cipher suite,
// and boolean value if it is supported.
@ -37,3 +40,17 @@ func GetCipherSuite(s string) (uint16, bool) {
}
return 0, false
}
// GetCipherSuites returns list of corresponding cipher suite IDs.
func GetCipherSuites(ss []string) ([]uint16, error) {
cs := make([]uint16, len(ss))
for i, s := range ss {
var ok bool
cs[i], ok = GetCipherSuite(s)
if !ok {
return nil, fmt.Errorf("unexpected TLS cipher suite %q", s)
}
}
return cs, nil
}

View File

@ -205,7 +205,7 @@ func SelfCert(lg *zap.Logger, dirpath string, hosts []string, selfSignedCertVali
)
return
}
err = fileutil.TouchDirAll(dirpath)
err = fileutil.TouchDirAll(lg, dirpath)
if err != nil {
if info.Logger != nil {
info.Logger.Warn(

View File

@ -5,8 +5,8 @@ go 1.16
require (
github.com/json-iterator/go v1.1.11
github.com/modern-go/reflect2 v1.0.1
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/pkg/v3 v3.5.6
)
replace (

View File

@ -45,25 +45,46 @@ func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarr
// Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error {
client := b.s.Client()
// Check the entered clients before creating the UniqueEphemeralKey,
// fail the request if there are already too many clients.
if resp1, err := b.enteredClients(client); err != nil {
return err
} else if len(resp1.Kvs) >= b.count {
return ErrTooManyClients
}
ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
if err != nil {
return err
}
b.myKey = ek
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
// Check the entered clients after creating the UniqueEphemeralKey
resp2, err := b.enteredClients(client)
if err != nil {
return err
}
if len(resp2.Kvs) >= b.count {
lastWaiter := resp2.Kvs[b.count-1]
if ek.rev > lastWaiter.CreateRevision {
// delete itself now, otherwise other processes may need to wait
// until these keys are automatically deleted when the related
// lease expires.
if err = b.myKey.Delete(); err != nil {
// Nothing to do here. We have to wait for the key to be
// deleted when the lease expires.
}
return ErrTooManyClients
}
if len(resp.Kvs) > b.count {
return ErrTooManyClients
}
if len(resp.Kvs) == b.count {
// unblock waiters
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
if ek.rev == lastWaiter.CreateRevision {
// TODO(ahrtr): we might need to compare ek.key and
// string(lastWaiter.Key), they should be equal.
// unblock all other waiters
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
}
}
_, err = WaitEvents(
@ -74,6 +95,18 @@ func (b *DoubleBarrier) Enter() error {
return err
}
// enteredClients gets all the entered clients, which are ordered by the
// createRevision in ascending order.
func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) {
resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
if err != nil {
return nil, err
}
return resp, nil
}
// Leave waits for "count" processes to leave the barrier then returns
func (b *DoubleBarrier) Leave() error {
client := b.s.Client()
@ -96,7 +129,7 @@ func (b *DoubleBarrier) Leave() error {
}
isLowest := string(lowest.Key) == b.myKey.Key()
if len(resp.Kvs) == 1 {
if len(resp.Kvs) == 1 && isLowest {
// this is the only node in the barrier; finish up
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
return err

View File

@ -6,8 +6,8 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/prometheus/client_golang v1.11.1
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/pkg/v3 v3.5.6
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.41.0
sigs.k8s.io/yaml v1.2.0

View File

@ -92,6 +92,7 @@ func NewMaintenance(c *Client) Maintenance {
err = c.getToken(dctx)
cancel()
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("failed to getToken from endpoint %s with maintenance client: %v", endpoint, err)
}
cancel = func() { conn.Close() }

View File

@ -74,13 +74,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien
continue
}
if c.shouldRefreshToken(lastErr, callOpts) {
// clear auth token before refreshing it.
// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
// and a rpctypes.ErrInvalidAuthToken will recursively call c.getToken until system run out of resource.
c.authTokenBundle.UpdateAuthToken("")
gterr := c.getToken(ctx)
gterr := c.refreshToken(ctx)
if gterr != nil {
c.GetLogger().Warn(
"retrying of unary invoker failed to fetch new auth token",
@ -161,6 +155,24 @@ func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
}
func (c *Client) refreshToken(ctx context.Context) error {
if c.authTokenBundle == nil {
// c.authTokenBundle will be initialized only when
// c.Username != "" && c.Password != "".
//
// When users use the TLS CommonName based authentication, the
// authTokenBundle is always nil. But it's possible for the clients
// to get `rpctypes.ErrAuthOldRevision` response when the clients
// concurrently modify auth data (e.g, addUser, deleteUser etc.).
// In this case, there is no need to refresh the token; instead the
// clients just need to retry the operations (e.g. Put, Delete etc).
return nil
}
// clear auth token before refreshing it.
c.authTokenBundle.UpdateAuthToken("")
return c.getToken(ctx)
}
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy.
@ -259,10 +271,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
return true, err
}
if s.client.shouldRefreshToken(err, s.callOpts) {
// clear auth token to avoid failure when call getToken
s.client.authTokenBundle.UpdateAuthToken("")
gterr := s.client.getToken(s.ctx)
gterr := s.client.refreshToken(s.ctx)
if gterr != nil {
s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
return false, err // return the original error for simplicity

View File

@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
@ -37,6 +38,13 @@ const (
EventTypePut = mvccpb.PUT
closeSendErrTimeout = 250 * time.Millisecond
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
AutoWatchID = 0
// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
InvalidWatchID = -1
)
type Event mvccpb.Event
@ -450,7 +458,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
// check watch ID for backward compatibility (<= v3.3)
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel
close(ws.recvc)
@ -481,7 +489,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
} else if ws.outc != nil {
close(ws.outc)
}
if ws.id != -1 {
if ws.id != InvalidWatchID {
delete(w.substreams, ws.id)
return
}
@ -533,6 +541,7 @@ func (w *watchGrpcStream) run() {
cancelSet := make(map[int64]struct{})
var cur *pb.WatchResponse
backoff := time.Millisecond
for {
select {
// Watch() requested
@ -543,7 +552,7 @@ func (w *watchGrpcStream) run() {
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
id: InvalidWatchID,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
@ -580,6 +589,26 @@ func (w *watchGrpcStream) run() {
switch {
case pbresp.Created:
cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason))
if shouldRetryWatch(cancelReasonError) {
var newErr error
if wc, newErr = w.newWatchClient(); newErr != nil {
w.lg.Error("failed to create a new watch client", zap.Error(newErr))
return
}
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil {
if err := wc.Send(ws.initReq.toPB()); err != nil {
w.lg.Debug("error when sending request", zap.Error(err))
}
}
}
cur = nil
continue
}
// response to head of queue creation
if len(w.resuming) != 0 {
if ws := w.resuming[0]; ws != nil {
@ -649,6 +678,7 @@ func (w *watchGrpcStream) run() {
closeErr = err
return
}
backoff = w.backoffIfUnavailable(backoff, err)
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
@ -669,7 +699,7 @@ func (w *watchGrpcStream) run() {
if len(w.substreams)+len(w.resuming) == 0 {
return
}
if ws.id != -1 {
if ws.id != InvalidWatchID {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
cancelSet[ws.id] = struct{}{}
@ -688,6 +718,11 @@ func (w *watchGrpcStream) run() {
}
}
func shouldRetryWatch(cancelReasonError error) bool {
return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) ||
(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0)
}
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
@ -716,9 +751,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
cancelReason: pbresp.CancelReason,
}
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
// indicate they should be broadcast.
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
return w.broadcastResponse(wr)
}
@ -873,7 +908,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
ws.id = InvalidWatchID
w.resuming = append(w.resuming, ws)
}
// strip out nils, if any
@ -963,6 +998,21 @@ func (w *watchGrpcStream) joinSubstreams() {
var maxBackoff = 100 * time.Millisecond
func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
if isUnavailableErr(w.ctx, err) {
// retry, but backoff
if backoff < maxBackoff {
// 25% backoff factor
backoff = backoff + backoff/4
if backoff > maxBackoff {
backoff = maxBackoff
}
}
time.Sleep(backoff)
}
return backoff
}
// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
@ -983,17 +1033,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
if isHaltErr(w.ctx, err) {
return nil, v3rpc.Error(err)
}
if isUnavailableErr(w.ctx, err) {
// retry, but backoff
if backoff < maxBackoff {
// 25% backoff factor
backoff = backoff + backoff/4
if backoff > maxBackoff {
backoff = maxBackoff
}
}
time.Sleep(backoff)
}
backoff = w.backoffIfUnavailable(backoff, err)
}
return ws, nil
}

View File

@ -16,9 +16,10 @@ package command
import (
"fmt"
"os"
"github.com/spf13/cobra"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/cobrautl"
)
@ -26,6 +27,7 @@ var (
delPrefix bool
delPrevKV bool
delFromKey bool
delRange bool
)
// NewDelCommand returns the cobra command for "del".
@ -39,6 +41,7 @@ func NewDelCommand() *cobra.Command {
cmd.Flags().BoolVar(&delPrefix, "prefix", false, "delete keys with matching prefix")
cmd.Flags().BoolVar(&delPrevKV, "prev-kv", false, "return deleted key-value pairs")
cmd.Flags().BoolVar(&delFromKey, "from-key", false, "delete keys that are greater than or equal to the given key using byte compare")
cmd.Flags().BoolVar(&delRange, "range", false, "delete range of keys")
return cmd
}
@ -70,6 +73,9 @@ func getDelOp(args []string) (string, []clientv3.OpOption) {
cobrautl.ExitWithError(cobrautl.ExitBadArgs, fmt.Errorf("too many arguments, only accept one argument when `--prefix` or `--from-key` is set"))
}
opts = append(opts, clientv3.WithRange(args[1]))
if !delRange {
fmt.Fprintln(os.Stderr, "In etcd v3.6, the operation will be suspended for a few seconds to provide the user time to verify range.")
}
}
if delPrefix {

View File

@ -9,12 +9,12 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/urfave/cli v1.22.4
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/v2 v2.305.5
go.etcd.io/etcd/client/v3 v3.5.5
go.etcd.io/etcd/etcdutl/v3 v3.5.5
go.etcd.io/etcd/pkg/v3 v3.5.5
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/pkg/v3 v3.5.6
go.etcd.io/etcd/client/v2 v2.305.6
go.etcd.io/etcd/client/v3 v3.5.6
go.etcd.io/etcd/etcdutl/v3 v3.5.6
go.etcd.io/etcd/pkg/v3 v3.5.6
go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.41.0

View File

@ -82,8 +82,6 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
@ -102,6 +100,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=

View File

@ -114,7 +114,7 @@ func HandleBackup(withV3 bool, srcDir string, destDir string, srcWAL string, des
destWAL = datadir.ToWalDir(destDir)
}
if err := fileutil.CreateDirAll(destSnap); err != nil {
if err := fileutil.CreateDirAll(lg, destSnap); err != nil {
lg.Fatal("failed creating backup snapshot dir", zap.String("dest-snap", destSnap), zap.Error(err))
}

View File

@ -25,11 +25,11 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/v3 v3.5.5
go.etcd.io/etcd/pkg/v3 v3.5.5
go.etcd.io/etcd/raft/v3 v3.5.5
go.etcd.io/etcd/server/v3 v3.5.5
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/pkg/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
go.etcd.io/etcd/pkg/v3 v3.5.6
go.etcd.io/etcd/raft/v3 v3.5.6
go.etcd.io/etcd/server/v3 v3.5.6
go.uber.org/zap v1.17.0
)

View File

@ -78,8 +78,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
@ -98,6 +96,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=

View File

@ -322,7 +322,7 @@ func (s *v3Manager) copyAndVerifyDB() error {
return err
}
if err := fileutil.CreateDirAll(s.snapDir); err != nil {
if err := fileutil.CreateDirAll(s.lg, s.snapDir); err != nil {
return err
}
@ -383,7 +383,7 @@ func (s *v3Manager) copyAndVerifyDB() error {
//
// TODO: This code ignores learners !!!
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
if err := fileutil.CreateDirAll(s.walDir); err != nil {
if err := fileutil.CreateDirAll(s.lg, s.walDir); err != nil {
return nil, err
}

20
go.mod
View File

@ -20,16 +20,16 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/v2 v2.305.5
go.etcd.io/etcd/client/v3 v3.5.5
go.etcd.io/etcd/etcdctl/v3 v3.5.5
go.etcd.io/etcd/etcdutl/v3 v3.5.5
go.etcd.io/etcd/pkg/v3 v3.5.5
go.etcd.io/etcd/raft/v3 v3.5.5
go.etcd.io/etcd/server/v3 v3.5.5
go.etcd.io/etcd/tests/v3 v3.5.5
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/pkg/v3 v3.5.6
go.etcd.io/etcd/client/v2 v2.305.6
go.etcd.io/etcd/client/v3 v3.5.6
go.etcd.io/etcd/etcdctl/v3 v3.5.6
go.etcd.io/etcd/etcdutl/v3 v3.5.6
go.etcd.io/etcd/pkg/v3 v3.5.6
go.etcd.io/etcd/raft/v3 v3.5.6
go.etcd.io/etcd/server/v3 v3.5.6
go.etcd.io/etcd/tests/v3 v3.5.6
go.uber.org/zap v1.17.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.41.0

4
go.sum
View File

@ -84,8 +84,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
@ -104,6 +102,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=

View File

@ -9,7 +9,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.6
go.uber.org/zap v1.17.0
google.golang.org/grpc v1.41.0
)

View File

@ -148,20 +148,31 @@ func urlsEqual(ctx context.Context, lg *zap.Logger, a []url.URL, b []url.URL) (b
if len(a) != len(b) {
return false, fmt.Errorf("len(%q) != len(%q)", urlsToStrings(a), urlsToStrings(b))
}
sort.Sort(types.URLs(a))
sort.Sort(types.URLs(b))
var needResolve bool
for i := range a {
if !reflect.DeepEqual(a[i], b[i]) {
needResolve = true
break
}
}
if !needResolve {
return true, nil
}
// If URLs are not equal, try to resolve it and compare again.
urls, err := resolveTCPAddrs(ctx, lg, [][]url.URL{a, b})
if err != nil {
return false, err
}
preva, prevb := a, b
a, b = urls[0], urls[1]
sort.Sort(types.URLs(a))
sort.Sort(types.URLs(b))
for i := range a {
if !reflect.DeepEqual(a[i], b[i]) {
return false, fmt.Errorf("%q(resolved from %q) != %q(resolved from %q)",
a[i].String(), preva[i].String(),
b[i].String(), prevb[i].String(),
)
return false, fmt.Errorf("resolved urls: %q != %q", a[i].String(), b[i].String())
}
}
return true, nil
@ -174,21 +185,13 @@ func URLStringsEqual(ctx context.Context, lg *zap.Logger, a []string, b []string
if len(a) != len(b) {
return false, fmt.Errorf("len(%q) != len(%q)", a, b)
}
urlsA := make([]url.URL, 0)
for _, str := range a {
u, err := url.Parse(str)
if err != nil {
return false, fmt.Errorf("failed to parse %q", str)
}
urlsA = append(urlsA, *u)
urlsA, err := stringsToURLs(a)
if err != nil {
return false, err
}
urlsB := make([]url.URL, 0)
for _, str := range b {
u, err := url.Parse(str)
if err != nil {
return false, fmt.Errorf("failed to parse %q", str)
}
urlsB = append(urlsB, *u)
urlsB, err := stringsToURLs(b)
if err != nil {
return false, err
}
return urlsEqual(ctx, lg, urlsA, urlsB)
}
@ -201,6 +204,18 @@ func urlsToStrings(us []url.URL) []string {
return rs
}
func stringsToURLs(us []string) ([]url.URL, error) {
urls := make([]url.URL, 0, len(us))
for _, str := range us {
u, err := url.Parse(str)
if err != nil {
return nil, fmt.Errorf("failed to parse string to URL: %q", str)
}
urls = append(urls, *u)
}
return urls, nil
}
func IsNetworkTimeoutError(err error) bool {
nerr, ok := err.(net.Error)
return ok && nerr.Timeout()

View File

@ -17,6 +17,7 @@ package netutil
import (
"context"
"errors"
"fmt"
"net"
"net/url"
"reflect"
@ -166,113 +167,133 @@ func TestURLsEqual(t *testing.T) {
}
tests := []struct {
n int
a []url.URL
b []url.URL
expect bool
err error
}{
{
n: 0,
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
expect: true,
},
{
n: 1,
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
expect: true,
},
{
n: 2,
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
b: []url.URL{{Scheme: "https", Host: "10.0.10.1:2379"}},
expect: false,
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "https://10.0.10.1:2379"(resolved from "https://10.0.10.1:2379")`),
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "https://10.0.10.1:2379"`),
},
{
n: 3,
a: []url.URL{{Scheme: "https", Host: "example.com:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
expect: false,
err: errors.New(`"https://10.0.10.1:2379"(resolved from "https://example.com:2379") != "http://10.0.10.1:2379"(resolved from "http://10.0.10.1:2379")`),
err: errors.New(`resolved urls: "https://10.0.10.1:2379" != "http://10.0.10.1:2379"`),
},
{
n: 4,
a: []url.URL{{Scheme: "unix", Host: "abc:2379"}},
b: []url.URL{{Scheme: "unix", Host: "abc:2379"}},
expect: true,
},
{
n: 5,
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: true,
},
{
n: 6,
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: true,
},
{
n: 7,
a: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: true,
},
{
n: 8,
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://127.0.0.1:2380"`),
},
{
n: 9,
a: []url.URL{{Scheme: "http", Host: "example.com:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
expect: false,
err: errors.New(`"http://10.0.10.1:2380"(resolved from "http://example.com:2380") != "http://10.0.10.1:2379"(resolved from "http://10.0.10.1:2379")`),
err: errors.New(`resolved urls: "http://10.0.10.1:2380" != "http://10.0.10.1:2379"`),
},
{
n: 10,
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
expect: false,
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://10.0.0.1:2379"`),
},
{
n: 11,
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
expect: false,
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "http://10.0.0.1:2379"`),
},
{
n: 12,
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://127.0.0.1:2380"`),
},
{
n: 13,
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "http://127.0.0.1:2380"`),
},
{
n: 14,
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://10.0.0.1:2379"`),
},
{
n: 15,
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "http://10.0.0.1:2379"`),
},
{
n: 16,
a: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
err: errors.New(`len(["http://10.0.0.1:2379"]) != len(["http://10.0.0.1:2379" "http://127.0.0.1:2380"])`),
},
{
n: 17,
a: []url.URL{{Scheme: "http", Host: "first.com:2379"}, {Scheme: "http", Host: "second.com:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
expect: true,
},
{
n: 18,
a: []url.URL{{Scheme: "http", Host: "second.com:2380"}, {Scheme: "http", Host: "first.com:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
expect: true,
@ -282,21 +303,43 @@ func TestURLsEqual(t *testing.T) {
for i, test := range tests {
result, err := urlsEqual(context.TODO(), zap.NewExample(), test.a, test.b)
if result != test.expect {
t.Errorf("#%d: a:%v b:%v, expected %v but %v", i, test.a, test.b, test.expect, result)
t.Errorf("idx=%d #%d: a:%v b:%v, expected %v but %v", i, test.n, test.a, test.b, test.expect, result)
}
if test.err != nil {
if err.Error() != test.err.Error() {
t.Errorf("#%d: err expected %v but %v", i, test.err, err)
t.Errorf("idx=%d #%d: err expected %v but %v", i, test.n, test.err, err)
}
}
}
}
func TestURLStringsEqual(t *testing.T) {
result, err := URLStringsEqual(context.TODO(), zap.NewExample(), []string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"})
if !result {
t.Errorf("unexpected result %v", result)
defer func() { resolveTCPAddr = resolveTCPAddrDefault }()
errOnResolve := func(ctx context.Context, addr string) (*net.TCPAddr, error) {
return nil, fmt.Errorf("unexpected attempt to resolve: %q", addr)
}
if err != nil {
t.Errorf("unexpected error %v", err)
cases := []struct {
urlsA []string
urlsB []string
resolver func(ctx context.Context, addr string) (*net.TCPAddr, error)
}{
{[]string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"}, resolveTCPAddrDefault},
{[]string{
"http://host1:8080",
"http://host2:8080",
}, []string{
"http://host1:8080",
"http://host2:8080",
}, errOnResolve},
}
for idx, c := range cases {
t.Logf("TestURLStringsEqual, case #%d", idx)
resolveTCPAddr = c.resolver
result, err := URLStringsEqual(context.TODO(), zap.NewExample(), c.urlsA, c.urlsB)
if !result {
t.Errorf("unexpected result %v", result)
}
if err != nil {
t.Errorf("unexpected error %v", err)
}
}
}

View File

@ -8,7 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.6
)
// Bad imports are sometimes causing attempts to pull that code.

View File

@ -87,7 +87,7 @@ function main {
export GOARCH=${TARGET_ARCH}
pushd etcd >/dev/null
GO_LDFLAGS="-s" ./build.sh
GO_LDFLAGS="-s -w" ./build.sh
popd >/dev/null
TARGET="etcd-${VER}-${GOOS}-${GOARCH}"

View File

@ -21,7 +21,7 @@ import (
"errors"
"time"
jwt "github.com/form3tech-oss/jwt-go"
"github.com/golang-jwt/jwt/v4"
"go.uber.org/zap"
)

View File

@ -21,7 +21,7 @@ import (
"io/ioutil"
"time"
jwt "github.com/form3tech-oss/jwt-go"
"github.com/golang-jwt/jwt/v4"
)
const (

View File

@ -370,6 +370,7 @@ func (as *authStore) Recover(be backend.Backend) {
}
as.setRevision(getRevision(tx))
as.refreshRangePermCache(tx)
tx.Unlock()

View File

@ -189,6 +189,30 @@ func TestRecover(t *testing.T) {
}
}
func TestRecoverWithEmptyRangePermCache(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer as.Close()
defer tearDown(t)
as.enabled = false
as.rangePermCache = map[string]*unifiedRangePermissions{}
as.Recover(as.be)
if !as.IsAuthEnabled() {
t.Fatalf("expected auth enabled got disabled")
}
if len(as.rangePermCache) != 2 {
t.Fatalf("rangePermCache should have permission information for 2 users (\"root\" and \"foo\"), but has %d information", len(as.rangePermCache))
}
if _, ok := as.rangePermCache["root"]; !ok {
t.Fatal("user \"root\" should be created by setupAuthStore() but doesn't exist in rangePermCache")
}
if _, ok := as.rangePermCache["foo"]; !ok {
t.Fatal("user \"foo\" should be created by setupAuthStore() but doesn't exist in rangePermCache")
}
}
func TestCheckPassword(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer tearDown(t)

View File

@ -619,13 +619,9 @@ func updateCipherSuites(tls *transport.TLSInfo, ss []string) error {
return fmt.Errorf("TLSInfo.CipherSuites is already specified (given %v)", ss)
}
if len(ss) > 0 {
cs := make([]uint16, len(ss))
for i, s := range ss {
var ok bool
cs[i], ok = tlsutil.GetCipherSuite(s)
if !ok {
return fmt.Errorf("unexpected TLS cipher suite %q", s)
}
cs, err := tlsutil.GetCipherSuites(ss)
if err != nil {
return err
}
tls.CipherSuites = cs
}

View File

@ -323,6 +323,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
zap.Uint64("snapshot-count", sc.SnapshotCount),
zap.Uint("max-wals", sc.MaxWALFiles),
zap.Uint("max-snapshots", sc.MaxSnapFiles),
zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()),
zap.Strings("listen-peer-urls", ec.getLPURLs()),

View File

@ -276,7 +276,7 @@ func startProxy(cfg *config) error {
}
cfg.ec.Dir = filepath.Join(cfg.ec.Dir, "proxy")
err = fileutil.TouchDirAll(cfg.ec.Dir)
err = fileutil.TouchDirAll(lg, cfg.ec.Dir)
if err != nil {
return err
}

View File

@ -31,6 +31,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/tlsutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/leasing"
@ -41,12 +42,12 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
"go.uber.org/zap/zapgrpc"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapgrpc"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
@ -74,12 +75,13 @@ var (
// tls for clients connecting to proxy
grpcProxyListenCA string
grpcProxyListenCert string
grpcProxyListenKey string
grpcProxyListenAutoTLS bool
grpcProxyListenCRL string
selfSignedCertValidity uint
grpcProxyListenCA string
grpcProxyListenCert string
grpcProxyListenKey string
grpcProxyListenCipherSuites []string
grpcProxyListenAutoTLS bool
grpcProxyListenCRL string
selfSignedCertValidity uint
grpcProxyAdvertiseClientURL string
grpcProxyResolverPrefix string
@ -154,6 +156,7 @@ func newGRPCProxyStartCommand() *cobra.Command {
cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
cmd.Flags().StringSliceVar(&grpcProxyListenCipherSuites, "listen-cipher-suites", grpcProxyListenCipherSuites, "Comma-separated list of supported TLS cipher suites between client/proxy (empty will be auto-populated by Go).")
cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
cmd.Flags().UintVar(&selfSignedCertValidity, "self-signed-cert-validity", 1, "The validity period of the proxy certificates, unit is year")
@ -187,21 +190,28 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
// The proxy itself (ListenCert) can have not-empty CN.
// The empty CN is required for grpcProxyCert.
// Please see https://github.com/etcd-io/etcd/issues/11970#issuecomment-687875315 for more context.
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey, false)
if tlsinfo == nil && grpcProxyListenAutoTLS {
tlsInfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey, false)
if len(grpcProxyListenCipherSuites) > 0 {
cs, err := tlsutil.GetCipherSuites(grpcProxyListenCipherSuites)
if err != nil {
log.Fatal(err)
}
tlsInfo.CipherSuites = cs
}
if tlsInfo == nil && grpcProxyListenAutoTLS {
host := []string{"https://" + grpcProxyListenAddr}
dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
autoTLS, err := transport.SelfCert(lg, dir, host, selfSignedCertValidity)
if err != nil {
log.Fatal(err)
}
tlsinfo = &autoTLS
tlsInfo = &autoTLS
}
if tlsinfo != nil {
lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo)))
if tlsInfo != nil {
lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsInfo)))
}
m := mustListenCMux(lg, tlsinfo)
m := mustListenCMux(lg, tlsInfo)
grpcl := m.Match(cmux.HTTP2())
defer func() {
grpcl.Close()
@ -214,11 +224,11 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
// TODO: The mechanism should be refactored to use internal connection.
var proxyClient *clientv3.Client
if grpcProxyAdvertiseClientURL != "" {
proxyClient = mustNewProxyClient(lg, tlsinfo)
proxyClient = mustNewProxyClient(lg, tlsInfo)
}
httpClient := mustNewHTTPClient(lg)
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
srvhttp, httpl := mustHTTPListener(lg, m, tlsInfo, client, proxyClient)
if err := http2.ConfigureServer(srvhttp, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
@ -231,7 +241,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
go func() { errc <- srvhttp.Serve(httpl) }()
go func() { errc <- m.Serve() }()
if len(grpcProxyMetricsListenAddr) > 0 {
mhttpl := mustMetricsListener(lg, tlsinfo)
mhttpl := mustMetricsListener(lg, tlsInfo)
go func() {
mux := http.NewServeMux()
grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())

View File

@ -16,6 +16,7 @@ package v3rpc
import (
"context"
"fmt"
"io"
"math/rand"
"sync"
@ -24,6 +25,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/mvcc"
@ -223,16 +225,16 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
return err
}
func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool {
func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) error {
authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context())
if err != nil {
return false
return err
}
if authInfo == nil {
// if auth is enabled, IsRangePermitted() can cause an error
authInfo = &auth.AuthInfo{}
}
return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd) == nil
return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd)
}
func (sws *serverWatchStream) recvLoop() error {
@ -266,13 +268,29 @@ func (sws *serverWatchStream) recvLoop() error {
creq.RangeEnd = []byte{}
}
if !sws.isWatchPermitted(creq) {
err := sws.isWatchPermitted(creq)
if err != nil {
var cancelReason string
switch err {
case auth.ErrInvalidAuthToken:
cancelReason = rpctypes.ErrGRPCInvalidAuthToken.Error()
case auth.ErrAuthOldRevision:
cancelReason = rpctypes.ErrGRPCAuthOldRevision.Error()
case auth.ErrUserEmpty:
cancelReason = rpctypes.ErrGRPCUserEmpty.Error()
default:
if err != auth.ErrPermissionDenied {
sws.lg.Error("unexpected error code", zap.Error(err))
}
cancelReason = rpctypes.ErrGRPCPermissionDenied.Error()
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: creq.WatchId,
WatchId: clientv3.InvalidWatchID,
Canceled: true,
Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
CancelReason: cancelReason,
}
select {
@ -303,7 +321,10 @@ func (sws *serverWatchStream) recvLoop() error {
sws.fragment[id] = true
}
sws.mu.Unlock()
} else {
id = clientv3.InvalidWatchID
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
@ -340,7 +361,7 @@ func (sws *serverWatchStream) recvLoop() error {
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
}
}
default:
@ -463,7 +484,12 @@ func (sws *serverWatchStream) sendLoop() {
// track id creation
wid := mvcc.WatchID(c.WatchId)
if c.Canceled {
if !(!(c.Canceled && c.Created) || wid == clientv3.InvalidWatchID) {
panic(fmt.Sprintf("unexpected watchId: %d, wanted: %d, since both 'Canceled' and 'Created' are true", wid, clientv3.InvalidWatchID))
}
if c.Canceled && wid != clientv3.InvalidWatchID {
delete(ids, wid)
continue
}

View File

@ -349,13 +349,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
)
}
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
haveWAL := wal.Exist(cfg.WALDir())
if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
if err = fileutil.TouchDirAll(cfg.Logger, cfg.SnapDir()); err != nil {
cfg.Logger.Fatal(
"failed to create snapshot directory",
zap.String("path", cfg.SnapDir()),
@ -548,7 +548,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
return nil, fmt.Errorf("unsupported bootstrap config")
}
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
return nil, fmt.Errorf("cannot access member directory: %v", terr)
}

View File

@ -6,8 +6,8 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/coreos/go-systemd/v22 v22.3.2
github.com/dustin/go-humanize v1.0.0
github.com/form3tech-oss/jwt-go v3.2.3+incompatible
github.com/gogo/protobuf v1.3.2
github.com/golang-jwt/jwt/v4 v4.4.2
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/protobuf v1.5.2
github.com/google/btree v1.0.1
@ -25,12 +25,12 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/v2 v2.305.5
go.etcd.io/etcd/client/v3 v3.5.5
go.etcd.io/etcd/pkg/v3 v3.5.5
go.etcd.io/etcd/raft/v3 v3.5.5
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/pkg/v3 v3.5.6
go.etcd.io/etcd/client/v2 v2.305.6
go.etcd.io/etcd/client/v3 v3.5.6
go.etcd.io/etcd/pkg/v3 v3.5.6
go.etcd.io/etcd/raft/v3 v3.5.6
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0
go.opentelemetry.io/otel v1.0.1
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1

View File

@ -80,8 +80,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
@ -100,6 +98,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=

View File

@ -329,10 +329,6 @@ func (t *batchTxBuffered) CommitAndStop() {
}
func (t *batchTxBuffered) commit(stop bool) {
if t.backend.hooks != nil {
t.backend.hooks.OnPreCommitUnsafe(t)
}
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.Lock()
t.unsafeCommit(stop)
@ -340,6 +336,9 @@ func (t *batchTxBuffered) commit(stop bool) {
}
func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.hooks != nil {
t.backend.hooks.OnPreCommitUnsafe(t)
}
if t.backend.readTx.tx != nil {
// wait all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx

View File

@ -167,6 +167,13 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
"range failed to find revision pair",
zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub),
zap.Int64("revision-current", curRev),
zap.Int64("range-option-rev", ro.Rev),
zap.Int64("range-option-limit", ro.Limit),
zap.Binary("key", key),
zap.Binary("end", end),
zap.Int("len-revpairs", len(revpairs)),
zap.Int("len-values", len(vs)),
)
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {

View File

@ -20,12 +20,9 @@ import (
"sync"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
const AutoWatchID WatchID = 0
var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
@ -118,7 +115,7 @@ func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ..
return -1, ErrEmptyWatcherRange
}
if id == AutoWatchID {
if id == clientv3.AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}

View File

@ -238,7 +238,7 @@ func (wps *watchProxyStream) recvLoop() error {
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
wps.watchCh <- &pb.WatchResponse{
Header: &pb.ResponseHeader{},
WatchId: -1,
WatchId: clientv3.InvalidWatchID,
Created: true,
Canceled: true,
CancelReason: err.Error(),
@ -258,7 +258,7 @@ func (wps *watchProxyStream) recvLoop() error {
filters: v3rpc.FiltersFromRequest(cr),
}
if !w.wr.valid() {
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
w.post(&pb.WatchResponse{WatchId: clientv3.InvalidWatchID, Created: true, Canceled: true})
wps.mu.Unlock()
continue
}

View File

@ -115,7 +115,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
}
defer os.RemoveAll(tmpdirpath)
if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
if err := fileutil.CreateDirAll(lg, tmpdirpath); err != nil {
lg.Warn(
"failed to create a temporary WAL directory",
zap.String("tmp-dir-path", tmpdirpath),

View File

@ -1,4 +1,4 @@
FROM ubuntu:20.10
FROM ubuntu:22.04
RUN rm /bin/sh && ln -s /bin/bash /bin/sh
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// These tests depends on certificate-based authentication that is NOT supported
// These tests depend on certificate-based authentication that is NOT supported
// by gRPC proxy.
//go:build !cluster_proxy
// +build !cluster_proxy
@ -20,7 +20,10 @@
package e2e
import (
"fmt"
"sync"
"testing"
"time"
)
func TestCtlV3AuthCertCN(t *testing.T) {
@ -32,3 +35,106 @@ func TestCtlV3AuthCertCNAndUsername(t *testing.T) {
func TestCtlV3AuthCertCNAndUsernameNoPassword(t *testing.T) {
testCtl(t, authTestCertCNAndUsernameNoPassword, withCfg(*newConfigClientTLSCertAuth()))
}
func TestCtlV3AuthCertCNWithWithConcurrentOperation(t *testing.T) {
BeforeTest(t)
// apply the certificate which has `root` CommonName,
// and reset the setting when the test case finishes.
// TODO(ahrtr): enhance the e2e test framework to support
// certificates with CommonName.
t.Log("Apply certificate with root CommonName")
resetCert := applyTLSWithRootCommonName()
defer resetCert()
t.Log("Create an etcd cluster")
cx := getDefaultCtlCtx(t)
cx.cfg = etcdProcessClusterConfig{
clusterSize: 1,
clientTLS: clientTLS,
clientCertAuthEnabled: true,
initialToken: "new",
}
epc, err := newEtcdProcessCluster(t, &cx.cfg)
if err != nil {
t.Fatalf("Failed to start etcd cluster: %v", err)
}
cx.epc = epc
cx.dataDir = epc.procs[0].Config().dataDirPath
defer func() {
if err := epc.Close(); err != nil {
t.Fatalf("could not close test cluster (%v)", err)
}
}()
t.Log("Enable auth")
authEnableTest(cx)
// Create two goroutines, one goroutine keeps creating & deleting users,
// and the other goroutine keeps writing & deleting K/V entries.
var wg sync.WaitGroup
wg.Add(2)
errs := make(chan error, 2)
donec := make(chan struct{})
// Create the first goroutine to create & delete users
t.Log("Create the first goroutine to create & delete users")
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
user := fmt.Sprintf("testuser-%d", i)
pass := fmt.Sprintf("testpass-%d", i)
if err := ctlV3User(cx, []string{"add", user, "--interactive=false"}, fmt.Sprintf("User %s created", user), []string{pass}); err != nil {
errs <- fmt.Errorf("failed to create user %q: %w", user, err)
break
}
err := ctlV3User(cx, []string{"delete", user}, fmt.Sprintf("User %s deleted", user), []string{})
if err != nil {
errs <- fmt.Errorf("failed to delete user %q: %w", user, err)
break
}
}
t.Log("The first goroutine finished")
}()
// Create the second goroutine to write & delete K/V entries
t.Log("Create the second goroutine to write & delete K/V entries")
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
if err := ctlV3Put(cx, key, value, ""); err != nil {
errs <- fmt.Errorf("failed to put key %q: %w", key, err)
break
}
if err := ctlV3Del(cx, []string{key}, 1); err != nil {
errs <- fmt.Errorf("failed to delete key %q: %w", key, err)
break
}
}
t.Log("The second goroutine finished")
}()
t.Log("Waiting for the two goroutines to complete")
go func() {
wg.Wait()
close(donec)
}()
t.Log("Waiting for test result")
select {
case err := <-errs:
t.Fatalf("Unexpected error: %v", err)
case <-donec:
t.Log("All done!")
case <-time.After(60 * time.Second):
t.Fatal("Test case timeout after 60 seconds")
}
}

View File

@ -17,11 +17,16 @@ package e2e
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/v3"
)
@ -72,6 +77,10 @@ func TestCtlV3AuthJWTExpire(t *testing.T) { testCtl(t, authTestJWTExpi
func TestCtlV3AuthRevisionConsistency(t *testing.T) { testCtl(t, authTestRevisionConsistency) }
func TestCtlV3AuthTestCacheReload(t *testing.T) { testCtl(t, authTestCacheReload) }
func TestCtlV3AuthRecoverFromSnapshot(t *testing.T) {
testCtl(t, authTestRecoverSnapshot, withCfg(*newConfigNoTLS()), withQuorum(), withSnapshotCount(5))
}
func authEnableTest(cx ctlCtx) {
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
@ -92,6 +101,28 @@ func authEnable(cx ctlCtx) error {
return nil
}
func applyTLSWithRootCommonName() func() {
var (
oldCertPath = certPath
oldPrivateKeyPath = privateKeyPath
oldCaPath = caPath
newCertPath = filepath.Join(fixturesDir, "CommonName-root.crt")
newPrivateKeyPath = filepath.Join(fixturesDir, "CommonName-root.key")
newCaPath = filepath.Join(fixturesDir, "CommonName-root.crt")
)
certPath = newCertPath
privateKeyPath = newPrivateKeyPath
caPath = newCaPath
return func() {
certPath = oldCertPath
privateKeyPath = oldPrivateKeyPath
caPath = oldCaPath
}
}
func ctlV3AuthEnable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "enable")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Enabled")
@ -1289,3 +1320,192 @@ func authTestCacheReload(cx ctlCtx) {
cx.t.Fatal(err)
}
}
// Verify that etcd works after recovering from a snapshot.
// Refer to https://github.com/etcd-io/etcd/issues/14571.
func authTestRecoverSnapshot(cx ctlCtx) {
roles := []authRole{
{
role: "role0",
permission: clientv3.PermissionType(clientv3.PermReadWrite),
key: "foo",
},
}
users := []authUser{
{
user: "root",
pass: "rootPass",
role: "root",
},
{
user: "user0",
pass: "user0Pass",
role: "role0",
},
}
cx.t.Log("setup and enable auth")
setupAuth(cx, roles, users)
// create a client with root user
cx.t.Log("create a client with root user")
cliRoot, err := clientv3.New(clientv3.Config{Endpoints: cx.epc.EndpointsV3(), Username: "root", Password: "rootPass", DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer cliRoot.Close()
// write more than SnapshotCount keys, so that at least one snapshot is created
cx.t.Log("Write enough key/value to trigger a snapshot")
for i := 0; i <= 6; i++ {
if _, err := cliRoot.Put(context.TODO(), fmt.Sprintf("key_%d", i), fmt.Sprintf("value_%d", i)); err != nil {
cx.t.Fatalf("failed to Put (%v)", err)
}
}
// add a new member into the cluster
// Refer to https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/tests/e2e/cluster_test.go#L238
var (
idx = 3
name = fmt.Sprintf("test-%d", idx)
port = cx.cfg.basePort + 5*idx
curlHost = fmt.Sprintf("localhost:%d", port)
nodeClientURL = url.URL{Scheme: cx.cfg.clientScheme(), Host: curlHost}
nodePeerURL = url.URL{Scheme: cx.cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
initialCluster = cx.epc.procs[0].Config().initialCluster + "," + fmt.Sprintf("%s=%s", name, nodePeerURL.String())
)
cx.t.Logf("Adding a new member: %s", nodePeerURL.String())
// Must wait at least 5 seconds, otherwise it will always get an
// "etcdserver: unhealthy cluster" response, please refer to link below,
// https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/server/etcdserver/server.go#L1611
assert.Eventually(cx.t, func() bool {
if _, err := cliRoot.MemberAdd(context.TODO(), []string{nodePeerURL.String()}); err != nil {
cx.t.Logf("Failed to add member, peelURL: %s, error: %v", nodePeerURL.String(), err)
return false
}
return true
}, 8*time.Second, 2*time.Second)
cx.t.Logf("Starting the new member: %s", nodePeerURL.String())
newProc, err := runEtcdNode(name, cx.t.TempDir(), nodeClientURL.String(), nodePeerURL.String(), "existing", initialCluster)
require.NoError(cx.t, err)
defer newProc.Stop()
// create a client with user "user0", and connects to the new member
cx.t.Log("create a client with user 'user0'")
cliUser, err := clientv3.New(clientv3.Config{Endpoints: []string{nodeClientURL.String()}, Username: "user0", Password: "user0Pass", DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer cliUser.Close()
// write data using the cliUser, expect no error
cx.t.Log("Write a key/value using user 'user0'")
_, err = cliUser.Put(context.TODO(), "foo", "bar")
require.NoError(cx.t, err)
//verify all nodes have the same revision and hash
var endpoints []string
for _, proc := range cx.epc.procs {
endpoints = append(endpoints, proc.Config().acurl)
}
endpoints = append(endpoints, nodeClientURL.String())
cx.t.Log("Verify all members have the same revision and hash")
assert.Eventually(cx.t, func() bool {
hashKvs, err := hashKVs(endpoints, cliRoot)
if err != nil {
cx.t.Logf("failed to get HashKV: %v", err)
return false
}
if len(hashKvs) != 4 {
cx.t.Logf("expected 4 hashkv responses, but got: %d", len(hashKvs))
return false
}
if !(hashKvs[0].Header.Revision == hashKvs[1].Header.Revision &&
hashKvs[0].Header.Revision == hashKvs[2].Header.Revision &&
hashKvs[0].Header.Revision == hashKvs[3].Header.Revision) {
cx.t.Logf("Got different revisions, [%d, %d, %d, %d]",
hashKvs[0].Header.Revision,
hashKvs[1].Header.Revision,
hashKvs[2].Header.Revision,
hashKvs[3].Header.Revision)
return false
}
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[1].Hash)
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[2].Hash)
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[3].Hash)
return true
}, 5*time.Second, 100*time.Millisecond)
}
type authRole struct {
role string
permission clientv3.PermissionType
key string
keyEnd string
}
type authUser struct {
user string
pass string
role string
}
func setupAuth(cx ctlCtx, roles []authRole, users []authUser) {
endpoint := cx.epc.procs[0].EndpointsV3()[0]
// create a client
c, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}, DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer c.Close()
// create roles
for _, r := range roles {
// add role
if _, err = c.RoleAdd(context.TODO(), r.role); err != nil {
cx.t.Fatal(err)
}
// grant permission to role
if _, err = c.RoleGrantPermission(context.TODO(), r.role, r.key, r.keyEnd, r.permission); err != nil {
cx.t.Fatal(err)
}
}
// create users
for _, u := range users {
// add user
if _, err = c.UserAdd(context.TODO(), u.user, u.pass); err != nil {
cx.t.Fatal(err)
}
// grant role to user
if _, err = c.UserGrantRole(context.TODO(), u.user, u.role); err != nil {
cx.t.Fatal(err)
}
}
// enable auth
if _, err = c.AuthEnable(context.TODO()); err != nil {
cx.t.Fatal(err)
}
}
func hashKVs(endpoints []string, cli *clientv3.Client) ([]*clientv3.HashKVResponse, error) {
var retHashKVs []*clientv3.HashKVResponse
for _, ep := range endpoints {
resp, err := cli.HashKV(context.TODO(), ep, 0)
if err != nil {
return nil, err
}
retHashKVs = append(retHashKVs, resp)
}
return retHashKVs, nil
}

View File

@ -221,6 +221,14 @@ func withMaxConcurrentStreams(streams uint32) ctlOption {
}
}
// This function must be called after the `withCfg`, otherwise its value
// may be overwritten by `withCfg`.
func withSnapshotCount(snapshotCount int) ctlOption {
return func(cx *ctlCtx) {
cx.cfg.snapshotCount = snapshotCount
}
}
func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
testCtlWithOffline(t, testFunc, nil, opts...)
}

View File

@ -320,6 +320,46 @@ func TestGrpcproxyAndCommonName(t *testing.T) {
}
}
func TestGrpcproxyAndListenCipherSuite(t *testing.T) {
skipInShortMode(t)
cases := []struct {
name string
args []string
}{
{
name: "ArgsWithCipherSuites",
args: []string{
binDir + "/etcd",
"grpc-proxy",
"start",
"--listen-cipher-suites", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256",
},
},
{
name: "ArgsWithoutCipherSuites",
args: []string{
binDir + "/etcd",
"grpc-proxy",
"start",
"--listen-cipher-suites", "",
},
},
}
for _, test := range cases {
t.Run(test.name, func(t *testing.T) {
pw, err := spawnCmd(test.args, nil)
if err != nil {
t.Fatal(err)
}
if err = pw.Stop(); err != nil {
t.Fatal(err)
}
})
}
}
func TestBootstrapDefragFlag(t *testing.T) {
skipInShortMode(t)

29
tests/fixtures/CommonName-root.crt vendored Normal file
View File

@ -0,0 +1,29 @@
-----BEGIN CERTIFICATE-----
MIIE5zCCA8+gAwIBAgIJAKooGDZuR2mMMA0GCSqGSIb3DQEBCwUAMH8xCzAJBgNV
BAYTAkNOMRAwDgYDVQQIDAdCZWlqaW5nMRAwDgYDVQQHDAdCZWlqaW5nMQ0wCwYD
VQQKDAREZW1vMQ0wCwYDVQQLDAREZW1vMQ0wCwYDVQQDDARyb290MR8wHQYJKoZI
hvcNAQkBFhB0ZXN0QGV4YW1wbGUuY29tMB4XDTIyMTExNjA2NTI1M1oXDTMyMTEx
MzA2NTI1M1owfzELMAkGA1UEBhMCQ04xEDAOBgNVBAgMB0JlaWppbmcxEDAOBgNV
BAcMB0JlaWppbmcxDTALBgNVBAoMBERlbW8xDTALBgNVBAsMBERlbW8xDTALBgNV
BAMMBHJvb3QxHzAdBgkqhkiG9w0BCQEWEHRlc3RAZXhhbXBsZS5jb20wggEiMA0G
CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEAKcjzhtOG3hWbAUCbudE1gPOeteT
0INk2ngN2uCMYjYSZmaGhW/GZk3EvV7wKVuhdTyrh36E5Iajng9d2t1iOU/8jROU
+uAyrS3C/S5P/urq8VBUrt3VG/44bhwTEdafNnAWQ6ojYfmK0tRqoQn1Ftm30l8I
nWof5Jm3loNA2WdNdvAp/D+6OpjUdqGdMkFd0NhkuQODMnycBMw6btUTj5SnmrMk
q7V1aasx4BqN5C4DciZF0pyyR/TT8MoQ5Vcit8rHvQUyz42Lj8+28RkDoi4prJ1i
tLaCt2egDp58vXlYQZTd50inMhnBIapKNdGpg3flW/8AFul1tCTqd8NfAgMBAAGj
ggFkMIIBYDAdBgNVHQ4EFgQUpwwvEqXjA/ArJu1Jnpw7+/sttOAwgbMGA1UdIwSB
qzCBqIAUpwwvEqXjA/ArJu1Jnpw7+/sttOChgYSkgYEwfzELMAkGA1UEBhMCQ04x
EDAOBgNVBAgMB0JlaWppbmcxEDAOBgNVBAcMB0JlaWppbmcxDTALBgNVBAoMBERl
bW8xDTALBgNVBAsMBERlbW8xDTALBgNVBAMMBHJvb3QxHzAdBgkqhkiG9w0BCQEW
EHRlc3RAZXhhbXBsZS5jb22CCQCqKBg2bkdpjDAMBgNVHRMEBTADAQH/MAsGA1Ud
DwQEAwIC/DA2BgNVHREELzAtggtleGFtcGxlLmNvbYINKi5leGFtcGxlLmNvbYIJ
bG9jYWxob3N0hwR/AAABMDYGA1UdEgQvMC2CC2V4YW1wbGUuY29tgg0qLmV4YW1w
bGUuY29tgglsb2NhbGhvc3SHBH8AAAEwDQYJKoZIhvcNAQELBQADggEBAGi48ntm
8cn08FrsCDWapsck7a56/dyFyzLg10c0blu396tzC3ZDCAwQYzHjeXVdeWHyGO+f
KSFlmR6IA0jq6pFhUyJtgaAUJ91jW6s68GTVhlLoFhtYjy6EvhQ0lo+7GWh4qB2s
LI0mJPjaLZY1teAC4TswzwMDVD8QsB06/aFBlA65VjgZiVH+aMwWJ88gKfVGp0Pv
AApsy5MvwQn8WZ2L6foSY04OzXtmAg2gCl0PyDNgieqFDcM1g7mklHNgWl2Gvtte
G6+TiB3gGUUlTsdy0+LS2psL71RS5Jv7g/7XGmSKBPqRmYyQ2t7m2kLPwWKtL5tE
63c0FPtpV0FzKdU=
-----END CERTIFICATE-----

27
tests/fixtures/CommonName-root.key vendored Normal file
View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAxACnI84bTht4VmwFAm7nRNYDznrXk9CDZNp4DdrgjGI2EmZm
hoVvxmZNxL1e8ClboXU8q4d+hOSGo54PXdrdYjlP/I0TlPrgMq0twv0uT/7q6vFQ
VK7d1Rv+OG4cExHWnzZwFkOqI2H5itLUaqEJ9RbZt9JfCJ1qH+SZt5aDQNlnTXbw
Kfw/ujqY1HahnTJBXdDYZLkDgzJ8nATMOm7VE4+Up5qzJKu1dWmrMeAajeQuA3Im
RdKcskf00/DKEOVXIrfKx70FMs+Ni4/PtvEZA6IuKaydYrS2grdnoA6efL15WEGU
3edIpzIZwSGqSjXRqYN35Vv/ABbpdbQk6nfDXwIDAQABAoIBAA5AMebTjH6wVp6J
+g9EOwJxQROZMOVparRBgisXt+3dEitiUKAFQaw+MfdVAXsatrPVj1S1ZEiLSRLK
YjmjuSb0HdGx/DN/zh9BIiukNuLQGQp+AyY1FKHzCBfYQahNSrqGvb2Qq+UosXkb
fSBHly6/u5K28/vvXhD1kQudIOvtAc9tOg8LZnM6N3J4E0GtLqWimRZ4jNK4APu1
YsLIg87Eam+7x25+phz9xc22tZ1H4WY9FnOGprPnievqiV7mgcNGAklTB93C6yX1
EI+QxQnPg0P732C4EJZFDPqhVRA4E7BUb5uTIXCJBA/FFuRIx9ppyLZKt9vjTchM
8YWIEsECgYEA/5DRR9FkIWJZb0Pv3SCc53PMPT/xpYB6lH2lGtG+u+L71dJNDiPt
da3dPXSBy+aF7BbmRDawRvyOLGArlWiSsoEUVlES8BYzQ1MmfDf+MJooJoBE6/g6
2OyyNnPde1GqyxsxgNTITvJCTjYH64lxKVRYfMgMAASK49SjYiEgGn8CgYEAxFXs
Oe0sUcc3P1cQ9pJfSVKpSczZq/OGAxqlniqRHvoWgFfKOWB6F9PN0rd8G2aMlfGS
BjyiPe770gtpX8Z4G4lrtkJD8NvGoVC8yX78HbrXL2RA4lPjQfrveUnwXIRbRKWa
6D/GAYPOuNvJmwF4hY/orWyIqvpNczIjTjs1JyECgYEAvhuNAn6JnKfbXYBM+tIa
xbWHFXzula2IAdOhMN0bpApKSZmBxmYFa0elTuTO9M2Li77RFacU5AlU/T+gzCiZ
D34jkb4Hd18cTRWaiEbiqGbUPSennVzu8ZTJUOZJuEVc5m9ZGLuwMcHWfvWEWLrJ
2fOrS09IVe8LHkV8MC/yAKMCgYBmDUdhgK9Fvqgv60Cs+b4/rZDDBJCsOUOSP3qQ
sQ2HrXSet4MsucIcuoJEog0HbRFsKwm85i1qxdrs/fOCzfXGUnLDZMRN4N7pIL9Q
eQnxJhoNzy2Otw3sUNPDFrSyUjXig7X2PJfeV7XPDqdHQ8dynS/TXRPY04wIcao6
Uro5IQKBgFUz2GjAxI6uc7ihmRv/GYTuXYOlO0IN7MFwQDd0pVnWHkLNZscO9L9/
ALV4g1p/75CewlQfyC8ynOJJWcDeHHFNsSMsOzAxUOVtVenaF/dgwk95wpXj6Rx6
4kvQqnJg97fRBbyzvQcdL36kL8+pbmHNoqHPwxbuigYShB74d6/h
-----END RSA PRIVATE KEY-----

View File

@ -473,7 +473,7 @@ func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Respons
}, nil
}
err := fileutil.TouchDirAll(srv.Member.BaseDir)
err := fileutil.TouchDirAll(srv.lg, srv.Member.BaseDir)
if err != nil {
return nil, err
}
@ -508,7 +508,7 @@ func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Respons
func (srv *Server) handle_RESTART_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
var err error
if !fileutil.Exist(srv.Member.BaseDir) {
err = fileutil.TouchDirAll(srv.Member.BaseDir)
err = fileutil.TouchDirAll(srv.lg, srv.Member.BaseDir)
if err != nil {
return nil, err
}
@ -579,7 +579,7 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error
// create a new log file for next new member restart
if !fileutil.Exist(srv.Member.BaseDir) {
err = fileutil.TouchDirAll(srv.Member.BaseDir)
err = fileutil.TouchDirAll(srv.lg, srv.Member.BaseDir)
if err != nil {
return nil, err
}
@ -651,6 +651,7 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
// TODO: support separate WAL directory
if err = archive(
srv.lg,
srv.Member.BaseDir,
srv.Member.Etcd.LogOutputs[0],
srv.Member.Etcd.DataDir,

View File

@ -25,15 +25,17 @@ import (
"time"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.uber.org/zap"
)
// TODO: support separate WAL directory
func archive(baseDir, etcdLogPath, dataDir string) error {
func archive(lg *zap.Logger, baseDir, etcdLogPath, dataDir string) error {
dir := filepath.Join(baseDir, "etcd-failure-archive", time.Now().Format(time.RFC3339))
if existDir(dir) {
dir = filepath.Join(baseDir, "etcd-failure-archive", time.Now().Add(time.Second).Format(time.RFC3339))
}
if err := fileutil.TouchDirAll(dir); err != nil {
if err := fileutil.TouchDirAll(lg, dir); err != nil {
return err
}

View File

@ -7,8 +7,8 @@ fi
(
cd ./tests
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-agent ./functional/cmd/etcd-agent
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-proxy ./functional/cmd/etcd-proxy
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-runner ./functional/cmd/etcd-runner
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-tester ./functional/cmd/etcd-tester
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-agent ./functional/cmd/etcd-agent
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-proxy ./functional/cmd/etcd-proxy
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-runner ./functional/cmd/etcd-runner
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-tester ./functional/cmd/etcd-tester
)

View File

@ -520,7 +520,7 @@ func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Respons
"fixtures",
"client",
)
if err = fileutil.TouchDirAll(dirClient); err != nil {
if err = fileutil.TouchDirAll(clus.lg, dirClient); err != nil {
return nil, err
}

View File

@ -37,7 +37,7 @@ func (clus *Cluster) Run() {
// needs to obtain all the failpoints from the etcd member.
clus.updateCases()
if err := fileutil.TouchDirAll(clus.Tester.DataDir); err != nil {
if err := fileutil.TouchDirAll(clus.lg, clus.Tester.DataDir); err != nil {
clus.lg.Panic(
"failed to create test data directory",
zap.String("dir", clus.Tester.DataDir),

View File

@ -27,14 +27,14 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/pkg/v3 v3.5.5
go.etcd.io/etcd/client/v2 v2.305.5
go.etcd.io/etcd/client/v3 v3.5.5
go.etcd.io/etcd/etcdutl/v3 v3.5.5
go.etcd.io/etcd/pkg/v3 v3.5.5
go.etcd.io/etcd/raft/v3 v3.5.5
go.etcd.io/etcd/server/v3 v3.5.5
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/pkg/v3 v3.5.6
go.etcd.io/etcd/client/v2 v2.305.6
go.etcd.io/etcd/client/v3 v3.5.6
go.etcd.io/etcd/etcdutl/v3 v3.5.6
go.etcd.io/etcd/pkg/v3 v3.5.6
go.etcd.io/etcd/raft/v3 v3.5.6
go.etcd.io/etcd/server/v3 v3.5.6
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

View File

@ -83,8 +83,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca h1:Y2I0lxOttdUKz+hNaIdG3FtjuQrTmwXun1opRV65IZc=
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
@ -103,6 +101,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=

View File

@ -15,9 +15,14 @@
package recipes_test
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
"go.etcd.io/etcd/tests/v3/integration"
@ -97,6 +102,67 @@ func TestDoubleBarrier(t *testing.T) {
}
}
func TestDoubleBarrierTooManyClients(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
waiters := 10
session, err := concurrency.NewSession(clus.RandClient())
if err != nil {
t.Error(err)
}
defer session.Orphan()
b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
donec := make(chan struct{})
var (
wgDone sync.WaitGroup // make sure all clients have finished the tasks
wgEntered sync.WaitGroup // make sure all clients have entered the double barrier
)
wgDone.Add(waiters)
wgEntered.Add(waiters)
for i := 0; i < waiters; i++ {
go func() {
defer wgDone.Done()
session, err := concurrency.NewSession(clus.RandClient())
if err != nil {
t.Error(err)
}
defer session.Orphan()
bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
if err := bb.Enter(); err != nil {
t.Errorf("could not enter on barrier (%v)", err)
}
wgEntered.Done()
<-donec
if err := bb.Leave(); err != nil {
t.Errorf("could not leave on barrier (%v)", err)
}
}()
}
// Wait until all clients have already entered the double barrier, so
// no any other client can enter the barrier.
wgEntered.Wait()
t.Log("Try to enter into double barrier")
if err := b.Enter(); err != recipe.ErrTooManyClients {
t.Errorf("Unexcepted error, expected: ErrTooManyClients, got: %v", err)
}
resp, err := clus.RandClient().Get(context.TODO(), "test-barrier/waiters", clientv3.WithPrefix())
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
// Make sure the extra `b.Enter()` did not create a new ephemeral key
assert.Equal(t, waiters, len(resp.Kvs))
close(donec)
wgDone.Wait()
}
func TestDoubleBarrierFailover(t *testing.T) {
integration.BeforeTest(t)

View File

@ -340,6 +340,9 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
if !ok {
t.Fatalf("unexpected watch close")
}
if err := v.Err(); err != nil {
t.Fatalf("unexpected watch response error: %v", err)
}
if string(v.Events[0].Kv.Value) != val {
t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val)
}

View File

@ -136,7 +136,8 @@ type ClusterConfig struct {
DiscoveryURL string
AuthToken string
AuthToken string
AuthTokenTTL uint
UseGRPC bool
@ -314,6 +315,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
name: c.generateMemberName(),
memberNumber: memberNumber,
authToken: c.cfg.AuthToken,
authTokenTTL: c.cfg.AuthTokenTTL,
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
@ -624,6 +626,7 @@ type memberConfig struct {
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
authToken string
authTokenTTL uint
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
@ -715,6 +718,9 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
if mcfg.authToken != "" {
m.AuthToken = mcfg.authToken
}
if mcfg.authTokenTTL != 0 {
m.TokenTTL = mcfg.authTokenTTL
}
m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing

View File

@ -498,3 +498,91 @@ func TestV3AuthRestartMember(t *testing.T) {
_, err = c2.Put(context.TODO(), "foo", "bar2")
testutil.AssertNil(t, err)
}
func TestV3AuthWatchAndTokenExpire(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1, AuthTokenTTL: 3})
defer clus.Terminate(t)
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
if cerr != nil {
t.Fatal(cerr)
}
defer c.Close()
_, err := c.Put(ctx, "key", "val")
if err != nil {
t.Fatalf("Unexpected error from Put: %v", err)
}
// The first watch gets a valid auth token through watcher.newWatcherGrpcStream()
// We should discard the first one by waiting TTL after the first watch.
wChan := c.Watch(ctx, "key", clientv3.WithRev(1))
watchResponse := <-wChan
time.Sleep(5 * time.Second)
wChan = c.Watch(ctx, "key", clientv3.WithRev(1))
watchResponse = <-wChan
testutil.AssertNil(t, watchResponse.Err())
}
func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
users := []user{
{
name: "user1",
password: "user1-123",
role: "role1",
key: "k1",
end: "k2",
},
}
authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users)
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
if cerr != nil {
t.Fatal(cerr)
}
defer c.Close()
watchStartCh, watchEndCh := make(chan interface{}), make(chan interface{})
go func() {
wChan := c.Watch(ctx, "k1", clientv3.WithRev(1))
watchStartCh <- struct{}{}
watchResponse := <-wChan
t.Logf("watch response from k1: %v", watchResponse)
testutil.AssertTrue(t, len(watchResponse.Events) != 0)
watchEndCh <- struct{}{}
}()
// Chan for making sure that the above goroutine invokes Watch()
// So the above Watch() can get watch ID = 0
<-watchStartCh
wChan := c.Watch(ctx, "non-allowed-key", clientv3.WithRev(1))
watchResponse := <-wChan
testutil.AssertNotNil(t, watchResponse.Err()) // permission denied
_, err := c.Put(ctx, "k1", "val")
if err != nil {
t.Fatalf("Unexpected error from Put: %v", err)
}
<-watchEndCh
}

View File

@ -26,6 +26,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
)
@ -395,8 +396,8 @@ func TestV3WatchWrongRange(t *testing.T) {
if cresp.Canceled != tt.canceled {
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
}
if tt.canceled && cresp.WatchId != -1 {
t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
if tt.canceled && cresp.WatchId != clientv3.InvalidWatchID {
t.Fatalf("#%d: canceled watch ID %d, want %d", i, cresp.WatchId, clientv3.InvalidWatchID)
}
}
}