Compare commits
46 Commits
tests/v3.5
...
pkg/v3.5.6
Author | SHA1 | Date | |
---|---|---|---|
cecbe35ce0 | |||
d0424a7bf1 | |||
1a9742c9c4 | |||
7ccca083eb | |||
c91978077b | |||
b2821631aa | |||
4097c24783 | |||
9849fa7c66 | |||
69aace20c8 | |||
5f387e6b7d | |||
563713e128 | |||
c2378be1b5 | |||
6797856841 | |||
cc6a082f9e | |||
27707209ae | |||
be4adc0c55 | |||
8902fe9246 | |||
45e31f6c80 | |||
8e26a1fff1 | |||
0a0f0e3617 | |||
bd7405a52e | |||
17cb291f15 | |||
1e96e0be38 | |||
efb9480b96 | |||
7cd9e5a338 | |||
d78f6f7f14 | |||
ec6f0a74ba | |||
62169d12eb | |||
d3da22fb1f | |||
acc7463fb2 | |||
2fb9be6f7d | |||
f6c4c84da3 | |||
3afd0735e0 | |||
e712234a1a | |||
3e195ba473 | |||
25ef9b6f46 | |||
5ff0d7fe26 | |||
dce3fdbeb1 | |||
07c7a98371 | |||
dd983c662b | |||
5daf35bb4a | |||
528dd82be9 | |||
7b568f23ab | |||
db55011d7c | |||
89d0fc49fc | |||
653d6e18c3 |
2
Makefile
2
Makefile
@ -162,7 +162,7 @@ test-full:
|
||||
PASSES="fmt build release unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
|
||||
|
||||
ensure-docker-test-image-exists:
|
||||
make pull-docker-test || echo "WARNING: Container Image not found in registry, building locally"; make build-docker-test
|
||||
make pull-docker-test || ( echo "WARNING: Container Image not found in registry, building locally"; make build-docker-test )
|
||||
|
||||
docker-test: ensure-docker-test-image-exists
|
||||
$(info GO_VERSION: $(GO_VERSION))
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.5.5"
|
||||
Version = "3.5.6"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
@ -134,15 +134,6 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"project": "github.com/form3tech-oss/jwt-go",
|
||||
"licenses": [
|
||||
{
|
||||
"type": "MIT License",
|
||||
"confidence": 0.9891304347826086
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"project": "github.com/getsentry/raven-go",
|
||||
"licenses": [
|
||||
@ -161,6 +152,15 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"project": "github.com/golang-jwt/jwt/v4",
|
||||
"licenses": [
|
||||
{
|
||||
"type": "MIT License",
|
||||
"confidence": 0.9891304347826086
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"project": "github.com/golang/groupcache/lru",
|
||||
"licenses": [
|
||||
|
4
build.sh
4
build.sh
@ -42,6 +42,7 @@ etcd_build() {
|
||||
# Static compilation is useful when etcd is run in a container. $GO_BUILD_FLAGS is OK
|
||||
# shellcheck disable=SC2086
|
||||
run env "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
|
||||
-trimpath \
|
||||
-installsuffix=cgo \
|
||||
"-ldflags=${GO_LDFLAGS[*]}" \
|
||||
-o="../${out}/etcd" . || return 2
|
||||
@ -52,6 +53,7 @@ etcd_build() {
|
||||
(
|
||||
cd ./etcdutl
|
||||
run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
|
||||
-trimpath \
|
||||
-installsuffix=cgo \
|
||||
"-ldflags=${GO_LDFLAGS[*]}" \
|
||||
-o="../${out}/etcdutl" . || return 2
|
||||
@ -62,6 +64,7 @@ etcd_build() {
|
||||
(
|
||||
cd ./etcdctl
|
||||
run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
|
||||
-trimpath \
|
||||
-installsuffix=cgo \
|
||||
"-ldflags=${GO_LDFLAGS[*]}" \
|
||||
-o="../${out}/etcdctl" . || return 2
|
||||
@ -92,6 +95,7 @@ tools_build() {
|
||||
run rm -f "${out}/${tool}"
|
||||
# shellcheck disable=SC2086
|
||||
run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} \
|
||||
-trimpath \
|
||||
-installsuffix=cgo \
|
||||
"-ldflags='${GO_LDFLAGS[*]}'" \
|
||||
-o="${out}/${tool}" "./${tool}" || return 2
|
||||
|
@ -44,16 +44,12 @@ func IsDirWriteable(dir string) error {
|
||||
|
||||
// TouchDirAll is similar to os.MkdirAll. It creates directories with 0700 permission if any directory
|
||||
// does not exists. TouchDirAll also ensures the given directory is writable.
|
||||
func TouchDirAll(dir string) error {
|
||||
func TouchDirAll(lg *zap.Logger, dir string) error {
|
||||
// If path is already a directory, MkdirAll does nothing and returns nil, so,
|
||||
// first check if dir exist with an expected permission mode.
|
||||
if Exist(dir) {
|
||||
err := CheckDirPermission(dir, PrivateDirMode)
|
||||
if err != nil {
|
||||
lg, _ := zap.NewProduction()
|
||||
if lg == nil {
|
||||
lg = zap.NewExample()
|
||||
}
|
||||
lg.Warn("check file permission", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
@ -70,8 +66,8 @@ func TouchDirAll(dir string) error {
|
||||
|
||||
// CreateDirAll is similar to TouchDirAll but returns error
|
||||
// if the deepest directory was not empty.
|
||||
func CreateDirAll(dir string) error {
|
||||
err := TouchDirAll(dir)
|
||||
func CreateDirAll(lg *zap.Logger, dir string) error {
|
||||
err := TouchDirAll(lg, dir)
|
||||
if err == nil {
|
||||
var ns []string
|
||||
ns, err = ReadDir(dir)
|
||||
|
@ -67,7 +67,7 @@ func TestCreateDirAll(t *testing.T) {
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
tmpdir2 := filepath.Join(tmpdir, "testdir")
|
||||
if err = CreateDirAll(tmpdir2); err != nil {
|
||||
if err = CreateDirAll(zaptest.NewLogger(t), tmpdir2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -75,7 +75,7 @@ func TestCreateDirAll(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = CreateDirAll(tmpdir2); err == nil || !strings.Contains(err.Error(), "to be empty, got") {
|
||||
if err = CreateDirAll(zaptest.NewLogger(t), tmpdir2); err == nil || !strings.Contains(err.Error(), "to be empty, got") {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
}
|
||||
@ -186,7 +186,7 @@ func TestDirPermission(t *testing.T) {
|
||||
|
||||
tmpdir2 := filepath.Join(tmpdir, "testpermission")
|
||||
// create a new dir with 0700
|
||||
if err = CreateDirAll(tmpdir2); err != nil {
|
||||
if err = CreateDirAll(zaptest.NewLogger(t), tmpdir2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// check dir permission with mode different than created dir
|
||||
|
@ -41,6 +41,12 @@ func purgeFile(lg *zap.Logger, dirname string, suffix string, max uint, interval
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
errC := make(chan error, 1)
|
||||
lg.Info("started to purge file",
|
||||
zap.String("dir", dirname),
|
||||
zap.String("suffix", suffix),
|
||||
zap.Uint("max", max),
|
||||
zap.Duration("interval", interval))
|
||||
|
||||
go func() {
|
||||
if donec != nil {
|
||||
defer close(donec)
|
||||
@ -63,14 +69,16 @@ func purgeFile(lg *zap.Logger, dirname string, suffix string, max uint, interval
|
||||
f := filepath.Join(dirname, newfnames[0])
|
||||
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
|
||||
if err != nil {
|
||||
lg.Warn("failed to lock file", zap.String("path", f), zap.Error(err))
|
||||
break
|
||||
}
|
||||
if err = os.Remove(f); err != nil {
|
||||
lg.Error("failed to remove file", zap.String("path", f), zap.Error(err))
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
if err = l.Close(); err != nil {
|
||||
lg.Warn("failed to unlock/close", zap.String("path", l.Name()), zap.Error(err))
|
||||
lg.Error("failed to unlock/close", zap.String("path", l.Name()), zap.Error(err))
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
|
@ -14,7 +14,10 @@
|
||||
|
||||
package tlsutil
|
||||
|
||||
import "crypto/tls"
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// GetCipherSuite returns the corresponding cipher suite,
|
||||
// and boolean value if it is supported.
|
||||
@ -37,3 +40,17 @@ func GetCipherSuite(s string) (uint16, bool) {
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// GetCipherSuites returns list of corresponding cipher suite IDs.
|
||||
func GetCipherSuites(ss []string) ([]uint16, error) {
|
||||
cs := make([]uint16, len(ss))
|
||||
for i, s := range ss {
|
||||
var ok bool
|
||||
cs[i], ok = GetCipherSuite(s)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected TLS cipher suite %q", s)
|
||||
}
|
||||
}
|
||||
|
||||
return cs, nil
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ func SelfCert(lg *zap.Logger, dirpath string, hosts []string, selfSignedCertVali
|
||||
)
|
||||
return
|
||||
}
|
||||
err = fileutil.TouchDirAll(dirpath)
|
||||
err = fileutil.TouchDirAll(lg, dirpath)
|
||||
if err != nil {
|
||||
if info.Logger != nil {
|
||||
info.Logger.Warn(
|
||||
|
@ -5,8 +5,8 @@ go 1.16
|
||||
require (
|
||||
github.com/json-iterator/go v1.1.11
|
||||
github.com/modern-go/reflect2 v1.0.1
|
||||
go.etcd.io/etcd/api/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/api/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
)
|
||||
|
||||
replace (
|
||||
|
@ -45,25 +45,46 @@ func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarr
|
||||
// Enter waits for "count" processes to enter the barrier then returns
|
||||
func (b *DoubleBarrier) Enter() error {
|
||||
client := b.s.Client()
|
||||
|
||||
// Check the entered clients before creating the UniqueEphemeralKey,
|
||||
// fail the request if there are already too many clients.
|
||||
if resp1, err := b.enteredClients(client); err != nil {
|
||||
return err
|
||||
} else if len(resp1.Kvs) >= b.count {
|
||||
return ErrTooManyClients
|
||||
}
|
||||
|
||||
ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.myKey = ek
|
||||
|
||||
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
||||
// Check the entered clients after creating the UniqueEphemeralKey
|
||||
resp2, err := b.enteredClients(client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp2.Kvs) >= b.count {
|
||||
lastWaiter := resp2.Kvs[b.count-1]
|
||||
if ek.rev > lastWaiter.CreateRevision {
|
||||
// delete itself now, otherwise other processes may need to wait
|
||||
// until these keys are automatically deleted when the related
|
||||
// lease expires.
|
||||
if err = b.myKey.Delete(); err != nil {
|
||||
// Nothing to do here. We have to wait for the key to be
|
||||
// deleted when the lease expires.
|
||||
}
|
||||
return ErrTooManyClients
|
||||
}
|
||||
|
||||
if len(resp.Kvs) > b.count {
|
||||
return ErrTooManyClients
|
||||
}
|
||||
|
||||
if len(resp.Kvs) == b.count {
|
||||
// unblock waiters
|
||||
_, err = client.Put(b.ctx, b.key+"/ready", "")
|
||||
return err
|
||||
if ek.rev == lastWaiter.CreateRevision {
|
||||
// TODO(ahrtr): we might need to compare ek.key and
|
||||
// string(lastWaiter.Key), they should be equal.
|
||||
// unblock all other waiters
|
||||
_, err = client.Put(b.ctx, b.key+"/ready", "")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err = WaitEvents(
|
||||
@ -74,6 +95,18 @@ func (b *DoubleBarrier) Enter() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// enteredClients gets all the entered clients, which are ordered by the
|
||||
// createRevision in ascending order.
|
||||
func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) {
|
||||
resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Leave waits for "count" processes to leave the barrier then returns
|
||||
func (b *DoubleBarrier) Leave() error {
|
||||
client := b.s.Client()
|
||||
@ -96,7 +129,7 @@ func (b *DoubleBarrier) Leave() error {
|
||||
}
|
||||
isLowest := string(lowest.Key) == b.myKey.Key()
|
||||
|
||||
if len(resp.Kvs) == 1 {
|
||||
if len(resp.Kvs) == 1 && isLowest {
|
||||
// this is the only node in the barrier; finish up
|
||||
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
|
||||
return err
|
||||
|
@ -6,8 +6,8 @@ require (
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/prometheus/client_golang v1.11.1
|
||||
go.etcd.io/etcd/api/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/api/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
go.uber.org/zap v1.17.0
|
||||
google.golang.org/grpc v1.41.0
|
||||
sigs.k8s.io/yaml v1.2.0
|
||||
|
@ -92,6 +92,7 @@ func NewMaintenance(c *Client) Maintenance {
|
||||
err = c.getToken(dctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, nil, fmt.Errorf("failed to getToken from endpoint %s with maintenance client: %v", endpoint, err)
|
||||
}
|
||||
cancel = func() { conn.Close() }
|
||||
|
@ -74,13 +74,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien
|
||||
continue
|
||||
}
|
||||
if c.shouldRefreshToken(lastErr, callOpts) {
|
||||
// clear auth token before refreshing it.
|
||||
// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
|
||||
// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
|
||||
// and a rpctypes.ErrInvalidAuthToken will recursively call c.getToken until system run out of resource.
|
||||
c.authTokenBundle.UpdateAuthToken("")
|
||||
|
||||
gterr := c.getToken(ctx)
|
||||
gterr := c.refreshToken(ctx)
|
||||
if gterr != nil {
|
||||
c.GetLogger().Warn(
|
||||
"retrying of unary invoker failed to fetch new auth token",
|
||||
@ -161,6 +155,24 @@ func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
|
||||
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
|
||||
}
|
||||
|
||||
func (c *Client) refreshToken(ctx context.Context) error {
|
||||
if c.authTokenBundle == nil {
|
||||
// c.authTokenBundle will be initialized only when
|
||||
// c.Username != "" && c.Password != "".
|
||||
//
|
||||
// When users use the TLS CommonName based authentication, the
|
||||
// authTokenBundle is always nil. But it's possible for the clients
|
||||
// to get `rpctypes.ErrAuthOldRevision` response when the clients
|
||||
// concurrently modify auth data (e.g, addUser, deleteUser etc.).
|
||||
// In this case, there is no need to refresh the token; instead the
|
||||
// clients just need to retry the operations (e.g. Put, Delete etc).
|
||||
return nil
|
||||
}
|
||||
// clear auth token before refreshing it.
|
||||
c.authTokenBundle.UpdateAuthToken("")
|
||||
return c.getToken(ctx)
|
||||
}
|
||||
|
||||
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
|
||||
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
|
||||
// a new ClientStream according to the retry policy.
|
||||
@ -259,10 +271,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
|
||||
return true, err
|
||||
}
|
||||
if s.client.shouldRefreshToken(err, s.callOpts) {
|
||||
// clear auth token to avoid failure when call getToken
|
||||
s.client.authTokenBundle.UpdateAuthToken("")
|
||||
|
||||
gterr := s.client.getToken(s.ctx)
|
||||
gterr := s.client.refreshToken(s.ctx)
|
||||
if gterr != nil {
|
||||
s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
|
||||
return false, err // return the original error for simplicity
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -37,6 +38,13 @@ const (
|
||||
EventTypePut = mvccpb.PUT
|
||||
|
||||
closeSendErrTimeout = 250 * time.Millisecond
|
||||
|
||||
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
|
||||
// user-provided ID is available. If pass, an ID will automatically be assigned.
|
||||
AutoWatchID = 0
|
||||
|
||||
// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
|
||||
InvalidWatchID = -1
|
||||
)
|
||||
|
||||
type Event mvccpb.Event
|
||||
@ -450,7 +458,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
|
||||
|
||||
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
||||
// check watch ID for backward compatibility (<= v3.3)
|
||||
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
|
||||
if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
|
||||
w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
|
||||
// failed; no channel
|
||||
close(ws.recvc)
|
||||
@ -481,7 +489,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
|
||||
} else if ws.outc != nil {
|
||||
close(ws.outc)
|
||||
}
|
||||
if ws.id != -1 {
|
||||
if ws.id != InvalidWatchID {
|
||||
delete(w.substreams, ws.id)
|
||||
return
|
||||
}
|
||||
@ -533,6 +541,7 @@ func (w *watchGrpcStream) run() {
|
||||
cancelSet := make(map[int64]struct{})
|
||||
|
||||
var cur *pb.WatchResponse
|
||||
backoff := time.Millisecond
|
||||
for {
|
||||
select {
|
||||
// Watch() requested
|
||||
@ -543,7 +552,7 @@ func (w *watchGrpcStream) run() {
|
||||
// TODO: pass custom watch ID?
|
||||
ws := &watcherStream{
|
||||
initReq: *wreq,
|
||||
id: -1,
|
||||
id: InvalidWatchID,
|
||||
outc: outc,
|
||||
// unbuffered so resumes won't cause repeat events
|
||||
recvc: make(chan *WatchResponse),
|
||||
@ -580,6 +589,26 @@ func (w *watchGrpcStream) run() {
|
||||
|
||||
switch {
|
||||
case pbresp.Created:
|
||||
cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason))
|
||||
if shouldRetryWatch(cancelReasonError) {
|
||||
var newErr error
|
||||
if wc, newErr = w.newWatchClient(); newErr != nil {
|
||||
w.lg.Error("failed to create a new watch client", zap.Error(newErr))
|
||||
return
|
||||
}
|
||||
|
||||
if len(w.resuming) != 0 {
|
||||
if ws := w.resuming[0]; ws != nil {
|
||||
if err := wc.Send(ws.initReq.toPB()); err != nil {
|
||||
w.lg.Debug("error when sending request", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cur = nil
|
||||
continue
|
||||
}
|
||||
|
||||
// response to head of queue creation
|
||||
if len(w.resuming) != 0 {
|
||||
if ws := w.resuming[0]; ws != nil {
|
||||
@ -649,6 +678,7 @@ func (w *watchGrpcStream) run() {
|
||||
closeErr = err
|
||||
return
|
||||
}
|
||||
backoff = w.backoffIfUnavailable(backoff, err)
|
||||
if wc, closeErr = w.newWatchClient(); closeErr != nil {
|
||||
return
|
||||
}
|
||||
@ -669,7 +699,7 @@ func (w *watchGrpcStream) run() {
|
||||
if len(w.substreams)+len(w.resuming) == 0 {
|
||||
return
|
||||
}
|
||||
if ws.id != -1 {
|
||||
if ws.id != InvalidWatchID {
|
||||
// client is closing an established watch; close it on the server proactively instead of waiting
|
||||
// to close when the next message arrives
|
||||
cancelSet[ws.id] = struct{}{}
|
||||
@ -688,6 +718,11 @@ func (w *watchGrpcStream) run() {
|
||||
}
|
||||
}
|
||||
|
||||
func shouldRetryWatch(cancelReasonError error) bool {
|
||||
return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) ||
|
||||
(strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0)
|
||||
}
|
||||
|
||||
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
|
||||
// streams are marked as nil in the queue since the head must wait for its inflight registration.
|
||||
func (w *watchGrpcStream) nextResume() *watcherStream {
|
||||
@ -716,9 +751,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
||||
cancelReason: pbresp.CancelReason,
|
||||
}
|
||||
|
||||
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
|
||||
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
|
||||
// indicate they should be broadcast.
|
||||
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
|
||||
if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
|
||||
return w.broadcastResponse(wr)
|
||||
}
|
||||
|
||||
@ -873,7 +908,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||
w.resumec = make(chan struct{})
|
||||
w.joinSubstreams()
|
||||
for _, ws := range w.substreams {
|
||||
ws.id = -1
|
||||
ws.id = InvalidWatchID
|
||||
w.resuming = append(w.resuming, ws)
|
||||
}
|
||||
// strip out nils, if any
|
||||
@ -963,6 +998,21 @@ func (w *watchGrpcStream) joinSubstreams() {
|
||||
|
||||
var maxBackoff = 100 * time.Millisecond
|
||||
|
||||
func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
|
||||
if isUnavailableErr(w.ctx, err) {
|
||||
// retry, but backoff
|
||||
if backoff < maxBackoff {
|
||||
// 25% backoff factor
|
||||
backoff = backoff + backoff/4
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
return backoff
|
||||
}
|
||||
|
||||
// openWatchClient retries opening a watch client until success or halt.
|
||||
// manually retry in case "ws==nil && err==nil"
|
||||
// TODO: remove FailFast=false
|
||||
@ -983,17 +1033,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
|
||||
if isHaltErr(w.ctx, err) {
|
||||
return nil, v3rpc.Error(err)
|
||||
}
|
||||
if isUnavailableErr(w.ctx, err) {
|
||||
// retry, but backoff
|
||||
if backoff < maxBackoff {
|
||||
// 25% backoff factor
|
||||
backoff = backoff + backoff/4
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
backoff = w.backoffIfUnavailable(backoff, err)
|
||||
}
|
||||
return ws, nil
|
||||
}
|
||||
|
@ -16,9 +16,10 @@ package command
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/pkg/v3/cobrautl"
|
||||
)
|
||||
|
||||
@ -26,6 +27,7 @@ var (
|
||||
delPrefix bool
|
||||
delPrevKV bool
|
||||
delFromKey bool
|
||||
delRange bool
|
||||
)
|
||||
|
||||
// NewDelCommand returns the cobra command for "del".
|
||||
@ -39,6 +41,7 @@ func NewDelCommand() *cobra.Command {
|
||||
cmd.Flags().BoolVar(&delPrefix, "prefix", false, "delete keys with matching prefix")
|
||||
cmd.Flags().BoolVar(&delPrevKV, "prev-kv", false, "return deleted key-value pairs")
|
||||
cmd.Flags().BoolVar(&delFromKey, "from-key", false, "delete keys that are greater than or equal to the given key using byte compare")
|
||||
cmd.Flags().BoolVar(&delRange, "range", false, "delete range of keys")
|
||||
return cmd
|
||||
}
|
||||
|
||||
@ -70,6 +73,9 @@ func getDelOp(args []string) (string, []clientv3.OpOption) {
|
||||
cobrautl.ExitWithError(cobrautl.ExitBadArgs, fmt.Errorf("too many arguments, only accept one argument when `--prefix` or `--from-key` is set"))
|
||||
}
|
||||
opts = append(opts, clientv3.WithRange(args[1]))
|
||||
if !delRange {
|
||||
fmt.Fprintln(os.Stderr, "In etcd v3.6, the operation will be suspended for a few seconds to provide the user time to verify range.")
|
||||
}
|
||||
}
|
||||
|
||||
if delPrefix {
|
||||
|
@ -9,12 +9,12 @@ require (
|
||||
github.com/spf13/cobra v1.1.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/urfave/cli v1.22.4
|
||||
go.etcd.io/etcd/api/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/v2 v2.305.5
|
||||
go.etcd.io/etcd/client/v3 v3.5.5
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.5
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/api/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/v2 v2.305.6
|
||||
go.etcd.io/etcd/client/v3 v3.5.6
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.6
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.6
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
google.golang.org/grpc v1.41.0
|
||||
|
@ -82,8 +82,6 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
|
||||
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
|
||||
@ -102,6 +100,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
|
@ -114,7 +114,7 @@ func HandleBackup(withV3 bool, srcDir string, destDir string, srcWAL string, des
|
||||
destWAL = datadir.ToWalDir(destDir)
|
||||
}
|
||||
|
||||
if err := fileutil.CreateDirAll(destSnap); err != nil {
|
||||
if err := fileutil.CreateDirAll(lg, destSnap); err != nil {
|
||||
lg.Fatal("failed creating backup snapshot dir", zap.String("dest-snap", destSnap), zap.Error(err))
|
||||
}
|
||||
|
||||
|
@ -25,11 +25,11 @@ require (
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/spf13/cobra v1.1.3
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/etcd/api/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/v3 v3.5.5
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/raft/v3 v3.5.5
|
||||
go.etcd.io/etcd/server/v3 v3.5.5
|
||||
go.etcd.io/etcd/api/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/v3 v3.5.6
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/raft/v3 v3.5.6
|
||||
go.etcd.io/etcd/server/v3 v3.5.6
|
||||
go.uber.org/zap v1.17.0
|
||||
)
|
||||
|
@ -78,8 +78,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
|
||||
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
|
||||
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
|
||||
@ -98,6 +96,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
|
@ -322,7 +322,7 @@ func (s *v3Manager) copyAndVerifyDB() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := fileutil.CreateDirAll(s.snapDir); err != nil {
|
||||
if err := fileutil.CreateDirAll(s.lg, s.snapDir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -383,7 +383,7 @@ func (s *v3Manager) copyAndVerifyDB() error {
|
||||
//
|
||||
// TODO: This code ignores learners !!!
|
||||
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
|
||||
if err := fileutil.CreateDirAll(s.walDir); err != nil {
|
||||
if err := fileutil.CreateDirAll(s.lg, s.walDir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
20
go.mod
20
go.mod
@ -20,16 +20,16 @@ require (
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/spf13/cobra v1.1.3
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/etcd/api/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/v2 v2.305.5
|
||||
go.etcd.io/etcd/client/v3 v3.5.5
|
||||
go.etcd.io/etcd/etcdctl/v3 v3.5.5
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.5
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/raft/v3 v3.5.5
|
||||
go.etcd.io/etcd/server/v3 v3.5.5
|
||||
go.etcd.io/etcd/tests/v3 v3.5.5
|
||||
go.etcd.io/etcd/api/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/v2 v2.305.6
|
||||
go.etcd.io/etcd/client/v3 v3.5.6
|
||||
go.etcd.io/etcd/etcdctl/v3 v3.5.6
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.6
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/raft/v3 v3.5.6
|
||||
go.etcd.io/etcd/server/v3 v3.5.6
|
||||
go.etcd.io/etcd/tests/v3 v3.5.6
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
google.golang.org/grpc v1.41.0
|
||||
|
4
go.sum
4
go.sum
@ -84,8 +84,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
|
||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
|
||||
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
|
||||
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
|
||||
@ -104,6 +102,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
|
||||
|
@ -9,7 +9,7 @@ require (
|
||||
github.com/spf13/cobra v1.1.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
go.uber.org/zap v1.17.0
|
||||
google.golang.org/grpc v1.41.0
|
||||
)
|
||||
|
@ -148,20 +148,31 @@ func urlsEqual(ctx context.Context, lg *zap.Logger, a []url.URL, b []url.URL) (b
|
||||
if len(a) != len(b) {
|
||||
return false, fmt.Errorf("len(%q) != len(%q)", urlsToStrings(a), urlsToStrings(b))
|
||||
}
|
||||
|
||||
sort.Sort(types.URLs(a))
|
||||
sort.Sort(types.URLs(b))
|
||||
var needResolve bool
|
||||
for i := range a {
|
||||
if !reflect.DeepEqual(a[i], b[i]) {
|
||||
needResolve = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !needResolve {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// If URLs are not equal, try to resolve it and compare again.
|
||||
urls, err := resolveTCPAddrs(ctx, lg, [][]url.URL{a, b})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
preva, prevb := a, b
|
||||
a, b = urls[0], urls[1]
|
||||
sort.Sort(types.URLs(a))
|
||||
sort.Sort(types.URLs(b))
|
||||
for i := range a {
|
||||
if !reflect.DeepEqual(a[i], b[i]) {
|
||||
return false, fmt.Errorf("%q(resolved from %q) != %q(resolved from %q)",
|
||||
a[i].String(), preva[i].String(),
|
||||
b[i].String(), prevb[i].String(),
|
||||
)
|
||||
return false, fmt.Errorf("resolved urls: %q != %q", a[i].String(), b[i].String())
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
@ -174,21 +185,13 @@ func URLStringsEqual(ctx context.Context, lg *zap.Logger, a []string, b []string
|
||||
if len(a) != len(b) {
|
||||
return false, fmt.Errorf("len(%q) != len(%q)", a, b)
|
||||
}
|
||||
urlsA := make([]url.URL, 0)
|
||||
for _, str := range a {
|
||||
u, err := url.Parse(str)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to parse %q", str)
|
||||
}
|
||||
urlsA = append(urlsA, *u)
|
||||
urlsA, err := stringsToURLs(a)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
urlsB := make([]url.URL, 0)
|
||||
for _, str := range b {
|
||||
u, err := url.Parse(str)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to parse %q", str)
|
||||
}
|
||||
urlsB = append(urlsB, *u)
|
||||
urlsB, err := stringsToURLs(b)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return urlsEqual(ctx, lg, urlsA, urlsB)
|
||||
}
|
||||
@ -201,6 +204,18 @@ func urlsToStrings(us []url.URL) []string {
|
||||
return rs
|
||||
}
|
||||
|
||||
func stringsToURLs(us []string) ([]url.URL, error) {
|
||||
urls := make([]url.URL, 0, len(us))
|
||||
for _, str := range us {
|
||||
u, err := url.Parse(str)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse string to URL: %q", str)
|
||||
}
|
||||
urls = append(urls, *u)
|
||||
}
|
||||
return urls, nil
|
||||
}
|
||||
|
||||
func IsNetworkTimeoutError(err error) bool {
|
||||
nerr, ok := err.(net.Error)
|
||||
return ok && nerr.Timeout()
|
||||
|
@ -17,6 +17,7 @@ package netutil
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"reflect"
|
||||
@ -166,113 +167,133 @@ func TestURLsEqual(t *testing.T) {
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
n int
|
||||
a []url.URL
|
||||
b []url.URL
|
||||
expect bool
|
||||
err error
|
||||
}{
|
||||
{
|
||||
n: 0,
|
||||
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
n: 1,
|
||||
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
n: 2,
|
||||
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
|
||||
b: []url.URL{{Scheme: "https", Host: "10.0.10.1:2379"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "https://10.0.10.1:2379"(resolved from "https://10.0.10.1:2379")`),
|
||||
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "https://10.0.10.1:2379"`),
|
||||
},
|
||||
{
|
||||
n: 3,
|
||||
a: []url.URL{{Scheme: "https", Host: "example.com:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
|
||||
expect: false,
|
||||
err: errors.New(`"https://10.0.10.1:2379"(resolved from "https://example.com:2379") != "http://10.0.10.1:2379"(resolved from "http://10.0.10.1:2379")`),
|
||||
err: errors.New(`resolved urls: "https://10.0.10.1:2379" != "http://10.0.10.1:2379"`),
|
||||
},
|
||||
{
|
||||
n: 4,
|
||||
a: []url.URL{{Scheme: "unix", Host: "abc:2379"}},
|
||||
b: []url.URL{{Scheme: "unix", Host: "abc:2379"}},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
n: 5,
|
||||
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
n: 6,
|
||||
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
n: 7,
|
||||
a: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
n: 8,
|
||||
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
|
||||
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://127.0.0.1:2380"`),
|
||||
},
|
||||
{
|
||||
n: 9,
|
||||
a: []url.URL{{Scheme: "http", Host: "example.com:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://10.0.10.1:2380"(resolved from "http://example.com:2380") != "http://10.0.10.1:2379"(resolved from "http://10.0.10.1:2379")`),
|
||||
err: errors.New(`resolved urls: "http://10.0.10.1:2380" != "http://10.0.10.1:2379"`),
|
||||
},
|
||||
{
|
||||
n: 10,
|
||||
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
|
||||
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://10.0.0.1:2379"`),
|
||||
},
|
||||
{
|
||||
n: 11,
|
||||
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
|
||||
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "http://10.0.0.1:2379"`),
|
||||
},
|
||||
{
|
||||
n: 12,
|
||||
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
|
||||
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://127.0.0.1:2380"`),
|
||||
},
|
||||
{
|
||||
n: 13,
|
||||
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://127.0.0.1:2380"(resolved from "http://127.0.0.1:2380")`),
|
||||
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "http://127.0.0.1:2380"`),
|
||||
},
|
||||
{
|
||||
n: 14,
|
||||
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://127.0.0.1:2379"(resolved from "http://127.0.0.1:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
|
||||
err: errors.New(`resolved urls: "http://127.0.0.1:2379" != "http://10.0.0.1:2379"`),
|
||||
},
|
||||
{
|
||||
n: 15,
|
||||
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: false,
|
||||
err: errors.New(`"http://10.0.10.1:2379"(resolved from "http://example.com:2379") != "http://10.0.0.1:2379"(resolved from "http://10.0.0.1:2379")`),
|
||||
err: errors.New(`resolved urls: "http://10.0.10.1:2379" != "http://10.0.0.1:2379"`),
|
||||
},
|
||||
{
|
||||
n: 16,
|
||||
a: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||
expect: false,
|
||||
err: errors.New(`len(["http://10.0.0.1:2379"]) != len(["http://10.0.0.1:2379" "http://127.0.0.1:2380"])`),
|
||||
},
|
||||
{
|
||||
n: 17,
|
||||
a: []url.URL{{Scheme: "http", Host: "first.com:2379"}, {Scheme: "http", Host: "second.com:2380"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
n: 18,
|
||||
a: []url.URL{{Scheme: "http", Host: "second.com:2380"}, {Scheme: "http", Host: "first.com:2379"}},
|
||||
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
|
||||
expect: true,
|
||||
@ -282,21 +303,43 @@ func TestURLsEqual(t *testing.T) {
|
||||
for i, test := range tests {
|
||||
result, err := urlsEqual(context.TODO(), zap.NewExample(), test.a, test.b)
|
||||
if result != test.expect {
|
||||
t.Errorf("#%d: a:%v b:%v, expected %v but %v", i, test.a, test.b, test.expect, result)
|
||||
t.Errorf("idx=%d #%d: a:%v b:%v, expected %v but %v", i, test.n, test.a, test.b, test.expect, result)
|
||||
}
|
||||
if test.err != nil {
|
||||
if err.Error() != test.err.Error() {
|
||||
t.Errorf("#%d: err expected %v but %v", i, test.err, err)
|
||||
t.Errorf("idx=%d #%d: err expected %v but %v", i, test.n, test.err, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func TestURLStringsEqual(t *testing.T) {
|
||||
result, err := URLStringsEqual(context.TODO(), zap.NewExample(), []string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"})
|
||||
if !result {
|
||||
t.Errorf("unexpected result %v", result)
|
||||
defer func() { resolveTCPAddr = resolveTCPAddrDefault }()
|
||||
errOnResolve := func(ctx context.Context, addr string) (*net.TCPAddr, error) {
|
||||
return nil, fmt.Errorf("unexpected attempt to resolve: %q", addr)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
cases := []struct {
|
||||
urlsA []string
|
||||
urlsB []string
|
||||
resolver func(ctx context.Context, addr string) (*net.TCPAddr, error)
|
||||
}{
|
||||
{[]string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"}, resolveTCPAddrDefault},
|
||||
{[]string{
|
||||
"http://host1:8080",
|
||||
"http://host2:8080",
|
||||
}, []string{
|
||||
"http://host1:8080",
|
||||
"http://host2:8080",
|
||||
}, errOnResolve},
|
||||
}
|
||||
for idx, c := range cases {
|
||||
t.Logf("TestURLStringsEqual, case #%d", idx)
|
||||
resolveTCPAddr = c.resolver
|
||||
result, err := URLStringsEqual(context.TODO(), zap.NewExample(), c.urlsA, c.urlsB)
|
||||
if !result {
|
||||
t.Errorf("unexpected result %v", result)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ require (
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
)
|
||||
|
||||
// Bad imports are sometimes causing attempts to pull that code.
|
||||
|
@ -87,7 +87,7 @@ function main {
|
||||
export GOARCH=${TARGET_ARCH}
|
||||
|
||||
pushd etcd >/dev/null
|
||||
GO_LDFLAGS="-s" ./build.sh
|
||||
GO_LDFLAGS="-s -w" ./build.sh
|
||||
popd >/dev/null
|
||||
|
||||
TARGET="etcd-${VER}-${GOOS}-${GOARCH}"
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
jwt "github.com/form3tech-oss/jwt-go"
|
||||
"github.com/golang-jwt/jwt/v4"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
jwt "github.com/form3tech-oss/jwt-go"
|
||||
"github.com/golang-jwt/jwt/v4"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -370,6 +370,7 @@ func (as *authStore) Recover(be backend.Backend) {
|
||||
}
|
||||
|
||||
as.setRevision(getRevision(tx))
|
||||
as.refreshRangePermCache(tx)
|
||||
|
||||
tx.Unlock()
|
||||
|
||||
|
@ -189,6 +189,30 @@ func TestRecover(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecoverWithEmptyRangePermCache(t *testing.T) {
|
||||
as, tearDown := setupAuthStore(t)
|
||||
defer as.Close()
|
||||
defer tearDown(t)
|
||||
|
||||
as.enabled = false
|
||||
as.rangePermCache = map[string]*unifiedRangePermissions{}
|
||||
as.Recover(as.be)
|
||||
|
||||
if !as.IsAuthEnabled() {
|
||||
t.Fatalf("expected auth enabled got disabled")
|
||||
}
|
||||
|
||||
if len(as.rangePermCache) != 2 {
|
||||
t.Fatalf("rangePermCache should have permission information for 2 users (\"root\" and \"foo\"), but has %d information", len(as.rangePermCache))
|
||||
}
|
||||
if _, ok := as.rangePermCache["root"]; !ok {
|
||||
t.Fatal("user \"root\" should be created by setupAuthStore() but doesn't exist in rangePermCache")
|
||||
}
|
||||
if _, ok := as.rangePermCache["foo"]; !ok {
|
||||
t.Fatal("user \"foo\" should be created by setupAuthStore() but doesn't exist in rangePermCache")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckPassword(t *testing.T) {
|
||||
as, tearDown := setupAuthStore(t)
|
||||
defer tearDown(t)
|
||||
|
@ -619,13 +619,9 @@ func updateCipherSuites(tls *transport.TLSInfo, ss []string) error {
|
||||
return fmt.Errorf("TLSInfo.CipherSuites is already specified (given %v)", ss)
|
||||
}
|
||||
if len(ss) > 0 {
|
||||
cs := make([]uint16, len(ss))
|
||||
for i, s := range ss {
|
||||
var ok bool
|
||||
cs[i], ok = tlsutil.GetCipherSuite(s)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected TLS cipher suite %q", s)
|
||||
}
|
||||
cs, err := tlsutil.GetCipherSuites(ss)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tls.CipherSuites = cs
|
||||
}
|
||||
|
@ -323,6 +323,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
|
||||
zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
|
||||
zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
|
||||
zap.Uint64("snapshot-count", sc.SnapshotCount),
|
||||
zap.Uint("max-wals", sc.MaxWALFiles),
|
||||
zap.Uint("max-snapshots", sc.MaxSnapFiles),
|
||||
zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
|
||||
zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()),
|
||||
zap.Strings("listen-peer-urls", ec.getLPURLs()),
|
||||
|
@ -276,7 +276,7 @@ func startProxy(cfg *config) error {
|
||||
}
|
||||
|
||||
cfg.ec.Dir = filepath.Join(cfg.ec.Dir, "proxy")
|
||||
err = fileutil.TouchDirAll(cfg.ec.Dir)
|
||||
err = fileutil.TouchDirAll(lg, cfg.ec.Dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/client/pkg/v3/logutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/tlsutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/leasing"
|
||||
@ -41,12 +42,12 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
|
||||
"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
|
||||
"go.uber.org/zap/zapgrpc"
|
||||
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/soheilhy/cmux"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapgrpc"
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
@ -74,12 +75,13 @@ var (
|
||||
|
||||
// tls for clients connecting to proxy
|
||||
|
||||
grpcProxyListenCA string
|
||||
grpcProxyListenCert string
|
||||
grpcProxyListenKey string
|
||||
grpcProxyListenAutoTLS bool
|
||||
grpcProxyListenCRL string
|
||||
selfSignedCertValidity uint
|
||||
grpcProxyListenCA string
|
||||
grpcProxyListenCert string
|
||||
grpcProxyListenKey string
|
||||
grpcProxyListenCipherSuites []string
|
||||
grpcProxyListenAutoTLS bool
|
||||
grpcProxyListenCRL string
|
||||
selfSignedCertValidity uint
|
||||
|
||||
grpcProxyAdvertiseClientURL string
|
||||
grpcProxyResolverPrefix string
|
||||
@ -154,6 +156,7 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
||||
cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
|
||||
cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
|
||||
cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
|
||||
cmd.Flags().StringSliceVar(&grpcProxyListenCipherSuites, "listen-cipher-suites", grpcProxyListenCipherSuites, "Comma-separated list of supported TLS cipher suites between client/proxy (empty will be auto-populated by Go).")
|
||||
cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
|
||||
cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
|
||||
cmd.Flags().UintVar(&selfSignedCertValidity, "self-signed-cert-validity", 1, "The validity period of the proxy certificates, unit is year")
|
||||
@ -187,21 +190,28 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
// The proxy itself (ListenCert) can have not-empty CN.
|
||||
// The empty CN is required for grpcProxyCert.
|
||||
// Please see https://github.com/etcd-io/etcd/issues/11970#issuecomment-687875315 for more context.
|
||||
tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey, false)
|
||||
|
||||
if tlsinfo == nil && grpcProxyListenAutoTLS {
|
||||
tlsInfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey, false)
|
||||
if len(grpcProxyListenCipherSuites) > 0 {
|
||||
cs, err := tlsutil.GetCipherSuites(grpcProxyListenCipherSuites)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
tlsInfo.CipherSuites = cs
|
||||
}
|
||||
if tlsInfo == nil && grpcProxyListenAutoTLS {
|
||||
host := []string{"https://" + grpcProxyListenAddr}
|
||||
dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
|
||||
autoTLS, err := transport.SelfCert(lg, dir, host, selfSignedCertValidity)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
tlsinfo = &autoTLS
|
||||
tlsInfo = &autoTLS
|
||||
}
|
||||
if tlsinfo != nil {
|
||||
lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo)))
|
||||
|
||||
if tlsInfo != nil {
|
||||
lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsInfo)))
|
||||
}
|
||||
m := mustListenCMux(lg, tlsinfo)
|
||||
m := mustListenCMux(lg, tlsInfo)
|
||||
grpcl := m.Match(cmux.HTTP2())
|
||||
defer func() {
|
||||
grpcl.Close()
|
||||
@ -214,11 +224,11 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
// TODO: The mechanism should be refactored to use internal connection.
|
||||
var proxyClient *clientv3.Client
|
||||
if grpcProxyAdvertiseClientURL != "" {
|
||||
proxyClient = mustNewProxyClient(lg, tlsinfo)
|
||||
proxyClient = mustNewProxyClient(lg, tlsInfo)
|
||||
}
|
||||
httpClient := mustNewHTTPClient(lg)
|
||||
|
||||
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
|
||||
srvhttp, httpl := mustHTTPListener(lg, m, tlsInfo, client, proxyClient)
|
||||
|
||||
if err := http2.ConfigureServer(srvhttp, &http2.Server{
|
||||
MaxConcurrentStreams: maxConcurrentStreams,
|
||||
@ -231,7 +241,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
go func() { errc <- srvhttp.Serve(httpl) }()
|
||||
go func() { errc <- m.Serve() }()
|
||||
if len(grpcProxyMetricsListenAddr) > 0 {
|
||||
mhttpl := mustMetricsListener(lg, tlsinfo)
|
||||
mhttpl := mustMetricsListener(lg, tlsInfo)
|
||||
go func() {
|
||||
mux := http.NewServeMux()
|
||||
grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints())
|
||||
|
@ -16,6 +16,7 @@ package v3rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"sync"
|
||||
@ -24,6 +25,7 @@ import (
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/auth"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
"go.etcd.io/etcd/server/v3/mvcc"
|
||||
@ -223,16 +225,16 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool {
|
||||
func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) error {
|
||||
authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context())
|
||||
if err != nil {
|
||||
return false
|
||||
return err
|
||||
}
|
||||
if authInfo == nil {
|
||||
// if auth is enabled, IsRangePermitted() can cause an error
|
||||
authInfo = &auth.AuthInfo{}
|
||||
}
|
||||
return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd) == nil
|
||||
return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd)
|
||||
}
|
||||
|
||||
func (sws *serverWatchStream) recvLoop() error {
|
||||
@ -266,13 +268,29 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
creq.RangeEnd = []byte{}
|
||||
}
|
||||
|
||||
if !sws.isWatchPermitted(creq) {
|
||||
err := sws.isWatchPermitted(creq)
|
||||
if err != nil {
|
||||
var cancelReason string
|
||||
switch err {
|
||||
case auth.ErrInvalidAuthToken:
|
||||
cancelReason = rpctypes.ErrGRPCInvalidAuthToken.Error()
|
||||
case auth.ErrAuthOldRevision:
|
||||
cancelReason = rpctypes.ErrGRPCAuthOldRevision.Error()
|
||||
case auth.ErrUserEmpty:
|
||||
cancelReason = rpctypes.ErrGRPCUserEmpty.Error()
|
||||
default:
|
||||
if err != auth.ErrPermissionDenied {
|
||||
sws.lg.Error("unexpected error code", zap.Error(err))
|
||||
}
|
||||
cancelReason = rpctypes.ErrGRPCPermissionDenied.Error()
|
||||
}
|
||||
|
||||
wr := &pb.WatchResponse{
|
||||
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
||||
WatchId: creq.WatchId,
|
||||
WatchId: clientv3.InvalidWatchID,
|
||||
Canceled: true,
|
||||
Created: true,
|
||||
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
|
||||
CancelReason: cancelReason,
|
||||
}
|
||||
|
||||
select {
|
||||
@ -303,7 +321,10 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
sws.fragment[id] = true
|
||||
}
|
||||
sws.mu.Unlock()
|
||||
} else {
|
||||
id = clientv3.InvalidWatchID
|
||||
}
|
||||
|
||||
wr := &pb.WatchResponse{
|
||||
Header: sws.newResponseHeader(wsrev),
|
||||
WatchId: int64(id),
|
||||
@ -340,7 +361,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
if uv.ProgressRequest != nil {
|
||||
sws.ctrlStream <- &pb.WatchResponse{
|
||||
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
||||
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
|
||||
WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
|
||||
}
|
||||
}
|
||||
default:
|
||||
@ -463,7 +484,12 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
|
||||
// track id creation
|
||||
wid := mvcc.WatchID(c.WatchId)
|
||||
if c.Canceled {
|
||||
|
||||
if !(!(c.Canceled && c.Created) || wid == clientv3.InvalidWatchID) {
|
||||
panic(fmt.Sprintf("unexpected watchId: %d, wanted: %d, since both 'Canceled' and 'Created' are true", wid, clientv3.InvalidWatchID))
|
||||
}
|
||||
|
||||
if c.Canceled && wid != clientv3.InvalidWatchID {
|
||||
delete(ids, wid)
|
||||
continue
|
||||
}
|
||||
|
@ -349,13 +349,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
)
|
||||
}
|
||||
|
||||
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
|
||||
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
|
||||
return nil, fmt.Errorf("cannot access data directory: %v", terr)
|
||||
}
|
||||
|
||||
haveWAL := wal.Exist(cfg.WALDir())
|
||||
|
||||
if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
|
||||
if err = fileutil.TouchDirAll(cfg.Logger, cfg.SnapDir()); err != nil {
|
||||
cfg.Logger.Fatal(
|
||||
"failed to create snapshot directory",
|
||||
zap.String("path", cfg.SnapDir()),
|
||||
@ -548,7 +548,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||
}
|
||||
|
||||
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
||||
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
|
||||
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
||||
}
|
||||
|
||||
|
@ -6,8 +6,8 @@ require (
|
||||
github.com/coreos/go-semver v0.3.0
|
||||
github.com/coreos/go-systemd/v22 v22.3.2
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/google/btree v1.0.1
|
||||
@ -25,12 +25,12 @@ require (
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
|
||||
go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/etcd/api/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/v2 v2.305.5
|
||||
go.etcd.io/etcd/client/v3 v3.5.5
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/raft/v3 v3.5.5
|
||||
go.etcd.io/etcd/api/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/v2 v2.305.6
|
||||
go.etcd.io/etcd/client/v3 v3.5.6
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/raft/v3 v3.5.6
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0
|
||||
go.opentelemetry.io/otel v1.0.1
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1
|
||||
|
@ -80,8 +80,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
|
||||
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
|
||||
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
|
||||
@ -100,6 +98,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
|
||||
|
@ -329,10 +329,6 @@ func (t *batchTxBuffered) CommitAndStop() {
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) commit(stop bool) {
|
||||
if t.backend.hooks != nil {
|
||||
t.backend.hooks.OnPreCommitUnsafe(t)
|
||||
}
|
||||
|
||||
// all read txs must be closed to acquire boltdb commit rwlock
|
||||
t.backend.readTx.Lock()
|
||||
t.unsafeCommit(stop)
|
||||
@ -340,6 +336,9 @@ func (t *batchTxBuffered) commit(stop bool) {
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||
if t.backend.hooks != nil {
|
||||
t.backend.hooks.OnPreCommitUnsafe(t)
|
||||
}
|
||||
if t.backend.readTx.tx != nil {
|
||||
// wait all store read transactions using the current boltdb tx to finish,
|
||||
// then close the boltdb tx
|
||||
|
@ -167,6 +167,13 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
|
||||
"range failed to find revision pair",
|
||||
zap.Int64("revision-main", revpair.main),
|
||||
zap.Int64("revision-sub", revpair.sub),
|
||||
zap.Int64("revision-current", curRev),
|
||||
zap.Int64("range-option-rev", ro.Rev),
|
||||
zap.Int64("range-option-limit", ro.Limit),
|
||||
zap.Binary("key", key),
|
||||
zap.Binary("end", end),
|
||||
zap.Int("len-revpairs", len(revpairs)),
|
||||
zap.Int("len-values", len(vs)),
|
||||
)
|
||||
}
|
||||
if err := kvs[i].Unmarshal(vs[0]); err != nil {
|
||||
|
@ -20,12 +20,9 @@ import (
|
||||
"sync"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
|
||||
// user-provided ID is available. If pass, an ID will automatically be assigned.
|
||||
const AutoWatchID WatchID = 0
|
||||
|
||||
var (
|
||||
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
|
||||
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
|
||||
@ -118,7 +115,7 @@ func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ..
|
||||
return -1, ErrEmptyWatcherRange
|
||||
}
|
||||
|
||||
if id == AutoWatchID {
|
||||
if id == clientv3.AutoWatchID {
|
||||
for ws.watchers[ws.nextID] != nil {
|
||||
ws.nextID++
|
||||
}
|
||||
|
@ -238,7 +238,7 @@ func (wps *watchProxyStream) recvLoop() error {
|
||||
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
|
||||
wps.watchCh <- &pb.WatchResponse{
|
||||
Header: &pb.ResponseHeader{},
|
||||
WatchId: -1,
|
||||
WatchId: clientv3.InvalidWatchID,
|
||||
Created: true,
|
||||
Canceled: true,
|
||||
CancelReason: err.Error(),
|
||||
@ -258,7 +258,7 @@ func (wps *watchProxyStream) recvLoop() error {
|
||||
filters: v3rpc.FiltersFromRequest(cr),
|
||||
}
|
||||
if !w.wr.valid() {
|
||||
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
|
||||
w.post(&pb.WatchResponse{WatchId: clientv3.InvalidWatchID, Created: true, Canceled: true})
|
||||
wps.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
||||
}
|
||||
defer os.RemoveAll(tmpdirpath)
|
||||
|
||||
if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
|
||||
if err := fileutil.CreateDirAll(lg, tmpdirpath); err != nil {
|
||||
lg.Warn(
|
||||
"failed to create a temporary WAL directory",
|
||||
zap.String("tmp-dir-path", tmpdirpath),
|
||||
|
@ -1,4 +1,4 @@
|
||||
FROM ubuntu:20.10
|
||||
FROM ubuntu:22.04
|
||||
|
||||
RUN rm /bin/sh && ln -s /bin/bash /bin/sh
|
||||
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
|
||||
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// These tests depends on certificate-based authentication that is NOT supported
|
||||
// These tests depend on certificate-based authentication that is NOT supported
|
||||
// by gRPC proxy.
|
||||
//go:build !cluster_proxy
|
||||
// +build !cluster_proxy
|
||||
@ -20,7 +20,10 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCtlV3AuthCertCN(t *testing.T) {
|
||||
@ -32,3 +35,106 @@ func TestCtlV3AuthCertCNAndUsername(t *testing.T) {
|
||||
func TestCtlV3AuthCertCNAndUsernameNoPassword(t *testing.T) {
|
||||
testCtl(t, authTestCertCNAndUsernameNoPassword, withCfg(*newConfigClientTLSCertAuth()))
|
||||
}
|
||||
|
||||
func TestCtlV3AuthCertCNWithWithConcurrentOperation(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
|
||||
// apply the certificate which has `root` CommonName,
|
||||
// and reset the setting when the test case finishes.
|
||||
// TODO(ahrtr): enhance the e2e test framework to support
|
||||
// certificates with CommonName.
|
||||
t.Log("Apply certificate with root CommonName")
|
||||
resetCert := applyTLSWithRootCommonName()
|
||||
defer resetCert()
|
||||
|
||||
t.Log("Create an etcd cluster")
|
||||
cx := getDefaultCtlCtx(t)
|
||||
cx.cfg = etcdProcessClusterConfig{
|
||||
clusterSize: 1,
|
||||
clientTLS: clientTLS,
|
||||
clientCertAuthEnabled: true,
|
||||
initialToken: "new",
|
||||
}
|
||||
|
||||
epc, err := newEtcdProcessCluster(t, &cx.cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start etcd cluster: %v", err)
|
||||
}
|
||||
cx.epc = epc
|
||||
cx.dataDir = epc.procs[0].Config().dataDirPath
|
||||
|
||||
defer func() {
|
||||
if err := epc.Close(); err != nil {
|
||||
t.Fatalf("could not close test cluster (%v)", err)
|
||||
}
|
||||
}()
|
||||
|
||||
t.Log("Enable auth")
|
||||
authEnableTest(cx)
|
||||
|
||||
// Create two goroutines, one goroutine keeps creating & deleting users,
|
||||
// and the other goroutine keeps writing & deleting K/V entries.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
errs := make(chan error, 2)
|
||||
donec := make(chan struct{})
|
||||
|
||||
// Create the first goroutine to create & delete users
|
||||
t.Log("Create the first goroutine to create & delete users")
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 100; i++ {
|
||||
user := fmt.Sprintf("testuser-%d", i)
|
||||
pass := fmt.Sprintf("testpass-%d", i)
|
||||
|
||||
if err := ctlV3User(cx, []string{"add", user, "--interactive=false"}, fmt.Sprintf("User %s created", user), []string{pass}); err != nil {
|
||||
errs <- fmt.Errorf("failed to create user %q: %w", user, err)
|
||||
break
|
||||
}
|
||||
|
||||
err := ctlV3User(cx, []string{"delete", user}, fmt.Sprintf("User %s deleted", user), []string{})
|
||||
if err != nil {
|
||||
errs <- fmt.Errorf("failed to delete user %q: %w", user, err)
|
||||
break
|
||||
}
|
||||
}
|
||||
t.Log("The first goroutine finished")
|
||||
}()
|
||||
|
||||
// Create the second goroutine to write & delete K/V entries
|
||||
t.Log("Create the second goroutine to write & delete K/V entries")
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 100; i++ {
|
||||
key := fmt.Sprintf("key-%d", i)
|
||||
value := fmt.Sprintf("value-%d", i)
|
||||
|
||||
if err := ctlV3Put(cx, key, value, ""); err != nil {
|
||||
errs <- fmt.Errorf("failed to put key %q: %w", key, err)
|
||||
break
|
||||
}
|
||||
|
||||
if err := ctlV3Del(cx, []string{key}, 1); err != nil {
|
||||
errs <- fmt.Errorf("failed to delete key %q: %w", key, err)
|
||||
break
|
||||
}
|
||||
}
|
||||
t.Log("The second goroutine finished")
|
||||
}()
|
||||
|
||||
t.Log("Waiting for the two goroutines to complete")
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(donec)
|
||||
}()
|
||||
|
||||
t.Log("Waiting for test result")
|
||||
select {
|
||||
case err := <-errs:
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
case <-donec:
|
||||
t.Log("All done!")
|
||||
case <-time.After(60 * time.Second):
|
||||
t.Fatal("Test case timeout after 60 seconds")
|
||||
}
|
||||
}
|
||||
|
@ -17,11 +17,16 @@ package e2e
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
@ -72,6 +77,10 @@ func TestCtlV3AuthJWTExpire(t *testing.T) { testCtl(t, authTestJWTExpi
|
||||
func TestCtlV3AuthRevisionConsistency(t *testing.T) { testCtl(t, authTestRevisionConsistency) }
|
||||
func TestCtlV3AuthTestCacheReload(t *testing.T) { testCtl(t, authTestCacheReload) }
|
||||
|
||||
func TestCtlV3AuthRecoverFromSnapshot(t *testing.T) {
|
||||
testCtl(t, authTestRecoverSnapshot, withCfg(*newConfigNoTLS()), withQuorum(), withSnapshotCount(5))
|
||||
}
|
||||
|
||||
func authEnableTest(cx ctlCtx) {
|
||||
if err := authEnable(cx); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
@ -92,6 +101,28 @@ func authEnable(cx ctlCtx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func applyTLSWithRootCommonName() func() {
|
||||
var (
|
||||
oldCertPath = certPath
|
||||
oldPrivateKeyPath = privateKeyPath
|
||||
oldCaPath = caPath
|
||||
|
||||
newCertPath = filepath.Join(fixturesDir, "CommonName-root.crt")
|
||||
newPrivateKeyPath = filepath.Join(fixturesDir, "CommonName-root.key")
|
||||
newCaPath = filepath.Join(fixturesDir, "CommonName-root.crt")
|
||||
)
|
||||
|
||||
certPath = newCertPath
|
||||
privateKeyPath = newPrivateKeyPath
|
||||
caPath = newCaPath
|
||||
|
||||
return func() {
|
||||
certPath = oldCertPath
|
||||
privateKeyPath = oldPrivateKeyPath
|
||||
caPath = oldCaPath
|
||||
}
|
||||
}
|
||||
|
||||
func ctlV3AuthEnable(cx ctlCtx) error {
|
||||
cmdArgs := append(cx.PrefixArgs(), "auth", "enable")
|
||||
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Enabled")
|
||||
@ -1289,3 +1320,192 @@ func authTestCacheReload(cx ctlCtx) {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that etcd works after recovering from a snapshot.
|
||||
// Refer to https://github.com/etcd-io/etcd/issues/14571.
|
||||
func authTestRecoverSnapshot(cx ctlCtx) {
|
||||
roles := []authRole{
|
||||
{
|
||||
role: "role0",
|
||||
permission: clientv3.PermissionType(clientv3.PermReadWrite),
|
||||
key: "foo",
|
||||
},
|
||||
}
|
||||
|
||||
users := []authUser{
|
||||
{
|
||||
user: "root",
|
||||
pass: "rootPass",
|
||||
role: "root",
|
||||
},
|
||||
{
|
||||
user: "user0",
|
||||
pass: "user0Pass",
|
||||
role: "role0",
|
||||
},
|
||||
}
|
||||
|
||||
cx.t.Log("setup and enable auth")
|
||||
setupAuth(cx, roles, users)
|
||||
|
||||
// create a client with root user
|
||||
cx.t.Log("create a client with root user")
|
||||
cliRoot, err := clientv3.New(clientv3.Config{Endpoints: cx.epc.EndpointsV3(), Username: "root", Password: "rootPass", DialTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
defer cliRoot.Close()
|
||||
|
||||
// write more than SnapshotCount keys, so that at least one snapshot is created
|
||||
cx.t.Log("Write enough key/value to trigger a snapshot")
|
||||
for i := 0; i <= 6; i++ {
|
||||
if _, err := cliRoot.Put(context.TODO(), fmt.Sprintf("key_%d", i), fmt.Sprintf("value_%d", i)); err != nil {
|
||||
cx.t.Fatalf("failed to Put (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
// add a new member into the cluster
|
||||
// Refer to https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/tests/e2e/cluster_test.go#L238
|
||||
var (
|
||||
idx = 3
|
||||
name = fmt.Sprintf("test-%d", idx)
|
||||
port = cx.cfg.basePort + 5*idx
|
||||
curlHost = fmt.Sprintf("localhost:%d", port)
|
||||
nodeClientURL = url.URL{Scheme: cx.cfg.clientScheme(), Host: curlHost}
|
||||
nodePeerURL = url.URL{Scheme: cx.cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
|
||||
initialCluster = cx.epc.procs[0].Config().initialCluster + "," + fmt.Sprintf("%s=%s", name, nodePeerURL.String())
|
||||
)
|
||||
cx.t.Logf("Adding a new member: %s", nodePeerURL.String())
|
||||
// Must wait at least 5 seconds, otherwise it will always get an
|
||||
// "etcdserver: unhealthy cluster" response, please refer to link below,
|
||||
// https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/server/etcdserver/server.go#L1611
|
||||
assert.Eventually(cx.t, func() bool {
|
||||
if _, err := cliRoot.MemberAdd(context.TODO(), []string{nodePeerURL.String()}); err != nil {
|
||||
cx.t.Logf("Failed to add member, peelURL: %s, error: %v", nodePeerURL.String(), err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}, 8*time.Second, 2*time.Second)
|
||||
|
||||
cx.t.Logf("Starting the new member: %s", nodePeerURL.String())
|
||||
newProc, err := runEtcdNode(name, cx.t.TempDir(), nodeClientURL.String(), nodePeerURL.String(), "existing", initialCluster)
|
||||
require.NoError(cx.t, err)
|
||||
defer newProc.Stop()
|
||||
|
||||
// create a client with user "user0", and connects to the new member
|
||||
cx.t.Log("create a client with user 'user0'")
|
||||
cliUser, err := clientv3.New(clientv3.Config{Endpoints: []string{nodeClientURL.String()}, Username: "user0", Password: "user0Pass", DialTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
defer cliUser.Close()
|
||||
|
||||
// write data using the cliUser, expect no error
|
||||
cx.t.Log("Write a key/value using user 'user0'")
|
||||
_, err = cliUser.Put(context.TODO(), "foo", "bar")
|
||||
require.NoError(cx.t, err)
|
||||
|
||||
//verify all nodes have the same revision and hash
|
||||
var endpoints []string
|
||||
for _, proc := range cx.epc.procs {
|
||||
endpoints = append(endpoints, proc.Config().acurl)
|
||||
}
|
||||
endpoints = append(endpoints, nodeClientURL.String())
|
||||
cx.t.Log("Verify all members have the same revision and hash")
|
||||
assert.Eventually(cx.t, func() bool {
|
||||
hashKvs, err := hashKVs(endpoints, cliRoot)
|
||||
if err != nil {
|
||||
cx.t.Logf("failed to get HashKV: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
if len(hashKvs) != 4 {
|
||||
cx.t.Logf("expected 4 hashkv responses, but got: %d", len(hashKvs))
|
||||
return false
|
||||
}
|
||||
|
||||
if !(hashKvs[0].Header.Revision == hashKvs[1].Header.Revision &&
|
||||
hashKvs[0].Header.Revision == hashKvs[2].Header.Revision &&
|
||||
hashKvs[0].Header.Revision == hashKvs[3].Header.Revision) {
|
||||
cx.t.Logf("Got different revisions, [%d, %d, %d, %d]",
|
||||
hashKvs[0].Header.Revision,
|
||||
hashKvs[1].Header.Revision,
|
||||
hashKvs[2].Header.Revision,
|
||||
hashKvs[3].Header.Revision)
|
||||
return false
|
||||
}
|
||||
|
||||
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[1].Hash)
|
||||
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[2].Hash)
|
||||
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[3].Hash)
|
||||
|
||||
return true
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
type authRole struct {
|
||||
role string
|
||||
permission clientv3.PermissionType
|
||||
key string
|
||||
keyEnd string
|
||||
}
|
||||
|
||||
type authUser struct {
|
||||
user string
|
||||
pass string
|
||||
role string
|
||||
}
|
||||
|
||||
func setupAuth(cx ctlCtx, roles []authRole, users []authUser) {
|
||||
endpoint := cx.epc.procs[0].EndpointsV3()[0]
|
||||
|
||||
// create a client
|
||||
c, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}, DialTimeout: 3 * time.Second})
|
||||
if err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
// create roles
|
||||
for _, r := range roles {
|
||||
// add role
|
||||
if _, err = c.RoleAdd(context.TODO(), r.role); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
|
||||
// grant permission to role
|
||||
if _, err = c.RoleGrantPermission(context.TODO(), r.role, r.key, r.keyEnd, r.permission); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// create users
|
||||
for _, u := range users {
|
||||
// add user
|
||||
if _, err = c.UserAdd(context.TODO(), u.user, u.pass); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
|
||||
// grant role to user
|
||||
if _, err = c.UserGrantRole(context.TODO(), u.user, u.role); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// enable auth
|
||||
if _, err = c.AuthEnable(context.TODO()); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func hashKVs(endpoints []string, cli *clientv3.Client) ([]*clientv3.HashKVResponse, error) {
|
||||
var retHashKVs []*clientv3.HashKVResponse
|
||||
for _, ep := range endpoints {
|
||||
resp, err := cli.HashKV(context.TODO(), ep, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
retHashKVs = append(retHashKVs, resp)
|
||||
}
|
||||
return retHashKVs, nil
|
||||
}
|
||||
|
@ -221,6 +221,14 @@ func withMaxConcurrentStreams(streams uint32) ctlOption {
|
||||
}
|
||||
}
|
||||
|
||||
// This function must be called after the `withCfg`, otherwise its value
|
||||
// may be overwritten by `withCfg`.
|
||||
func withSnapshotCount(snapshotCount int) ctlOption {
|
||||
return func(cx *ctlCtx) {
|
||||
cx.cfg.snapshotCount = snapshotCount
|
||||
}
|
||||
}
|
||||
|
||||
func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
|
||||
testCtlWithOffline(t, testFunc, nil, opts...)
|
||||
}
|
||||
|
@ -320,6 +320,46 @@ func TestGrpcproxyAndCommonName(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGrpcproxyAndListenCipherSuite(t *testing.T) {
|
||||
skipInShortMode(t)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
args []string
|
||||
}{
|
||||
{
|
||||
name: "ArgsWithCipherSuites",
|
||||
args: []string{
|
||||
binDir + "/etcd",
|
||||
"grpc-proxy",
|
||||
"start",
|
||||
"--listen-cipher-suites", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ArgsWithoutCipherSuites",
|
||||
args: []string{
|
||||
binDir + "/etcd",
|
||||
"grpc-proxy",
|
||||
"start",
|
||||
"--listen-cipher-suites", "",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range cases {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
pw, err := spawnCmd(test.args, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = pw.Stop(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBootstrapDefragFlag(t *testing.T) {
|
||||
skipInShortMode(t)
|
||||
|
||||
|
29
tests/fixtures/CommonName-root.crt
vendored
Normal file
29
tests/fixtures/CommonName-root.crt
vendored
Normal file
@ -0,0 +1,29 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIE5zCCA8+gAwIBAgIJAKooGDZuR2mMMA0GCSqGSIb3DQEBCwUAMH8xCzAJBgNV
|
||||
BAYTAkNOMRAwDgYDVQQIDAdCZWlqaW5nMRAwDgYDVQQHDAdCZWlqaW5nMQ0wCwYD
|
||||
VQQKDAREZW1vMQ0wCwYDVQQLDAREZW1vMQ0wCwYDVQQDDARyb290MR8wHQYJKoZI
|
||||
hvcNAQkBFhB0ZXN0QGV4YW1wbGUuY29tMB4XDTIyMTExNjA2NTI1M1oXDTMyMTEx
|
||||
MzA2NTI1M1owfzELMAkGA1UEBhMCQ04xEDAOBgNVBAgMB0JlaWppbmcxEDAOBgNV
|
||||
BAcMB0JlaWppbmcxDTALBgNVBAoMBERlbW8xDTALBgNVBAsMBERlbW8xDTALBgNV
|
||||
BAMMBHJvb3QxHzAdBgkqhkiG9w0BCQEWEHRlc3RAZXhhbXBsZS5jb20wggEiMA0G
|
||||
CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEAKcjzhtOG3hWbAUCbudE1gPOeteT
|
||||
0INk2ngN2uCMYjYSZmaGhW/GZk3EvV7wKVuhdTyrh36E5Iajng9d2t1iOU/8jROU
|
||||
+uAyrS3C/S5P/urq8VBUrt3VG/44bhwTEdafNnAWQ6ojYfmK0tRqoQn1Ftm30l8I
|
||||
nWof5Jm3loNA2WdNdvAp/D+6OpjUdqGdMkFd0NhkuQODMnycBMw6btUTj5SnmrMk
|
||||
q7V1aasx4BqN5C4DciZF0pyyR/TT8MoQ5Vcit8rHvQUyz42Lj8+28RkDoi4prJ1i
|
||||
tLaCt2egDp58vXlYQZTd50inMhnBIapKNdGpg3flW/8AFul1tCTqd8NfAgMBAAGj
|
||||
ggFkMIIBYDAdBgNVHQ4EFgQUpwwvEqXjA/ArJu1Jnpw7+/sttOAwgbMGA1UdIwSB
|
||||
qzCBqIAUpwwvEqXjA/ArJu1Jnpw7+/sttOChgYSkgYEwfzELMAkGA1UEBhMCQ04x
|
||||
EDAOBgNVBAgMB0JlaWppbmcxEDAOBgNVBAcMB0JlaWppbmcxDTALBgNVBAoMBERl
|
||||
bW8xDTALBgNVBAsMBERlbW8xDTALBgNVBAMMBHJvb3QxHzAdBgkqhkiG9w0BCQEW
|
||||
EHRlc3RAZXhhbXBsZS5jb22CCQCqKBg2bkdpjDAMBgNVHRMEBTADAQH/MAsGA1Ud
|
||||
DwQEAwIC/DA2BgNVHREELzAtggtleGFtcGxlLmNvbYINKi5leGFtcGxlLmNvbYIJ
|
||||
bG9jYWxob3N0hwR/AAABMDYGA1UdEgQvMC2CC2V4YW1wbGUuY29tgg0qLmV4YW1w
|
||||
bGUuY29tgglsb2NhbGhvc3SHBH8AAAEwDQYJKoZIhvcNAQELBQADggEBAGi48ntm
|
||||
8cn08FrsCDWapsck7a56/dyFyzLg10c0blu396tzC3ZDCAwQYzHjeXVdeWHyGO+f
|
||||
KSFlmR6IA0jq6pFhUyJtgaAUJ91jW6s68GTVhlLoFhtYjy6EvhQ0lo+7GWh4qB2s
|
||||
LI0mJPjaLZY1teAC4TswzwMDVD8QsB06/aFBlA65VjgZiVH+aMwWJ88gKfVGp0Pv
|
||||
AApsy5MvwQn8WZ2L6foSY04OzXtmAg2gCl0PyDNgieqFDcM1g7mklHNgWl2Gvtte
|
||||
G6+TiB3gGUUlTsdy0+LS2psL71RS5Jv7g/7XGmSKBPqRmYyQ2t7m2kLPwWKtL5tE
|
||||
63c0FPtpV0FzKdU=
|
||||
-----END CERTIFICATE-----
|
27
tests/fixtures/CommonName-root.key
vendored
Normal file
27
tests/fixtures/CommonName-root.key
vendored
Normal file
@ -0,0 +1,27 @@
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEowIBAAKCAQEAxACnI84bTht4VmwFAm7nRNYDznrXk9CDZNp4DdrgjGI2EmZm
|
||||
hoVvxmZNxL1e8ClboXU8q4d+hOSGo54PXdrdYjlP/I0TlPrgMq0twv0uT/7q6vFQ
|
||||
VK7d1Rv+OG4cExHWnzZwFkOqI2H5itLUaqEJ9RbZt9JfCJ1qH+SZt5aDQNlnTXbw
|
||||
Kfw/ujqY1HahnTJBXdDYZLkDgzJ8nATMOm7VE4+Up5qzJKu1dWmrMeAajeQuA3Im
|
||||
RdKcskf00/DKEOVXIrfKx70FMs+Ni4/PtvEZA6IuKaydYrS2grdnoA6efL15WEGU
|
||||
3edIpzIZwSGqSjXRqYN35Vv/ABbpdbQk6nfDXwIDAQABAoIBAA5AMebTjH6wVp6J
|
||||
+g9EOwJxQROZMOVparRBgisXt+3dEitiUKAFQaw+MfdVAXsatrPVj1S1ZEiLSRLK
|
||||
YjmjuSb0HdGx/DN/zh9BIiukNuLQGQp+AyY1FKHzCBfYQahNSrqGvb2Qq+UosXkb
|
||||
fSBHly6/u5K28/vvXhD1kQudIOvtAc9tOg8LZnM6N3J4E0GtLqWimRZ4jNK4APu1
|
||||
YsLIg87Eam+7x25+phz9xc22tZ1H4WY9FnOGprPnievqiV7mgcNGAklTB93C6yX1
|
||||
EI+QxQnPg0P732C4EJZFDPqhVRA4E7BUb5uTIXCJBA/FFuRIx9ppyLZKt9vjTchM
|
||||
8YWIEsECgYEA/5DRR9FkIWJZb0Pv3SCc53PMPT/xpYB6lH2lGtG+u+L71dJNDiPt
|
||||
da3dPXSBy+aF7BbmRDawRvyOLGArlWiSsoEUVlES8BYzQ1MmfDf+MJooJoBE6/g6
|
||||
2OyyNnPde1GqyxsxgNTITvJCTjYH64lxKVRYfMgMAASK49SjYiEgGn8CgYEAxFXs
|
||||
Oe0sUcc3P1cQ9pJfSVKpSczZq/OGAxqlniqRHvoWgFfKOWB6F9PN0rd8G2aMlfGS
|
||||
BjyiPe770gtpX8Z4G4lrtkJD8NvGoVC8yX78HbrXL2RA4lPjQfrveUnwXIRbRKWa
|
||||
6D/GAYPOuNvJmwF4hY/orWyIqvpNczIjTjs1JyECgYEAvhuNAn6JnKfbXYBM+tIa
|
||||
xbWHFXzula2IAdOhMN0bpApKSZmBxmYFa0elTuTO9M2Li77RFacU5AlU/T+gzCiZ
|
||||
D34jkb4Hd18cTRWaiEbiqGbUPSennVzu8ZTJUOZJuEVc5m9ZGLuwMcHWfvWEWLrJ
|
||||
2fOrS09IVe8LHkV8MC/yAKMCgYBmDUdhgK9Fvqgv60Cs+b4/rZDDBJCsOUOSP3qQ
|
||||
sQ2HrXSet4MsucIcuoJEog0HbRFsKwm85i1qxdrs/fOCzfXGUnLDZMRN4N7pIL9Q
|
||||
eQnxJhoNzy2Otw3sUNPDFrSyUjXig7X2PJfeV7XPDqdHQ8dynS/TXRPY04wIcao6
|
||||
Uro5IQKBgFUz2GjAxI6uc7ihmRv/GYTuXYOlO0IN7MFwQDd0pVnWHkLNZscO9L9/
|
||||
ALV4g1p/75CewlQfyC8ynOJJWcDeHHFNsSMsOzAxUOVtVenaF/dgwk95wpXj6Rx6
|
||||
4kvQqnJg97fRBbyzvQcdL36kL8+pbmHNoqHPwxbuigYShB74d6/h
|
||||
-----END RSA PRIVATE KEY-----
|
@ -473,7 +473,7 @@ func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Respons
|
||||
}, nil
|
||||
}
|
||||
|
||||
err := fileutil.TouchDirAll(srv.Member.BaseDir)
|
||||
err := fileutil.TouchDirAll(srv.lg, srv.Member.BaseDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -508,7 +508,7 @@ func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Respons
|
||||
func (srv *Server) handle_RESTART_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
|
||||
var err error
|
||||
if !fileutil.Exist(srv.Member.BaseDir) {
|
||||
err = fileutil.TouchDirAll(srv.Member.BaseDir)
|
||||
err = fileutil.TouchDirAll(srv.lg, srv.Member.BaseDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -579,7 +579,7 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error
|
||||
|
||||
// create a new log file for next new member restart
|
||||
if !fileutil.Exist(srv.Member.BaseDir) {
|
||||
err = fileutil.TouchDirAll(srv.Member.BaseDir)
|
||||
err = fileutil.TouchDirAll(srv.lg, srv.Member.BaseDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -651,6 +651,7 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
|
||||
|
||||
// TODO: support separate WAL directory
|
||||
if err = archive(
|
||||
srv.lg,
|
||||
srv.Member.BaseDir,
|
||||
srv.Member.Etcd.LogOutputs[0],
|
||||
srv.Member.Etcd.DataDir,
|
||||
|
@ -25,15 +25,17 @@ import (
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// TODO: support separate WAL directory
|
||||
func archive(baseDir, etcdLogPath, dataDir string) error {
|
||||
func archive(lg *zap.Logger, baseDir, etcdLogPath, dataDir string) error {
|
||||
dir := filepath.Join(baseDir, "etcd-failure-archive", time.Now().Format(time.RFC3339))
|
||||
if existDir(dir) {
|
||||
dir = filepath.Join(baseDir, "etcd-failure-archive", time.Now().Add(time.Second).Format(time.RFC3339))
|
||||
}
|
||||
if err := fileutil.TouchDirAll(dir); err != nil {
|
||||
if err := fileutil.TouchDirAll(lg, dir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -7,8 +7,8 @@ fi
|
||||
|
||||
(
|
||||
cd ./tests
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-agent ./functional/cmd/etcd-agent
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-proxy ./functional/cmd/etcd-proxy
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-runner ./functional/cmd/etcd-runner
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ../bin/etcd-tester ./functional/cmd/etcd-tester
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-agent ./functional/cmd/etcd-agent
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-proxy ./functional/cmd/etcd-proxy
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-runner ./functional/cmd/etcd-runner
|
||||
CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s -w" -o ../bin/etcd-tester ./functional/cmd/etcd-tester
|
||||
)
|
||||
|
@ -520,7 +520,7 @@ func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Respons
|
||||
"fixtures",
|
||||
"client",
|
||||
)
|
||||
if err = fileutil.TouchDirAll(dirClient); err != nil {
|
||||
if err = fileutil.TouchDirAll(clus.lg, dirClient); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,7 @@ func (clus *Cluster) Run() {
|
||||
// needs to obtain all the failpoints from the etcd member.
|
||||
clus.updateCases()
|
||||
|
||||
if err := fileutil.TouchDirAll(clus.Tester.DataDir); err != nil {
|
||||
if err := fileutil.TouchDirAll(clus.lg, clus.Tester.DataDir); err != nil {
|
||||
clus.lg.Panic(
|
||||
"failed to create test data directory",
|
||||
zap.String("dir", clus.Tester.DataDir),
|
||||
|
16
tests/go.mod
16
tests/go.mod
@ -27,14 +27,14 @@ require (
|
||||
github.com/spf13/cobra v1.1.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.etcd.io/etcd/api/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/client/v2 v2.305.5
|
||||
go.etcd.io/etcd/client/v3 v3.5.5
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.5
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.5
|
||||
go.etcd.io/etcd/raft/v3 v3.5.5
|
||||
go.etcd.io/etcd/server/v3 v3.5.5
|
||||
go.etcd.io/etcd/api/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/client/v2 v2.305.6
|
||||
go.etcd.io/etcd/client/v3 v3.5.6
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.6
|
||||
go.etcd.io/etcd/pkg/v3 v3.5.6
|
||||
go.etcd.io/etcd/raft/v3 v3.5.6
|
||||
go.etcd.io/etcd/server/v3 v3.5.6
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
|
@ -83,8 +83,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
|
||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca h1:Y2I0lxOttdUKz+hNaIdG3FtjuQrTmwXun1opRV65IZc=
|
||||
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
|
||||
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
|
||||
@ -103,6 +101,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
|
||||
|
@ -15,9 +15,14 @@
|
||||
package recipes_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
@ -97,6 +102,67 @@ func TestDoubleBarrier(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDoubleBarrierTooManyClients(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
waiters := 10
|
||||
session, err := concurrency.NewSession(clus.RandClient())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer session.Orphan()
|
||||
|
||||
b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
|
||||
donec := make(chan struct{})
|
||||
var (
|
||||
wgDone sync.WaitGroup // make sure all clients have finished the tasks
|
||||
wgEntered sync.WaitGroup // make sure all clients have entered the double barrier
|
||||
)
|
||||
wgDone.Add(waiters)
|
||||
wgEntered.Add(waiters)
|
||||
for i := 0; i < waiters; i++ {
|
||||
go func() {
|
||||
defer wgDone.Done()
|
||||
session, err := concurrency.NewSession(clus.RandClient())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer session.Orphan()
|
||||
|
||||
bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
|
||||
if err := bb.Enter(); err != nil {
|
||||
t.Errorf("could not enter on barrier (%v)", err)
|
||||
}
|
||||
wgEntered.Done()
|
||||
<-donec
|
||||
if err := bb.Leave(); err != nil {
|
||||
t.Errorf("could not leave on barrier (%v)", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait until all clients have already entered the double barrier, so
|
||||
// no any other client can enter the barrier.
|
||||
wgEntered.Wait()
|
||||
t.Log("Try to enter into double barrier")
|
||||
if err := b.Enter(); err != recipe.ErrTooManyClients {
|
||||
t.Errorf("Unexcepted error, expected: ErrTooManyClients, got: %v", err)
|
||||
}
|
||||
|
||||
resp, err := clus.RandClient().Get(context.TODO(), "test-barrier/waiters", clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
// Make sure the extra `b.Enter()` did not create a new ephemeral key
|
||||
assert.Equal(t, waiters, len(resp.Kvs))
|
||||
close(donec)
|
||||
|
||||
wgDone.Wait()
|
||||
}
|
||||
|
||||
func TestDoubleBarrierFailover(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
|
||||
|
@ -340,6 +340,9 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
|
||||
if !ok {
|
||||
t.Fatalf("unexpected watch close")
|
||||
}
|
||||
if err := v.Err(); err != nil {
|
||||
t.Fatalf("unexpected watch response error: %v", err)
|
||||
}
|
||||
if string(v.Events[0].Kv.Value) != val {
|
||||
t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val)
|
||||
}
|
||||
|
@ -136,7 +136,8 @@ type ClusterConfig struct {
|
||||
|
||||
DiscoveryURL string
|
||||
|
||||
AuthToken string
|
||||
AuthToken string
|
||||
AuthTokenTTL uint
|
||||
|
||||
UseGRPC bool
|
||||
|
||||
@ -314,6 +315,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
|
||||
name: c.generateMemberName(),
|
||||
memberNumber: memberNumber,
|
||||
authToken: c.cfg.AuthToken,
|
||||
authTokenTTL: c.cfg.AuthTokenTTL,
|
||||
peerTLS: c.cfg.PeerTLS,
|
||||
clientTLS: c.cfg.ClientTLS,
|
||||
quotaBackendBytes: c.cfg.QuotaBackendBytes,
|
||||
@ -624,6 +626,7 @@ type memberConfig struct {
|
||||
peerTLS *transport.TLSInfo
|
||||
clientTLS *transport.TLSInfo
|
||||
authToken string
|
||||
authTokenTTL uint
|
||||
quotaBackendBytes int64
|
||||
maxTxnOps uint
|
||||
maxRequestBytes uint
|
||||
@ -715,6 +718,9 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
|
||||
if mcfg.authToken != "" {
|
||||
m.AuthToken = mcfg.authToken
|
||||
}
|
||||
if mcfg.authTokenTTL != 0 {
|
||||
m.TokenTTL = mcfg.authTokenTTL
|
||||
}
|
||||
|
||||
m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
|
||||
|
||||
|
@ -498,3 +498,91 @@ func TestV3AuthRestartMember(t *testing.T) {
|
||||
_, err = c2.Put(context.TODO(), "foo", "bar2")
|
||||
testutil.AssertNil(t, err)
|
||||
}
|
||||
|
||||
func TestV3AuthWatchAndTokenExpire(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1, AuthTokenTTL: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
|
||||
|
||||
c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
|
||||
if cerr != nil {
|
||||
t.Fatal(cerr)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
_, err := c.Put(ctx, "key", "val")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error from Put: %v", err)
|
||||
}
|
||||
|
||||
// The first watch gets a valid auth token through watcher.newWatcherGrpcStream()
|
||||
// We should discard the first one by waiting TTL after the first watch.
|
||||
wChan := c.Watch(ctx, "key", clientv3.WithRev(1))
|
||||
watchResponse := <-wChan
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
wChan = c.Watch(ctx, "key", clientv3.WithRev(1))
|
||||
watchResponse = <-wChan
|
||||
testutil.AssertNil(t, watchResponse.Err())
|
||||
}
|
||||
|
||||
func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
|
||||
BeforeTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
users := []user{
|
||||
{
|
||||
name: "user1",
|
||||
password: "user1-123",
|
||||
role: "role1",
|
||||
key: "k1",
|
||||
end: "k2",
|
||||
},
|
||||
}
|
||||
|
||||
authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users)
|
||||
|
||||
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
|
||||
|
||||
c, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
|
||||
if cerr != nil {
|
||||
t.Fatal(cerr)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
watchStartCh, watchEndCh := make(chan interface{}), make(chan interface{})
|
||||
|
||||
go func() {
|
||||
wChan := c.Watch(ctx, "k1", clientv3.WithRev(1))
|
||||
watchStartCh <- struct{}{}
|
||||
watchResponse := <-wChan
|
||||
t.Logf("watch response from k1: %v", watchResponse)
|
||||
testutil.AssertTrue(t, len(watchResponse.Events) != 0)
|
||||
watchEndCh <- struct{}{}
|
||||
}()
|
||||
|
||||
// Chan for making sure that the above goroutine invokes Watch()
|
||||
// So the above Watch() can get watch ID = 0
|
||||
<-watchStartCh
|
||||
|
||||
wChan := c.Watch(ctx, "non-allowed-key", clientv3.WithRev(1))
|
||||
watchResponse := <-wChan
|
||||
testutil.AssertNotNil(t, watchResponse.Err()) // permission denied
|
||||
|
||||
_, err := c.Put(ctx, "k1", "val")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error from Put: %v", err)
|
||||
}
|
||||
|
||||
<-watchEndCh
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
||||
)
|
||||
|
||||
@ -395,8 +396,8 @@ func TestV3WatchWrongRange(t *testing.T) {
|
||||
if cresp.Canceled != tt.canceled {
|
||||
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
|
||||
}
|
||||
if tt.canceled && cresp.WatchId != -1 {
|
||||
t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
|
||||
if tt.canceled && cresp.WatchId != clientv3.InvalidWatchID {
|
||||
t.Fatalf("#%d: canceled watch ID %d, want %d", i, cresp.WatchId, clientv3.InvalidWatchID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user