clientv3/integration: add more tests on balancer switch, inflight range
Test all possible cases of server shutdown with inflight range requests. Removed redundant tests in kv_test.go. Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
@ -825,53 +825,6 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other
|
|
||||||
// endpoints are down, then it will reconnect.
|
|
||||||
func TestKVGetResetLoneEndpoint(t *testing.T) {
|
|
||||||
defer testutil.AfterTest(t)
|
|
||||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, SkipCreatingClient: true})
|
|
||||||
defer clus.Terminate(t)
|
|
||||||
|
|
||||||
// get endpoint list
|
|
||||||
eps := make([]string, 2)
|
|
||||||
for i := range eps {
|
|
||||||
eps[i] = clus.Members[i].GRPCAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond}
|
|
||||||
cli, err := clientv3.New(cfg)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer cli.Close()
|
|
||||||
|
|
||||||
// disconnect everything
|
|
||||||
clus.Members[0].Stop(t)
|
|
||||||
clus.Members[1].Stop(t)
|
|
||||||
|
|
||||||
// have Get try to reconnect
|
|
||||||
donec := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
// 3-second is the minimum interval between endpoint being marked
|
|
||||||
// as unhealthy and being removed from unhealthy, so possibly
|
|
||||||
// takes >5-second to unpin and repin an endpoint
|
|
||||||
// TODO: decrease timeout when balancer switch rewrite
|
|
||||||
ctx, cancel := context.WithTimeout(context.TODO(), 7*time.Second)
|
|
||||||
if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
close(donec)
|
|
||||||
}()
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
clus.Members[0].Restart(t)
|
|
||||||
select {
|
|
||||||
case <-time.After(10 * time.Second):
|
|
||||||
t.Fatalf("timed out waiting for Get")
|
|
||||||
case <-donec:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestKVPutAtMostOnce ensures that a Put will only occur at most once
|
// TestKVPutAtMostOnce ensures that a Put will only occur at most once
|
||||||
// in the presence of network errors.
|
// in the presence of network errors.
|
||||||
func TestKVPutAtMostOnce(t *testing.T) {
|
func TestKVPutAtMostOnce(t *testing.T) {
|
||||||
|
@ -240,6 +240,120 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBalancerUnderServerStopInflightLinearizableGetOnRestart(t *testing.T) {
|
||||||
|
tt := []pinTestOpt{
|
||||||
|
{pinLeader: true, stopPinFirst: true},
|
||||||
|
{pinLeader: true, stopPinFirst: false},
|
||||||
|
{pinLeader: false, stopPinFirst: true},
|
||||||
|
{pinLeader: false, stopPinFirst: false},
|
||||||
|
}
|
||||||
|
for i := range tt {
|
||||||
|
testBalancerUnderServerStopInflightRangeOnRestart(t, true, tt[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBalancerUnderServerStopInflightSerializableGetOnRestart(t *testing.T) {
|
||||||
|
tt := []pinTestOpt{
|
||||||
|
{pinLeader: true, stopPinFirst: true},
|
||||||
|
{pinLeader: true, stopPinFirst: false},
|
||||||
|
{pinLeader: false, stopPinFirst: true},
|
||||||
|
{pinLeader: false, stopPinFirst: false},
|
||||||
|
}
|
||||||
|
for i := range tt {
|
||||||
|
testBalancerUnderServerStopInflightRangeOnRestart(t, false, tt[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type pinTestOpt struct {
|
||||||
|
pinLeader bool
|
||||||
|
stopPinFirst bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// testBalancerUnderServerStopInflightRangeOnRestart expects
|
||||||
|
// inflight range request reconnects on server restart.
|
||||||
|
func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable bool, opt pinTestOpt) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
cfg := &integration.ClusterConfig{
|
||||||
|
Size: 2,
|
||||||
|
SkipCreatingClient: true,
|
||||||
|
}
|
||||||
|
if linearizable {
|
||||||
|
cfg.Size = 3
|
||||||
|
}
|
||||||
|
|
||||||
|
clus := integration.NewClusterV3(t, cfg)
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
|
||||||
|
if linearizable {
|
||||||
|
eps = append(eps, clus.Members[2].GRPCAddr())
|
||||||
|
}
|
||||||
|
|
||||||
|
lead := clus.WaitLeader(t)
|
||||||
|
|
||||||
|
target := lead
|
||||||
|
if !opt.pinLeader {
|
||||||
|
target = (target + 1) % 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// pin eps[target]
|
||||||
|
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to create client: %v", err)
|
||||||
|
}
|
||||||
|
defer cli.Close()
|
||||||
|
|
||||||
|
// wait for eps[target] to be pinned
|
||||||
|
mustWaitPinReady(t, cli)
|
||||||
|
|
||||||
|
// add all eps to list, so that when the original pined one fails
|
||||||
|
// the client can switch to other available eps
|
||||||
|
cli.SetEndpoints(eps...)
|
||||||
|
|
||||||
|
if opt.stopPinFirst {
|
||||||
|
clus.Members[target].Stop(t)
|
||||||
|
// give some time for balancer switch before stopping the other
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
clus.Members[(target+1)%2].Stop(t)
|
||||||
|
} else {
|
||||||
|
clus.Members[(target+1)%2].Stop(t)
|
||||||
|
// balancer cannot pin other member since it's already stopped
|
||||||
|
clus.Members[target].Stop(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3-second is the minimum interval between endpoint being marked
|
||||||
|
// as unhealthy and being removed from unhealthy, so possibly
|
||||||
|
// takes >5-second to unpin and repin an endpoint
|
||||||
|
// TODO: decrease timeout when balancer switch rewrite
|
||||||
|
clientTimeout := 7 * time.Second
|
||||||
|
|
||||||
|
var gops []clientv3.OpOption
|
||||||
|
if !linearizable {
|
||||||
|
gops = append(gops, clientv3.WithSerializable())
|
||||||
|
}
|
||||||
|
|
||||||
|
donec, readyc := make(chan struct{}), make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
defer close(donec)
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
|
||||||
|
readyc <- struct{}{}
|
||||||
|
_, err := cli.Get(ctx, "abc", gops...)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-readyc
|
||||||
|
clus.Members[target].Restart(t)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(clientTimeout + 3*time.Second):
|
||||||
|
t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
|
||||||
|
case <-donec:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// e.g. due to clock drifts in server-side,
|
// e.g. due to clock drifts in server-side,
|
||||||
// client context times out first in server-side
|
// client context times out first in server-side
|
||||||
// while original client-side context is not timed out yet
|
// while original client-side context is not timed out yet
|
||||||
|
Reference in New Issue
Block a user