client: break apart KeysAPI from httpClient
This commit is contained in:
259
client/http.go
259
client/http.go
@ -17,22 +17,16 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultV2KeysPrefix = "/v2/keys"
|
||||
ErrTimeout = context.DeadlineExceeded
|
||||
ErrTimeout = context.DeadlineExceeded
|
||||
)
|
||||
|
||||
// transport mimics http.Transport to provide an interface which can be
|
||||
@ -43,75 +37,8 @@ type transport interface {
|
||||
CancelRequest(req *http.Request)
|
||||
}
|
||||
|
||||
type httpClient struct {
|
||||
transport transport
|
||||
endpoint url.URL
|
||||
timeout time.Duration
|
||||
v2KeysPrefix string
|
||||
}
|
||||
|
||||
func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpClient, error) {
|
||||
u, err := url.Parse(ep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &httpClient{
|
||||
transport: tr,
|
||||
endpoint: *u,
|
||||
timeout: timeout,
|
||||
v2KeysPrefix: DefaultV2KeysPrefix,
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *httpClient) SetPrefix(p string) {
|
||||
c.v2KeysPrefix = p
|
||||
}
|
||||
|
||||
func (c *httpClient) Endpoint() url.URL {
|
||||
ep := c.endpoint
|
||||
ep.Path = path.Join(ep.Path, c.v2KeysPrefix)
|
||||
return ep
|
||||
}
|
||||
|
||||
func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) {
|
||||
create := &createAction{
|
||||
Key: key,
|
||||
Value: val,
|
||||
}
|
||||
if ttl >= 0 {
|
||||
uttl := uint64(ttl.Seconds())
|
||||
create.TTL = &uttl
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
|
||||
httpresp, body, err := c.do(ctx, create)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return unmarshalHTTPResponse(httpresp.StatusCode, body)
|
||||
}
|
||||
|
||||
func (c *httpClient) Get(key string) (*Response, error) {
|
||||
get := &getAction{
|
||||
Key: key,
|
||||
Recursive: false,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
|
||||
httpresp, body, err := c.do(ctx, get)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return unmarshalHTTPResponse(httpresp.StatusCode, body)
|
||||
type httpAction interface {
|
||||
httpRequest(url.URL) *http.Request
|
||||
}
|
||||
|
||||
type roundTripResponse struct {
|
||||
@ -119,8 +46,35 @@ type roundTripResponse struct {
|
||||
err error
|
||||
}
|
||||
|
||||
type httpClient struct {
|
||||
transport transport
|
||||
endpoint url.URL
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func newHTTPClient(tr *http.Transport, ep string, to time.Duration) (*httpClient, error) {
|
||||
u, err := url.Parse(ep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &httpClient{
|
||||
transport: tr,
|
||||
endpoint: *u,
|
||||
timeout: to,
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *httpClient) doWithTimeout(act httpAction) (*http.Response, []byte, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
|
||||
defer cancel()
|
||||
return c.do(ctx, act)
|
||||
}
|
||||
|
||||
func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
|
||||
req := act.httpRequest(c.Endpoint())
|
||||
req := act.httpRequest(c.endpoint)
|
||||
|
||||
rtchan := make(chan roundTripResponse, 1)
|
||||
go func() {
|
||||
@ -157,154 +111,3 @@ func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
return resp, body, err
|
||||
}
|
||||
|
||||
func (c *httpClient) Watch(key string, idx uint64) Watcher {
|
||||
return &httpWatcher{
|
||||
httpClient: *c,
|
||||
nextWait: waitAction{
|
||||
Key: key,
|
||||
WaitIndex: idx,
|
||||
Recursive: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher {
|
||||
return &httpWatcher{
|
||||
httpClient: *c,
|
||||
nextWait: waitAction{
|
||||
Key: key,
|
||||
WaitIndex: idx,
|
||||
Recursive: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type httpWatcher struct {
|
||||
httpClient
|
||||
nextWait waitAction
|
||||
}
|
||||
|
||||
func (hw *httpWatcher) Next() (*Response, error) {
|
||||
httpresp, body, err := hw.httpClient.do(context.Background(), &hw.nextWait)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// v2KeysURL forms a URL representing the location of a key. The provided
|
||||
// endpoint must be the root of the etcd keys API. For example, a valid
|
||||
// endpoint probably has the path "/v2/keys".
|
||||
func v2KeysURL(ep url.URL, key string) *url.URL {
|
||||
ep.Path = path.Join(ep.Path, key)
|
||||
return &ep
|
||||
}
|
||||
|
||||
type httpAction interface {
|
||||
httpRequest(url.URL) *http.Request
|
||||
}
|
||||
|
||||
type getAction struct {
|
||||
Key string
|
||||
Recursive bool
|
||||
}
|
||||
|
||||
func (g *getAction) httpRequest(ep url.URL) *http.Request {
|
||||
u := v2KeysURL(ep, g.Key)
|
||||
|
||||
params := u.Query()
|
||||
params.Set("recursive", strconv.FormatBool(g.Recursive))
|
||||
u.RawQuery = params.Encode()
|
||||
|
||||
req, _ := http.NewRequest("GET", u.String(), nil)
|
||||
return req
|
||||
}
|
||||
|
||||
type waitAction struct {
|
||||
Key string
|
||||
WaitIndex uint64
|
||||
Recursive bool
|
||||
}
|
||||
|
||||
func (w *waitAction) httpRequest(ep url.URL) *http.Request {
|
||||
u := v2KeysURL(ep, w.Key)
|
||||
|
||||
params := u.Query()
|
||||
params.Set("wait", "true")
|
||||
params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
|
||||
params.Set("recursive", strconv.FormatBool(w.Recursive))
|
||||
u.RawQuery = params.Encode()
|
||||
|
||||
req, _ := http.NewRequest("GET", u.String(), nil)
|
||||
return req
|
||||
}
|
||||
|
||||
type createAction struct {
|
||||
Key string
|
||||
Value string
|
||||
TTL *uint64
|
||||
}
|
||||
|
||||
func (c *createAction) httpRequest(ep url.URL) *http.Request {
|
||||
u := v2KeysURL(ep, c.Key)
|
||||
|
||||
params := u.Query()
|
||||
params.Set("prevExist", "false")
|
||||
u.RawQuery = params.Encode()
|
||||
|
||||
form := url.Values{}
|
||||
form.Add("value", c.Value)
|
||||
if c.TTL != nil {
|
||||
form.Add("ttl", strconv.FormatUint(*c.TTL, 10))
|
||||
}
|
||||
body := strings.NewReader(form.Encode())
|
||||
|
||||
req, _ := http.NewRequest("PUT", u.String(), body)
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
|
||||
return req
|
||||
}
|
||||
|
||||
func unmarshalHTTPResponse(code int, body []byte) (res *Response, err error) {
|
||||
switch code {
|
||||
case http.StatusOK, http.StatusCreated:
|
||||
res, err = unmarshalSuccessfulResponse(body)
|
||||
default:
|
||||
err = unmarshalErrorResponse(code)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func unmarshalSuccessfulResponse(body []byte) (*Response, error) {
|
||||
var res Response
|
||||
err := json.Unmarshal(body, &res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func unmarshalErrorResponse(code int) error {
|
||||
switch code {
|
||||
case http.StatusNotFound:
|
||||
return ErrKeyNoExist
|
||||
case http.StatusPreconditionFailed:
|
||||
return ErrKeyExists
|
||||
case http.StatusInternalServerError:
|
||||
// this isn't necessarily true
|
||||
return ErrNoLeader
|
||||
default:
|
||||
}
|
||||
|
||||
return fmt.Errorf("unrecognized HTTP status code %d", code)
|
||||
}
|
||||
|
Reference in New Issue
Block a user