clientv3/integration: test lease keepalive
This commit is contained in:
@ -16,6 +16,7 @@ package integration
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
@ -50,7 +51,7 @@ func TestLeaseCreate(t *testing.T) {
|
|||||||
func TestLeaseRevoke(t *testing.T) {
|
func TestLeaseRevoke(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
lapi := clientv3.NewLease(clus.RandClient())
|
lapi := clientv3.NewLease(clus.RandClient())
|
||||||
@ -73,3 +74,111 @@ func TestLeaseRevoke(t *testing.T) {
|
|||||||
t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound)
|
t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLeaseKeepAliveOnce(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
lapi := clientv3.NewLease(clus.RandClient())
|
||||||
|
defer lapi.Close()
|
||||||
|
|
||||||
|
resp, err := lapi.Create(context.Background(), 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to create lease %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = lapi.KeepAliveOnce(context.Background(), lease.LeaseID(resp.ID))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to keepalive lease", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeaseKeepAlive(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
lapi := clientv3.NewLease(clus.RandClient())
|
||||||
|
|
||||||
|
resp, err := lapi.Create(context.Background(), 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to create lease %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID))
|
||||||
|
if kerr != nil {
|
||||||
|
t.Errorf("failed to keepalive lease %v", kerr)
|
||||||
|
}
|
||||||
|
|
||||||
|
kresp, ok := <-rc
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("chan is closed, want not closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if kresp.ID != resp.ID {
|
||||||
|
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
lapi.Close()
|
||||||
|
|
||||||
|
_, ok = <-rc
|
||||||
|
if ok {
|
||||||
|
t.Errorf("chan is not closed, want lease Close() closes chan")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: add a client that can connect to all the members of cluster via unix sock.
|
||||||
|
// TODO: test handle more complicated failures.
|
||||||
|
func TestLeaseKeepAliveHandleFailure(t *testing.T) {
|
||||||
|
t.Skip("test it when we have a cluster client")
|
||||||
|
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
// TODO: change this line to get a cluster client
|
||||||
|
lapi := clientv3.NewLease(clus.RandClient())
|
||||||
|
|
||||||
|
resp, err := lapi.Create(context.Background(), 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to create lease %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID))
|
||||||
|
if kerr != nil {
|
||||||
|
t.Errorf("failed to keepalive lease %v", kerr)
|
||||||
|
}
|
||||||
|
|
||||||
|
kresp := <-rc
|
||||||
|
if kresp.ID != resp.ID {
|
||||||
|
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// restart the connected member.
|
||||||
|
clus.Members[0].Stop(t)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-rc:
|
||||||
|
t.Fatalf("unexpected keepalive")
|
||||||
|
case <-time.After(10*time.Second/3 + 1):
|
||||||
|
}
|
||||||
|
|
||||||
|
// recover the member.
|
||||||
|
clus.Members[0].Restart(t)
|
||||||
|
|
||||||
|
kresp = <-rc
|
||||||
|
if kresp.ID != resp.ID {
|
||||||
|
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
lapi.Close()
|
||||||
|
|
||||||
|
_, ok := <-rc
|
||||||
|
if ok {
|
||||||
|
t.Errorf("chan is not closed, want lease Close() closes chan")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -206,12 +206,18 @@ func (l *lessor) recvKeepAliveLoop() {
|
|||||||
defer func() {
|
defer func() {
|
||||||
l.stopCancel()
|
l.stopCancel()
|
||||||
close(l.donec)
|
close(l.donec)
|
||||||
|
for _, ch := range l.keepAlives {
|
||||||
|
close(ch)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
stream, serr := l.resetRecv()
|
stream, serr := l.resetRecv()
|
||||||
for {
|
for {
|
||||||
resp, err := stream.Recv()
|
resp, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if isRPCError(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
if stream, serr = l.resetRecv(); serr != nil {
|
if stream, serr = l.resetRecv(); serr != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user