*: Update cases related to Downgrade
1. Update DowngradeUpgradeMembersByID: if it is a downgrade process, the desired version of the cluster should be the target one. If it is an upgrade process, the desired version of the cluster should be determined by the minimum binary version of the members. 2. Remove AssertProcessLogs from DowngradeEnable: the log message "The server is ready to downgrade" appears only when the storage version monitor detects a mismatch between the cluster and storage versions. If traffic is insufficient to trigger a commit, or if an auto-commit occurs right after reading the storage version, the monitor may fail to update it, leading to errors like: ```bash "msg":"failed to update storage version","cluster-version":"3.6.0", "error":"cannot detect storage schema version: missing confstate information" ``` Given this, we should remove the AssertProcessLogs statement. Similar to #19313 Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
parent
091b6ed718
commit
65159a2b96
@ -25,6 +25,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/version"
|
||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
@ -51,6 +52,10 @@ func TestDowngradeUpgradeClusterOf1(t *testing.T) {
|
||||
testDowngradeUpgrade(t, 1, 1, false, noCancellation)
|
||||
}
|
||||
|
||||
// TestDowngradeUpgrade2InClusterOf3 downgrades 2 of the 3 members of a
// cluster and upgrades them back, without cancelling the downgrade.
func TestDowngradeUpgrade2InClusterOf3(t *testing.T) {
	testDowngradeUpgrade(t, 2, 3, false, noCancellation)
}
|
||||
|
||||
// TestDowngradeUpgradeClusterOf3 downgrades all 3 members of a cluster and
// upgrades them back, without cancelling the downgrade.
func TestDowngradeUpgradeClusterOf3(t *testing.T) {
	testDowngradeUpgrade(t, 3, 3, false, noCancellation)
}
|
||||
@ -128,6 +133,9 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
|
||||
time.Sleep(etcdserver.HealthInterval)
|
||||
}
|
||||
|
||||
t.Log("Downgrade should be disabled")
|
||||
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: false})
|
||||
|
||||
t.Log("Adding member to test membership, but a learner avoid breaking quorum")
|
||||
resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
|
||||
require.NoError(t, err)
|
||||
@ -150,6 +158,10 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
|
||||
return // No need to perform downgrading, end the test here
|
||||
}
|
||||
e2e.DowngradeEnable(t, epc, lastVersion)
|
||||
|
||||
t.Log("Downgrade should be enabled")
|
||||
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: true, TargetVersion: lastClusterVersion.String()})
|
||||
|
||||
if triggerCancellation == cancelRightAfterEnable {
|
||||
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
|
||||
e2e.DowngradeCancel(t, epc)
|
||||
@ -165,7 +177,7 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
|
||||
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, currentVersion, lastClusterVersion)
|
||||
require.NoError(t, err)
|
||||
if len(membersToChange) == len(epc.Procs) {
|
||||
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
|
||||
e2e.AssertProcessLogs(t, epc.Procs[epc.WaitLeader(t)], "the cluster has been downgraded")
|
||||
}
|
||||
|
||||
t.Log("Downgrade complete")
|
||||
@ -202,6 +214,14 @@ func testDowngradeUpgrade(t *testing.T, numberOfMembersToDowngrade int, clusterS
|
||||
require.NoError(t, err)
|
||||
t.Log("Upgrade complete")
|
||||
|
||||
if triggerCancellation == noCancellation && numberOfMembersToDowngrade < clusterSize {
|
||||
t.Log("Downgrade should be still enabled")
|
||||
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: true, TargetVersion: lastClusterVersion.String()})
|
||||
} else {
|
||||
t.Log("Downgrade should be disabled")
|
||||
e2e.ValidateDowngradeInfo(t, epc, &pb.DowngradeInfo{Enabled: false})
|
||||
}
|
||||
|
||||
afterMembers, afterKV = getMembersAndKeys(t, cc)
|
||||
assert.Equal(t, beforeKV.Kvs, afterKV.Kvs)
|
||||
assert.Equal(t, beforeMembers.Members, afterMembers.Members)
|
||||
@ -224,27 +244,6 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr
|
||||
return epc
|
||||
}
|
||||
|
||||
func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
for i := 0; i < len(epc.Procs); i++ {
|
||||
endpoints := epc.Procs[i].EndpointsGRPC()
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: endpoints,
|
||||
DialTimeout: 3 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer cli.Close()
|
||||
resp, err := cli.Status(ctx, endpoints[0])
|
||||
require.NoError(t, err)
|
||||
if resp.Header.GetMemberId() == resp.Leader {
|
||||
return epc.Procs[i]
|
||||
}
|
||||
}
|
||||
t.Fatal("Leader not found")
|
||||
return nil
|
||||
}
|
||||
|
||||
func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/version"
|
||||
"go.etcd.io/etcd/tests/v3/framework/testutils"
|
||||
)
|
||||
@ -46,7 +47,6 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
|
||||
Server: OffsetMinor(ver, 1).String(),
|
||||
Storage: ver.String(),
|
||||
})
|
||||
AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
|
||||
}
|
||||
|
||||
t.Log("Cluster is ready for downgrade")
|
||||
@ -82,6 +82,51 @@ func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster) {
|
||||
t.Log("Cluster downgrade cancellation is completed")
|
||||
}
|
||||
|
||||
func ValidateDowngradeInfo(t *testing.T, clus *EtcdProcessCluster, expected *pb.DowngradeInfo) {
|
||||
cfg := clus.Cfg
|
||||
|
||||
for i := 0; i < len(clus.Procs); i++ {
|
||||
member := clus.Procs[i]
|
||||
mc := member.Etcdctl()
|
||||
mName := member.Config().Name
|
||||
|
||||
testutils.ExecuteWithTimeout(t, 1*time.Minute, func() {
|
||||
for {
|
||||
statuses, err := mc.Status(context.Background())
|
||||
if err != nil {
|
||||
cfg.Logger.Warn("failed to get member status and retrying",
|
||||
zap.Error(err),
|
||||
zap.String("member", mName))
|
||||
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
require.Lenf(t, statuses, 1, "member %s", mName)
|
||||
got := (*pb.StatusResponse)(statuses[0]).GetDowngradeInfo()
|
||||
|
||||
if got.GetEnabled() == expected.GetEnabled() && got.GetTargetVersion() == expected.GetTargetVersion() {
|
||||
cfg.Logger.Info("DowngradeInfo match", zap.String("member", mName))
|
||||
break
|
||||
}
|
||||
|
||||
cfg.Logger.Warn("DowngradeInfo didn't match retrying",
|
||||
zap.String("member", mName),
|
||||
zap.Dict("expected",
|
||||
zap.Bool("Enabled", expected.GetEnabled()),
|
||||
zap.String("TargetVersion", expected.GetTargetVersion()),
|
||||
),
|
||||
zap.Dict("got",
|
||||
zap.Bool("Enabled", got.GetEnabled()),
|
||||
zap.String("TargetVersion", got.GetTargetVersion()),
|
||||
),
|
||||
)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
|
||||
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
|
||||
t.Logf("Elect members for operations on members: %v", membersToChange)
|
||||
@ -100,7 +145,6 @@ func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcess
|
||||
opString = "downgrading"
|
||||
newExecPath = BinPath.EtcdLastRelease
|
||||
}
|
||||
|
||||
for _, memberID := range membersToChange {
|
||||
member := clus.Procs[memberID]
|
||||
if member.Config().ExecPath == newExecPath {
|
||||
@ -117,11 +161,16 @@ func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcess
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
clusterVersion := targetVersion.String()
|
||||
if !isDowngrade && len(membersToChange) != len(clus.Procs) {
|
||||
clusterVersion = currentVersion.String()
|
||||
}
|
||||
lg.Info("Validating versions")
|
||||
for _, memberID := range membersToChange {
|
||||
member := clus.Procs[memberID]
|
||||
ValidateVersion(t, clus.Cfg, member, version.Versions{
|
||||
Cluster: targetVersion.String(),
|
||||
Cluster: clusterVersion,
|
||||
Server: targetVersion.String(),
|
||||
})
|
||||
}
|
||||
|
@ -232,6 +232,13 @@ func (f memberDowngradeUpgrade) Inject(ctx context.Context, t *testing.T, lg *za
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// NOTE: By default, the leader can cancel the downgrade once all members
|
||||
// have reached the target version. However, determining the final stable
|
||||
// cluster version after an upgrade can be challenging. To ensure stability,
|
||||
// we should wait for leader to cancel downgrade process.
|
||||
e2e.AssertProcessLogs(t, clus.Procs[clus.WaitLeader(t)], "the cluster has been downgraded")
|
||||
|
||||
// partial upgrade the cluster
|
||||
numberOfMembersToUpgrade := rand.Int()%len(clus.Procs) + 1
|
||||
err = e2e.DowngradeUpgradeMembers(t, lg, clus, numberOfMembersToUpgrade, lastVersion, currentVersion)
|
||||
|
Loading…
Reference in New Issue
Block a user