Merge pull request #14656 from ahrtr/test_dynamical_add_member
test: added e2e test case for issue 14571: etcd doesn't load auth info when recovering from a snapshot
commit 0a0f0e3617
@@ -17,11 +17,15 @@ package e2e
import (
    "context"
    "fmt"
    "net/url"
    "os"
    "syscall"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "go.etcd.io/etcd/client/v3"
)
@@ -72,6 +76,10 @@ func TestCtlV3AuthJWTExpire(t *testing.T) { testCtl(t, authTestJWTExpire) }
func TestCtlV3AuthRevisionConsistency(t *testing.T) { testCtl(t, authTestRevisionConsistency) }
func TestCtlV3AuthTestCacheReload(t *testing.T) { testCtl(t, authTestCacheReload) }

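// snapshotCount is lowered to 5 so that the writes in authTestRecoverSnapshot
// force at least one snapshot; the member added afterwards then recovers from
// that snapshot, which is the code path issue 14571 reported as losing auth info.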
func TestCtlV3AuthRecoverFromSnapshot(t *testing.T) {
    testCtl(t, authTestRecoverSnapshot, withCfg(*newConfigNoTLS()), withQuorum(), withSnapshotCount(5))
}

func authEnableTest(cx ctlCtx) {
    if err := authEnable(cx); err != nil {
        cx.t.Fatal(err)
@@ -1289,3 +1297,192 @@ func authTestCacheReload(cx ctlCtx) {
        cx.t.Fatal(err)
    }
}

// Verify that etcd works after recovering from a snapshot.
// Refer to https://github.com/etcd-io/etcd/issues/14571.
func authTestRecoverSnapshot(cx ctlCtx) {
    roles := []authRole{
        {
            role:       "role0",
            permission: clientv3.PermissionType(clientv3.PermReadWrite),
            key:        "foo",
        },
    }

    users := []authUser{
        {
            user: "root",
            pass: "rootPass",
            role: "root",
        },
        {
            user: "user0",
            pass: "user0Pass",
            role: "role0",
        },
    }

    cx.t.Log("setup and enable auth")
    setupAuth(cx, roles, users)

    // create a client with root user
    cx.t.Log("create a client with root user")
    cliRoot, err := clientv3.New(clientv3.Config{Endpoints: cx.epc.EndpointsV3(), Username: "root", Password: "rootPass", DialTimeout: 3 * time.Second})
    if err != nil {
        cx.t.Fatal(err)
    }
    defer cliRoot.Close()

    // write more than SnapshotCount keys, so that at least one snapshot is created
    cx.t.Log("Write enough key/value pairs to trigger a snapshot")
    for i := 0; i <= 6; i++ {
        if _, err := cliRoot.Put(context.TODO(), fmt.Sprintf("key_%d", i), fmt.Sprintf("value_%d", i)); err != nil {
            cx.t.Fatalf("failed to Put (%v)", err)
        }
    }

    // add a new member into the cluster
    // Refer to https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/tests/e2e/cluster_test.go#L238
    var (
        idx            = 3
        name           = fmt.Sprintf("test-%d", idx)
        port           = cx.cfg.basePort + 5*idx
        curlHost       = fmt.Sprintf("localhost:%d", port)
        nodeClientURL  = url.URL{Scheme: cx.cfg.clientScheme(), Host: curlHost}
        nodePeerURL    = url.URL{Scheme: cx.cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
        initialCluster = cx.epc.procs[0].Config().initialCluster + "," + fmt.Sprintf("%s=%s", name, nodePeerURL.String())
    )
    cx.t.Logf("Adding a new member: %s", nodePeerURL.String())
    // Must wait at least 5 seconds, otherwise it will always get an
    // "etcdserver: unhealthy cluster" response; please refer to the link below:
    // https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/server/etcdserver/server.go#L1611
    assert.Eventually(cx.t, func() bool {
        if _, err := cliRoot.MemberAdd(context.TODO(), []string{nodePeerURL.String()}); err != nil {
            cx.t.Logf("Failed to add member, peerURL: %s, error: %v", nodePeerURL.String(), err)
            return false
        }
        return true
    }, 8*time.Second, 2*time.Second)

    cx.t.Logf("Starting the new member: %s", nodePeerURL.String())
    newProc, err := runEtcdNode(name, cx.t.TempDir(), nodeClientURL.String(), nodePeerURL.String(), "existing", initialCluster)
    require.NoError(cx.t, err)
    defer newProc.Stop()

    // create a client with user "user0", connected to the new member
    cx.t.Log("create a client with user 'user0'")
    cliUser, err := clientv3.New(clientv3.Config{Endpoints: []string{nodeClientURL.String()}, Username: "user0", Password: "user0Pass", DialTimeout: 3 * time.Second})
    if err != nil {
        cx.t.Fatal(err)
    }
    defer cliUser.Close()

    // write data using cliUser; expect no error
    cx.t.Log("Write a key/value using user 'user0'")
    _, err = cliUser.Put(context.TODO(), "foo", "bar")
    require.NoError(cx.t, err)

    // verify all nodes have the same revision and hash
    var endpoints []string
    for _, proc := range cx.epc.procs {
        endpoints = append(endpoints, proc.Config().acurl)
    }
    endpoints = append(endpoints, nodeClientURL.String())
    cx.t.Log("Verify all members have the same revision and hash")
    assert.Eventually(cx.t, func() bool {
        hashKvs, err := hashKVs(endpoints, cliRoot)
        if err != nil {
            cx.t.Logf("failed to get HashKV: %v", err)
            return false
        }

        if len(hashKvs) != 4 {
            cx.t.Logf("expected 4 hashkv responses, but got: %d", len(hashKvs))
            return false
        }

        if !(hashKvs[0].Header.Revision == hashKvs[1].Header.Revision &&
            hashKvs[0].Header.Revision == hashKvs[2].Header.Revision &&
            hashKvs[0].Header.Revision == hashKvs[3].Header.Revision) {
            cx.t.Logf("Got different revisions, [%d, %d, %d, %d]",
                hashKvs[0].Header.Revision,
                hashKvs[1].Header.Revision,
                hashKvs[2].Header.Revision,
                hashKvs[3].Header.Revision)
            return false
        }

        assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[1].Hash)
        assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[2].Hash)
        assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[3].Hash)

        return true
    }, 5*time.Second, 100*time.Millisecond)
}

type authRole struct {
    role       string
    permission clientv3.PermissionType
    key        string
    keyEnd     string
}

type authUser struct {
    user string
    pass string
    role string
}

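// setupAuth creates the given roles and users with a plain (not yet
// authenticated) client, then enables authentication. Note the ordering:
// etcd requires a root user with the root role to exist before AuthEnable
// can succeed, which is why the caller's user list includes root.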
func setupAuth(cx ctlCtx, roles []authRole, users []authUser) {
    endpoint := cx.epc.procs[0].EndpointsV3()[0]

    // create a client
    c, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}, DialTimeout: 3 * time.Second})
    if err != nil {
        cx.t.Fatal(err)
    }
    defer c.Close()

    // create roles
    for _, r := range roles {
        // add role
        if _, err = c.RoleAdd(context.TODO(), r.role); err != nil {
            cx.t.Fatal(err)
        }

        // grant permission to role
        if _, err = c.RoleGrantPermission(context.TODO(), r.role, r.key, r.keyEnd, r.permission); err != nil {
            cx.t.Fatal(err)
        }
    }

    // create users
    for _, u := range users {
        // add user
        if _, err = c.UserAdd(context.TODO(), u.user, u.pass); err != nil {
            cx.t.Fatal(err)
        }

        // grant role to user
        if _, err = c.UserGrantRole(context.TODO(), u.user, u.role); err != nil {
            cx.t.Fatal(err)
        }
    }

    // enable auth
    if _, err = c.AuthEnable(context.TODO()); err != nil {
        cx.t.Fatal(err)
    }
}

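// hashKVs queries HashKV on every endpoint with the given client and
// returns the responses in endpoint order.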
func hashKVs(endpoints []string, cli *clientv3.Client) ([]*clientv3.HashKVResponse, error) {
    var retHashKVs []*clientv3.HashKVResponse
    for _, ep := range endpoints {
        resp, err := cli.HashKV(context.TODO(), ep, 0)
        if err != nil {
            return nil, err
        }
        retHashKVs = append(retHashKVs, resp)
    }
    return retHashKVs, nil
}
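For reference, HashKV comes from clientv3's Maintenance API: it asks a single member for a hash of its KV store computed up to a given revision, and passing rev=0 hashes up to that member's latest revision, which is what makes the per-endpoint comparison above possible. A minimal self-contained sketch; the endpoint here is a placeholder for a running member:

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/client/v3"
)

func main() {
    // Placeholder endpoint; point it at a running etcd member.
    ep := "http://127.0.0.1:2379"

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{ep},
        DialTimeout: 3 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    // rev=0 hashes the KV store up to the member's latest revision.
    resp, err := cli.HashKV(context.TODO(), ep, 0)
    if err != nil {
        panic(err)
    }
    fmt.Printf("endpoint=%s hash=%d revision=%d\n", ep, resp.Hash, resp.Header.Revision)
}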

@@ -221,6 +221,14 @@ func withMaxConcurrentStreams(streams uint32) ctlOption {
    }
}

// This function must be called after `withCfg`; otherwise its value
// may be overwritten by `withCfg`.
func withSnapshotCount(snapshotCount int) ctlOption {
    return func(cx *ctlCtx) {
        cx.cfg.snapshotCount = snapshotCount
    }
}
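The caveat above is inherent to the functional-options style used by testCtl: an option like withCfg assigns an entire config value, so it clobbers any field set by an earlier option. A self-contained sketch of the pitfall, using simplified stand-ins rather than the real ctlCtx/ctlOption types:

package main

import "fmt"

// Simplified stand-ins for ctlCtx and ctlOption.
type config struct{ snapshotCount int }
type ctx struct{ cfg config }
type option func(*ctx)

// withCfg replaces the whole config, as the comment above says the real option does.
func withCfg(cfg config) option { return func(c *ctx) { c.cfg = cfg } }

// withSnapshotCount sets a single field.
func withSnapshotCount(n int) option { return func(c *ctx) { c.cfg.snapshotCount = n } }

func apply(c *ctx, opts ...option) {
    for _, o := range opts {
        o(c)
    }
}

func main() {
    wrong := &ctx{}
    apply(wrong, withSnapshotCount(5), withCfg(config{})) // withCfg wipes snapshotCount
    fmt.Println(wrong.cfg.snapshotCount)                  // 0

    right := &ctx{}
    apply(right, withCfg(config{}), withSnapshotCount(5)) // field survives
    fmt.Println(right.cfg.snapshotCount)                  // 5
}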

func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
    testCtlWithOffline(t, testFunc, nil, opts...)
}