integration: add client tls support

Anthony Romano
2016-01-29 11:31:25 -08:00
parent 4634874d99
commit 60c037f1c3
4 changed files with 93 additions and 56 deletions
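
In practice, this change lets an integration test opt into client-side TLS through the new ClusterConfig fields. Below is a minimal hypothetical sketch (not part of this commit) of such a test, assuming it lives in the integration package and reuses the testTLSInfo fixture, requestTimeout, and the helpers changed in the diff that follows; the test name and the pkg/testutil import path are assumptions.

package integration

import (
	"testing"

	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
	"github.com/coreos/etcd/client"
	"github.com/coreos/etcd/pkg/testutil"
)

// Hypothetical test: a 3-member cluster that serves both peer and client
// traffic over TLS, checked through a v2 HTTP client configured with the
// same TLSInfo so it trusts the test CA and presents a client certificate.
func TestTLSClientClusterOf3(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo, ClientTLS: &testTLSInfo})
	c.Launch(t)
	defer c.Terminate(t)

	cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, &testTLSInfo)
	kapi := client.NewKeysAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()
	if _, err := kapi.Set(ctx, "/tls", "works", nil); err != nil {
		t.Fatalf("set over https failed: %v", err)
	}
}

Because the fixture sets ClientCertAuth, the server demands a client certificate, which the TLS-aware transport supplies from the same key pair.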

View File

@@ -31,6 +31,7 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/clientv3"
@@ -55,11 +56,19 @@ var (
 	// integration test uses well-known ports to listen for each running member,
 	// which ensures restarted member could listen on specific port again.
 	nextListenPort int64 = 20000
+
+	testTLSInfo = transport.TLSInfo{
+		KeyFile: "./fixtures/server.key.insecure",
+		CertFile: "./fixtures/server.crt",
+		TrustedCAFile: "./fixtures/ca.crt",
+		ClientCertAuth: true,
+	}
 )
 
 type ClusterConfig struct {
 	Size int
-	UsePeerTLS bool
+	PeerTLS *transport.TLSInfo
+	ClientTLS *transport.TLSInfo
 	DiscoveryURL string
 	UseV3 bool
 	UseGRPC bool
@@ -79,7 +88,7 @@ func (c *cluster) fillClusterForMembers() error {
 	addrs := make([]string, 0)
 	for _, m := range c.Members {
 		scheme := "http"
-		if !m.PeerTLSInfo.Empty() {
+		if m.PeerTLSInfo != nil {
 			scheme = "https"
 		}
 		for _, l := range m.PeerListeners {
@@ -158,16 +167,19 @@ func (c *cluster) URLs() []string {
 func (c *cluster) HTTPMembers() []client.Member {
 	ms := make([]client.Member, len(c.Members))
 	for i, m := range c.Members {
-		scheme := "http"
-		if !m.PeerTLSInfo.Empty() {
-			scheme = "https"
+		pScheme, cScheme := "http", "http"
+		if m.PeerTLSInfo != nil {
+			pScheme = "https"
+		}
+		if m.ClientTLSInfo != nil {
+			cScheme = "https"
 		}
 		ms[i].Name = m.Name
 		for _, ln := range m.PeerListeners {
-			ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String())
+			ms[i].PeerURLs = append(ms[i].PeerURLs, pScheme+"://"+ln.Addr().String())
 		}
 		for _, ln := range m.ClientListeners {
-			ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
+			ms[i].ClientURLs = append(ms[i].ClientURLs, cScheme+"://"+ln.Addr().String())
 		}
 	}
 	return ms
@@ -175,7 +187,7 @@ func (c *cluster) HTTPMembers() []client.Member {
 func (c *cluster) mustNewMember(t *testing.T) *member {
 	name := c.name(rand.Int())
-	m := mustNewMember(t, name, c.cfg.UsePeerTLS)
+	m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS)
 	m.DiscoveryURL = c.cfg.DiscoveryURL
 	m.V3demo = c.cfg.UseV3
 	if c.cfg.UseGRPC {
@@ -190,12 +202,12 @@ func (c *cluster) addMember(t *testing.T) {
 	m := c.mustNewMember(t)
 	scheme := "http"
-	if c.cfg.UsePeerTLS {
+	if c.cfg.PeerTLS != nil {
 		scheme = "https"
 	}
 	// send add request to the cluster
-	cc := mustNewHTTPClient(t, []string{c.URL(0)})
+	cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
 	ma := client.NewMembersAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
@@ -228,7 +240,7 @@ func (c *cluster) AddMember(t *testing.T) {
 func (c *cluster) RemoveMember(t *testing.T, id uint64) {
 	// send remove request to the cluster
-	cc := mustNewHTTPClient(t, c.URLs())
+	cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
 	ma := client.NewMembersAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
@@ -263,7 +275,7 @@ func (c *cluster) Terminate(t *testing.T) {
 func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
 	for _, u := range c.URLs() {
-		cc := mustNewHTTPClient(t, []string{u})
+		cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
 		ma := client.NewMembersAPI(cc)
 		for {
 			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
@@ -355,8 +367,10 @@ type member struct {
 	etcdserver.ServerConfig
 	PeerListeners, ClientListeners []net.Listener
 	grpcListener net.Listener
-	// inited PeerTLSInfo implies to enable peer TLS
-	PeerTLSInfo transport.TLSInfo
+	// PeerTLSInfo enables peer TLS when set
+	PeerTLSInfo *transport.TLSInfo
+	// ClientTLSInfo enables client TLS when set
+	ClientTLSInfo *transport.TLSInfo
 	raftHandler *testutil.PauseableHandler
 	s *etcdserver.EtcdServer
@@ -366,25 +380,19 @@ type member struct {
 	grpcAddr string
 }
 
-// mustNewMember return an inited member with the given name. If usePeerTLS is
-// true, it will set PeerTLSInfo and use https scheme to communicate between
-// peers.
-func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
-	var (
-		testTLSInfo = transport.TLSInfo{
-			KeyFile: "./fixtures/server.key.insecure",
-			CertFile: "./fixtures/server.crt",
-			TrustedCAFile: "./fixtures/ca.crt",
-			ClientCertAuth: true,
-		}
-		err error
-	)
+// mustNewMember return an inited member with the given name. If peerTLS is
+// set, it will use https scheme to communicate between peers.
+func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, clientTLS *transport.TLSInfo) *member {
+	var err error
 	m := &member{}
-	peerScheme := "http"
-	if usePeerTLS {
+	peerScheme, clientScheme := "http", "http"
+	if peerTLS != nil {
 		peerScheme = "https"
 	}
+	if clientTLS != nil {
+		clientScheme = "https"
+	}
 	pln := newLocalListener(t)
 	m.PeerListeners = []net.Listener{pln}
@@ -392,16 +400,15 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
 	if err != nil {
 		t.Fatal(err)
 	}
-	if usePeerTLS {
-		m.PeerTLSInfo = testTLSInfo
-	}
+	m.PeerTLSInfo = peerTLS
 	cln := newLocalListener(t)
 	m.ClientListeners = []net.Listener{cln}
-	m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
+	m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
 	if err != nil {
 		t.Fatal(err)
 	}
+	m.ClientTLSInfo = clientTLS
 	m.Name = name
@@ -416,7 +423,9 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
 	}
 	m.InitialClusterToken = clusterName
 	m.NewCluster = true
-	m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo
+	if m.PeerTLSInfo != nil {
+		m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
+	}
 	m.ElectionTicks = electionTicks
 	m.TickMs = uint(tickDuration / time.Millisecond)
 	return m
@@ -427,7 +436,8 @@ func (m *member) listenGRPC() error {
 	if m.V3demo == false {
 		return fmt.Errorf("starting grpc server without v3 configured")
 	}
-	m.grpcAddr = m.Name + ".sock"
+	// prefix with localhost so cert has right domain
+	m.grpcAddr = "localhost:" + m.Name + ".sock"
 	if err := os.RemoveAll(m.grpcAddr); err != nil {
 		return err
 	}
@@ -448,7 +458,21 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
 		return net.Dial("unix", a)
 	}
 	unixdialer := grpc.WithDialer(f)
-	conn, err := grpc.Dial(m.grpcAddr, grpc.WithInsecure(), unixdialer)
+	opts := []grpc.DialOption{
+		unixdialer,
+		grpc.WithBlock(),
+		grpc.WithTimeout(5 * time.Second)}
+	if m.ClientTLSInfo != nil {
+		tlscfg, err := m.ClientTLSInfo.ClientConfig()
+		if err != nil {
+			return nil, err
+		}
+		creds := credentials.NewTLS(tlscfg)
+		opts = append(opts, grpc.WithTransportCredentials(creds))
+	} else {
+		opts = append(opts, grpc.WithInsecure())
+	}
+	conn, err := grpc.Dial(m.grpcAddr, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -483,6 +507,7 @@ func (m *member) Clone(t *testing.T) *member {
 	mm.InitialClusterToken = m.InitialClusterToken
 	mm.ElectionTicks = m.ElectionTicks
 	mm.PeerTLSInfo = m.PeerTLSInfo
+	mm.ClientTLSInfo = m.ClientTLSInfo
 	return mm
 }
@@ -503,7 +528,7 @@ func (m *member) Launch() error {
 			Listener: ln,
 			Config: &http.Server{Handler: m.raftHandler},
 		}
-		if m.PeerTLSInfo.Empty() {
+		if m.PeerTLSInfo == nil {
 			hs.Start()
 		} else {
 			hs.TLS, err = m.PeerTLSInfo.ServerConfig()
@@ -519,18 +544,26 @@ func (m *member) Launch() error {
 			Listener: ln,
 			Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
 		}
-		hs.Start()
+		if m.ClientTLSInfo == nil {
+			hs.Start()
+		} else {
+			hs.TLS, err = m.ClientTLSInfo.ServerConfig()
+			if err != nil {
+				return err
+			}
+			hs.StartTLS()
+		}
 		m.hss = append(m.hss, hs)
 	}
 	if m.grpcListener != nil {
-		m.grpcServer, err = v3rpc.Server(m.s, nil)
+		m.grpcServer, err = v3rpc.Server(m.s, m.ClientTLSInfo)
 		go m.grpcServer.Serve(m.grpcListener)
 	}
 	return nil
 }
 
 func (m *member) WaitOK(t *testing.T) {
-	cc := mustNewHTTPClient(t, []string{m.URL()})
+	cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
 	kapi := client.NewKeysAPI(cc)
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
@@ -608,8 +641,12 @@ func (m *member) Terminate(t *testing.T) {
 	}
 }
 
-func mustNewHTTPClient(t *testing.T, eps []string) client.Client {
-	cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps}
+func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
+	cfgtls := transport.TLSInfo{}
+	if tls != nil {
+		cfgtls = *tls
+	}
+	cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
 	c, err := client.New(cfg)
 	if err != nil {
 		t.Fatal(err)
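
The gRPC path gets the same treatment in NewClientV3 above: when ClientTLSInfo is set, the dial uses credentials built from TLSInfo.ClientConfig instead of grpc.WithInsecure. A rough hypothetical sketch of a test exercising that path (not part of this commit; the test name is made up and it assumes the vendored clientv3 snapshot exposes a Close method):

package integration

import "testing"

// Hypothetical test: the member's ClientTLSInfo feeds both the server-side
// v3rpc.Server TLS config and the transport credentials assembled in
// NewClientV3, so the gRPC connection over the unix socket is negotiated
// with TLS against the "localhost"-prefixed address.
func TestTLSGRPCClient(t *testing.T) {
	clus := NewClusterByConfig(t, &ClusterConfig{Size: 1, UseV3: true, UseGRPC: true, ClientTLS: &testTLSInfo})
	clus.Launch(t)
	defer clus.Terminate(t)

	cli, err := NewClientV3(clus.Members[0])
	if err != nil {
		t.Fatalf("cannot create TLS v3 client: %v", err)
	}
	defer cli.Close()
}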

View File

@@ -51,7 +51,7 @@ func testCluster(t *testing.T, size int) {
 func TestTLSClusterOf3(t *testing.T) {
 	defer testutil.AfterTest(t)
-	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
+	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
 	c.Launch(t)
 	defer c.Terminate(t)
 	clusterMustProgress(t, c.Members)
@@ -66,7 +66,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
 	dc.Launch(t)
 	defer dc.Terminate(t)
 	// init discovery token space
-	dcc := mustNewHTTPClient(t, dc.URLs())
+	dcc := mustNewHTTPClient(t, dc.URLs(), nil)
 	dkapi := client.NewKeysAPI(dcc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
@@ -89,7 +89,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
 	dc.Launch(t)
 	defer dc.Terminate(t)
 	// init discovery token space
-	dcc := mustNewHTTPClient(t, dc.URLs())
+	dcc := mustNewHTTPClient(t, dc.URLs(), nil)
 	dkapi := client.NewKeysAPI(dcc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
@@ -100,7 +100,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
 	c := NewClusterByConfig(t,
 		&ClusterConfig{
 			Size: 3,
-			UsePeerTLS: true,
+			PeerTLS: &testTLSInfo,
 			DiscoveryURL: dc.URL(0) + "/v2/keys"},
 	)
 	c.Launch(t)
@@ -125,7 +125,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
 func TestDoubleTLSClusterSizeOf3(t *testing.T) {
 	defer testutil.AfterTest(t)
-	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, UsePeerTLS: true})
+	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
 	c.Launch(t)
 	defer c.Terminate(t)
@@ -156,7 +156,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
 func TestForceNewCluster(t *testing.T) {
 	c := NewCluster(t, 3)
 	c.Launch(t)
-	cc := mustNewHTTPClient(t, []string{c.Members[0].URL()})
+	cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
 	kapi := client.NewKeysAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	resp, err := kapi.Create(ctx, "/foo", "bar")
@@ -183,7 +183,7 @@ func TestForceNewCluster(t *testing.T) {
 	c.waitLeader(t, c.Members[:1])
 	// use new http client to init new connection
-	cc = mustNewHTTPClient(t, []string{c.Members[0].URL()})
+	cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
 	kapi = client.NewKeysAPI(cc)
 	// ensure force restart keep the old data, and new cluster can make progress
 	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
@@ -267,7 +267,7 @@ func TestIssue2904(t *testing.T) {
 	c.Members[1].Stop(t)
 	// send remove member-1 request to the cluster.
-	cc := mustNewHTTPClient(t, c.URLs())
+	cc := mustNewHTTPClient(t, c.URLs(), nil)
 	ma := client.NewMembersAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	// the proposal is not committed because member 1 is stopped, but the
@@ -294,7 +294,7 @@ func TestIssue2904(t *testing.T) {
 // a random key first, and check the new key could be got from all client urls
 // of the cluster.
 func clusterMustProgress(t *testing.T, membs []*member) {
-	cc := mustNewHTTPClient(t, []string{membs[0].URL()})
+	cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil)
 	kapi := client.NewKeysAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	key := fmt.Sprintf("foo%d", rand.Int())
@@ -306,7 +306,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
 	for i, m := range membs {
 		u := m.URL()
-		mcc := mustNewHTTPClient(t, []string{u})
+		mcc := mustNewHTTPClient(t, []string{u}, nil)
 		mkapi := client.NewKeysAPI(mcc)
 		mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
 		if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {

View File

@@ -83,7 +83,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
 func TestSnapshotAndRestartMember(t *testing.T) {
 	defer testutil.AfterTest(t)
-	m := mustNewMember(t, "snapAndRestartTest", false)
+	m := mustNewMember(t, "snapAndRestartTest", nil, nil)
 	m.SnapCount = 100
 	m.Launch()
 	defer m.Terminate(t)
@@ -92,7 +92,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
 	resps := make([]*client.Response, 120)
 	var err error
 	for i := 0; i < 120; i++ {
-		cc := mustNewHTTPClient(t, []string{m.URL()})
+		cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
 		kapi := client.NewKeysAPI(cc)
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 		key := fmt.Sprintf("foo%d", i)
@@ -106,7 +106,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
 	m.Restart(t)
 	for i := 0; i < 120; i++ {
-		cc := mustNewHTTPClient(t, []string{m.URL()})
+		cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
 		kapi := client.NewKeysAPI(cc)
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 		key := fmt.Sprintf("foo%d", i)
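
At the member level, the new mustNewMember signature makes the two TLS knobs explicit. A small hypothetical sketch (not in this commit; the member name is made up) of a standalone member that secures only its client endpoints:

package integration

import "testing"

// Hypothetical test: peer traffic stays on plain http (nil peer TLS), while
// the client listener is served over https via StartTLS.
func TestClientTLSMember(t *testing.T) {
	m := mustNewMember(t, "clientTLSMember", nil, &testTLSInfo)
	if err := m.Launch(); err != nil {
		t.Fatal(err)
	}
	defer m.Terminate(t)

	// WaitOK builds its v2 client with m.ClientTLSInfo, so this health
	// check runs over https against the member's client URL.
	m.WaitOK(t)
}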

View File

@@ -23,7 +23,7 @@ import (
 func TestUpgradeMember(t *testing.T) {
 	defer testutil.AfterTest(t)
-	m := mustNewMember(t, "integration046", false)
+	m := mustNewMember(t, "integration046", nil, nil)
 	cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir)
 	err := cmd.Run()
 	if err != nil {