integration: use client to do requests
This commit is contained in:
@ -23,21 +23,23 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/client"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
tickDuration = 10 * time.Millisecond
|
tickDuration = 10 * time.Millisecond
|
||||||
clusterName = "etcd"
|
clusterName = "etcd"
|
||||||
|
requestTimeout = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -52,41 +54,27 @@ func testCluster(t *testing.T, size int) {
|
|||||||
defer afterTest(t)
|
defer afterTest(t)
|
||||||
c := &cluster{Size: size}
|
c := &cluster{Size: size}
|
||||||
c.Launch(t)
|
c.Launch(t)
|
||||||
|
defer c.Terminate(t)
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
for _, u := range c.Members[i].ClientURLs {
|
for j, u := range c.Members[i].ClientURLs {
|
||||||
|
cc := mustNewHTTPClient(t, []string{u.String()})
|
||||||
|
kapi := client.NewKeysAPI(cc)
|
||||||
|
// TODO: we retry it here because MsgProp may be dropped due to
|
||||||
|
// sender reaches its max serving. make it reliable that we don't
|
||||||
|
// need to worry about it.
|
||||||
var err error
|
var err error
|
||||||
for j := 0; j < 3; j++ {
|
for k := 0; k < 3; k++ {
|
||||||
if err = setKey(u, "/foo", "bar"); err == nil {
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
|
if _, err = kapi.Create(ctx, fmt.Sprintf("/%d%d%d", i, j, k), "bar", -1); err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
cancel()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("setKey on %v error: %v", u.String(), err)
|
t.Errorf("create on %s error: %v", u.String(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.Terminate(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: use etcd client
|
|
||||||
func setKey(u url.URL, key string, value string) error {
|
|
||||||
u.Path = "/v2/keys" + key
|
|
||||||
v := url.Values{"value": []string{value}}
|
|
||||||
req, err := http.NewRequest("PUT", u.String(), strings.NewReader(v.Encode()))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ioutil.ReadAll(resp.Body)
|
|
||||||
resp.Body.Close()
|
|
||||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
|
||||||
return fmt.Errorf("statusCode = %d, want %d or %d", resp.StatusCode, http.StatusOK, http.StatusCreated)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type cluster struct {
|
type cluster struct {
|
||||||
@ -130,13 +118,7 @@ func (c *cluster) Launch(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
m.NewCluster = true
|
m.NewCluster = true
|
||||||
m.Transport, err = transport.NewTransport(transport.TLSInfo{})
|
m.Transport = newTransport()
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
// TODO: need the support of graceful stop in Sender to remove this
|
|
||||||
m.Transport.DisableKeepAlives = true
|
|
||||||
m.Transport.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial
|
|
||||||
|
|
||||||
m.Launch(t)
|
m.Launch(t)
|
||||||
c.Members = append(c.Members, m)
|
c.Members = append(c.Members, m)
|
||||||
@ -223,3 +205,19 @@ func (m *member) Terminate(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
|
||||||
|
cc, err := client.NewHTTPClient(newTransport(), eps)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return cc
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTransport() *http.Transport {
|
||||||
|
tr := &http.Transport{}
|
||||||
|
// TODO: need the support of graceful stop in Sender to remove this
|
||||||
|
tr.DisableKeepAlives = true
|
||||||
|
tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial
|
||||||
|
return tr
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user