clientv3: fix client balancer with gRPC v1.7

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyu-Ho Lee
2017-11-16 05:53:42 -08:00
parent 939337f450
commit a8c84ffc93
59 changed files with 2355 additions and 964 deletions

View File

@ -16,7 +16,6 @@ package integration
import (
"bytes"
"math/rand"
"os"
"reflect"
"strings"
@ -28,6 +27,7 @@ import (
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/testutil"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@ -441,8 +441,8 @@ func TestKVGetErrConnClosed(t *testing.T) {
go func() {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
if err != nil && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
}()
@ -472,8 +472,9 @@ func TestKVNewAfterClose(t *testing.T) {
donec := make(chan struct{})
go func() {
if _, err := cli.Get(context.TODO(), "foo"); err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
_, err := cli.Get(context.TODO(), "foo")
if err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
@ -790,7 +791,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
// this Get fails and triggers an asynchronous connection retry
_, err := cli.Get(ctx, "abc")
cancel()
if !strings.Contains(err.Error(), "context deadline") {
if err != nil && err != context.DeadlineExceeded {
t.Fatal(err)
}
}
@ -812,86 +813,51 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// grpc finds out the original connection is down due to the member shutdown.
_, err := cli.Get(ctx, "abc")
cancel()
if !strings.Contains(err.Error(), "context deadline") {
if err != nil && err != context.DeadlineExceeded {
t.Fatal(err)
}
// this Put fails and triggers an asynchronous connection retry
_, err = cli.Put(ctx, "abc", "123")
cancel()
if !strings.Contains(err.Error(), "context deadline") {
if err != nil && err != context.DeadlineExceeded {
t.Fatal(err)
}
}
// TestKVGetOneEndpointDown ensures a client can connect and get if one endpoint is down
func TestKVPutOneEndpointDown(t *testing.T) {
// TestKVPutAtMostOnce ensures that a Put will only occur at most once
// in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
// get endpoint list
eps := make([]string, 3)
for i := range eps {
eps[i] = clus.Members[i].GRPCAddr()
}
// make a dead node
clus.Members[rand.Intn(len(eps))].Stop(t)
// try to connect with dead node in the endpoint list
cfg := clientv3.Config{Endpoints: eps, DialTimeout: 1 * time.Second}
cli, err := clientv3.New(cfg)
if err != nil {
if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil {
t.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
}
cancel()
}
// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other
// endpoints are down, then it will reconnect.
func TestKVGetResetLoneEndpoint(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)
// get endpoint list
eps := make([]string, 2)
for i := range eps {
eps[i] = clus.Members[i].GRPCAddr()
}
cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()
// disconnect everything
clus.Members[0].Stop(t)
clus.Members[1].Stop(t)
// have Get try to reconnect
donec := make(chan struct{})
go func() {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
for i := 0; i < 10; i++ {
clus.Members[0].DropConnections()
donec := make(chan struct{})
go func() {
defer close(donec)
for i := 0; i < 10; i++ {
clus.Members[0].DropConnections()
time.Sleep(5 * time.Millisecond)
}
}()
_, err := clus.Client(0).Put(context.TODO(), "k", "v")
<-donec
if err != nil {
break
}
cancel()
close(donec)
}()
time.Sleep(500 * time.Millisecond)
clus.Members[0].Restart(t)
select {
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for Get")
case <-donec:
}
resp, err := clus.Client(0).Get(context.TODO(), "k")
if err != nil {
t.Fatal(err)
}
if resp.Kvs[0].Version > 11 {
t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0])
}
}