Compare commits
17 Commits
Author | SHA1 | Date | |
---|---|---|---|
bdee27b19e | |||
7f684641a3 | |||
d54cf26bed | |||
d3e73aadab | |||
e340928988 | |||
e6ffe22e16 | |||
05b564a394 | |||
cb779b2305 | |||
22c3208fb3 | |||
e44372e430 | |||
05a90bc1e5 | |||
6751727809 | |||
916106c3a2 | |||
e0c7768f94 | |||
0fb2d5d4d3 | |||
fc61fc7c7a | |||
09b81bad15 |
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@ -129,10 +129,6 @@
|
|||||||
"ImportPath": "golang.org/x/net/context",
|
"ImportPath": "golang.org/x/net/context",
|
||||||
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
|
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"ImportPath": "golang.org/x/net/netutil",
|
|
||||||
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"ImportPath": "golang.org/x/oauth2",
|
"ImportPath": "golang.org/x/oauth2",
|
||||||
"Rev": "3046bc76d6dfd7d3707f6640f85e42d9c4050f50"
|
"Rev": "3046bc76d6dfd7d3707f6640f85e42d9c4050f50"
|
||||||
|
103
Godeps/_workspace/src/golang.org/x/net/netutil/listen_test.go
generated
vendored
103
Godeps/_workspace/src/golang.org/x/net/netutil/listen_test.go
generated
vendored
@ -1,103 +0,0 @@
|
|||||||
// Copyright 2013 The Go Authors. All rights reserved.
|
|
||||||
// Use of this source code is governed by a BSD-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
// +build go1.3
|
|
||||||
|
|
||||||
// (We only run this test on Go 1.3 because the HTTP client timeout behavior
|
|
||||||
// was bad in previous releases, causing occasional deadlocks.)
|
|
||||||
|
|
||||||
package netutil
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestLimitListener(t *testing.T) {
|
|
||||||
const (
|
|
||||||
max = 5
|
|
||||||
num = 200
|
|
||||||
)
|
|
||||||
|
|
||||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Listen: %v", err)
|
|
||||||
}
|
|
||||||
defer l.Close()
|
|
||||||
l = LimitListener(l, max)
|
|
||||||
|
|
||||||
var open int32
|
|
||||||
go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if n := atomic.AddInt32(&open, 1); n > max {
|
|
||||||
t.Errorf("%d open connections, want <= %d", n, max)
|
|
||||||
}
|
|
||||||
defer atomic.AddInt32(&open, -1)
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
fmt.Fprint(w, "some body")
|
|
||||||
}))
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var failed int32
|
|
||||||
for i := 0; i < num; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
c := http.Client{Timeout: 3 * time.Second}
|
|
||||||
r, err := c.Get("http://" + l.Addr().String())
|
|
||||||
if err != nil {
|
|
||||||
t.Logf("Get: %v", err)
|
|
||||||
atomic.AddInt32(&failed, 1)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer r.Body.Close()
|
|
||||||
io.Copy(ioutil.Discard, r.Body)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// We expect some Gets to fail as the kernel's accept queue is filled,
|
|
||||||
// but most should succeed.
|
|
||||||
if failed >= num/2 {
|
|
||||||
t.Errorf("too many Gets failed: %v", failed)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type errorListener struct {
|
|
||||||
net.Listener
|
|
||||||
}
|
|
||||||
|
|
||||||
func (errorListener) Accept() (net.Conn, error) {
|
|
||||||
return nil, errFake
|
|
||||||
}
|
|
||||||
|
|
||||||
var errFake = errors.New("fake error from errorListener")
|
|
||||||
|
|
||||||
// This used to hang.
|
|
||||||
func TestLimitListenerError(t *testing.T) {
|
|
||||||
donec := make(chan bool, 1)
|
|
||||||
go func() {
|
|
||||||
const n = 2
|
|
||||||
ll := LimitListener(errorListener{}, n)
|
|
||||||
for i := 0; i < n+1; i++ {
|
|
||||||
_, err := ll.Accept()
|
|
||||||
if err != errFake {
|
|
||||||
t.Fatalf("Accept error = %v; want errFake", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
donec <- true
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-donec:
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
t.Fatal("timeout. deadlock?")
|
|
||||||
}
|
|
||||||
}
|
|
@ -162,6 +162,11 @@ type Client interface {
|
|||||||
// this may differ from the initial Endpoints provided in the Config.
|
// this may differ from the initial Endpoints provided in the Config.
|
||||||
Endpoints() []string
|
Endpoints() []string
|
||||||
|
|
||||||
|
// SetEndpoints sets the set of API endpoints used by Client to resolve
|
||||||
|
// HTTP requests. If the given endpoints are not valid, an error will be
|
||||||
|
// returned
|
||||||
|
SetEndpoints(eps []string) error
|
||||||
|
|
||||||
httpClient
|
httpClient
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,7 +181,7 @@ func New(cfg Config) (Client, error) {
|
|||||||
password: cfg.Password,
|
password: cfg.Password,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := c.reset(cfg.Endpoints); err != nil {
|
if err := c.SetEndpoints(cfg.Endpoints); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
@ -219,7 +224,7 @@ type httpClusterClient struct {
|
|||||||
rand *rand.Rand
|
rand *rand.Rand
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClusterClient) reset(eps []string) error {
|
func (c *httpClusterClient) SetEndpoints(eps []string) error {
|
||||||
if len(eps) == 0 {
|
if len(eps) == 0 {
|
||||||
return ErrNoEndpoints
|
return ErrNoEndpoints
|
||||||
}
|
}
|
||||||
@ -341,7 +346,7 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.reset(eps)
|
return c.SetEndpoints(eps)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
|
func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
|
||||||
@ -378,9 +383,12 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hctx, hcancel := context.WithCancel(ctx)
|
var hctx context.Context
|
||||||
|
var hcancel context.CancelFunc
|
||||||
if c.headerTimeout > 0 {
|
if c.headerTimeout > 0 {
|
||||||
hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
|
hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
|
||||||
|
} else {
|
||||||
|
hctx, hcancel = context.WithCancel(ctx)
|
||||||
}
|
}
|
||||||
defer hcancel()
|
defer hcancel()
|
||||||
|
|
||||||
|
@ -705,7 +705,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
|
|||||||
clientFactory: cf,
|
clientFactory: cf,
|
||||||
rand: rand.New(rand.NewSource(0)),
|
rand: rand.New(rand.NewSource(0)),
|
||||||
}
|
}
|
||||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error during setup: %#v", err)
|
t.Fatalf("unexpected error during setup: %#v", err)
|
||||||
}
|
}
|
||||||
@ -728,7 +728,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
|
|||||||
t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
|
t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = hc.reset([]string{"http://127.0.0.1:4009"})
|
err = hc.SetEndpoints([]string{"http://127.0.0.1:4009"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error during reset: %#v", err)
|
t.Fatalf("unexpected error during reset: %#v", err)
|
||||||
}
|
}
|
||||||
@ -749,7 +749,7 @@ func TestHTTPClusterClientSyncFail(t *testing.T) {
|
|||||||
clientFactory: cf,
|
clientFactory: cf,
|
||||||
rand: rand.New(rand.NewSource(0)),
|
rand: rand.New(rand.NewSource(0)),
|
||||||
}
|
}
|
||||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error during setup: %#v", err)
|
t.Fatalf("unexpected error during setup: %#v", err)
|
||||||
}
|
}
|
||||||
@ -783,7 +783,7 @@ func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
|
|||||||
clientFactory: cf,
|
clientFactory: cf,
|
||||||
rand: rand.New(rand.NewSource(0)),
|
rand: rand.New(rand.NewSource(0)),
|
||||||
}
|
}
|
||||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error during setup: %#v", err)
|
t.Fatalf("unexpected error during setup: %#v", err)
|
||||||
}
|
}
|
||||||
@ -805,7 +805,7 @@ func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
|
|||||||
clientFactory: cf,
|
clientFactory: cf,
|
||||||
rand: rand.New(rand.NewSource(0)),
|
rand: rand.New(rand.NewSource(0)),
|
||||||
}
|
}
|
||||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error during setup: %#v", err)
|
t.Fatalf("unexpected error during setup: %#v", err)
|
||||||
}
|
}
|
||||||
@ -838,7 +838,7 @@ func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
|
|||||||
clientFactory: cf,
|
clientFactory: cf,
|
||||||
rand: rand.New(rand.NewSource(0)),
|
rand: rand.New(rand.NewSource(0)),
|
||||||
}
|
}
|
||||||
err := hc.reset([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
|
err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error during setup: %#v", err)
|
t.Fatalf("unexpected error during setup: %#v", err)
|
||||||
}
|
}
|
||||||
@ -867,7 +867,7 @@ func TestHTTPClusterClientResetFail(t *testing.T) {
|
|||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
|
hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
|
||||||
err := hc.reset(tt)
|
err := hc.SetEndpoints(tt)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("#%d: expected non-nil error", i)
|
t.Errorf("#%d: expected non-nil error", i)
|
||||||
}
|
}
|
||||||
@ -879,7 +879,7 @@ func TestHTTPClusterClientResetPinRandom(t *testing.T) {
|
|||||||
pinNum := 0
|
pinNum := 0
|
||||||
for i := 0; i < round; i++ {
|
for i := 0; i < round; i++ {
|
||||||
hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
|
hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
|
||||||
err := hc.reset([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
|
err := hc.SetEndpoints([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("#%d: reset error (%v)", i, err)
|
t.Fatalf("#%d: reset error (%v)", i, err)
|
||||||
}
|
}
|
||||||
|
@ -108,8 +108,13 @@ func handleClusterHealth(c *cli.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !forever {
|
if !forever {
|
||||||
break
|
if health {
|
||||||
|
os.Exit(ExitSuccess)
|
||||||
|
} else {
|
||||||
|
os.Exit(ExitClusterNotHealthy)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("\nnext check after 10 second...\n\n")
|
fmt.Printf("\nnext check after 10 second...\n\n")
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@ const (
|
|||||||
ExitBadConnection
|
ExitBadConnection
|
||||||
ExitBadAuth
|
ExitBadAuth
|
||||||
ExitServerError
|
ExitServerError
|
||||||
|
ExitClusterNotHealthy
|
||||||
)
|
)
|
||||||
|
|
||||||
func handleError(code int, err error) {
|
func handleError(code int, err error) {
|
||||||
|
@ -16,7 +16,6 @@ package command
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
||||||
@ -43,15 +42,9 @@ func updatedirCommandFunc(c *cli.Context, ki client.KeysAPI) {
|
|||||||
handleError(ExitBadArgs, errors.New("key required"))
|
handleError(ExitBadArgs, errors.New("key required"))
|
||||||
}
|
}
|
||||||
key := c.Args()[0]
|
key := c.Args()[0]
|
||||||
value, err := argOrStdin(c.Args(), os.Stdin, 1)
|
|
||||||
if err != nil {
|
|
||||||
handleError(ExitBadArgs, errors.New("value required"))
|
|
||||||
}
|
|
||||||
|
|
||||||
ttl := c.Int("ttl")
|
ttl := c.Int("ttl")
|
||||||
|
|
||||||
ctx, cancel := contextWithTotalTimeout(c)
|
ctx, cancel := contextWithTotalTimeout(c)
|
||||||
_, err = ki.Set(ctx, key, value, &client.SetOptions{TTL: time.Duration(ttl) * time.Second, Dir: true, PrevExist: client.PrevExist})
|
_, err := ki.Set(ctx, key, "", &client.SetOptions{TTL: time.Duration(ttl) * time.Second, Dir: true, PrevExist: client.PrevExist})
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(ExitServerError, err)
|
handleError(ExitServerError, err)
|
||||||
|
@ -203,9 +203,19 @@ func mustNewClient(c *cli.Context) client.Client {
|
|||||||
if err == client.ErrNoEndpoints {
|
if err == client.ErrNoEndpoints {
|
||||||
fmt.Fprintf(os.Stderr, "etcd cluster has no published client endpoints.\n")
|
fmt.Fprintf(os.Stderr, "etcd cluster has no published client endpoints.\n")
|
||||||
fmt.Fprintf(os.Stderr, "Try '--no-sync' if you want to access non-published client endpoints(%s).\n", strings.Join(hc.Endpoints(), ","))
|
fmt.Fprintf(os.Stderr, "Try '--no-sync' if you want to access non-published client endpoints(%s).\n", strings.Join(hc.Endpoints(), ","))
|
||||||
|
handleError(ExitServerError, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// fail-back to try sync cluster with peer API. this is for making etcdctl work with etcd 0.4.x.
|
||||||
|
// TODO: remove this when we deprecate the support for etcd 0.4.
|
||||||
|
eps, serr := syncWithPeerAPI(c, ctx, hc.Endpoints())
|
||||||
|
if serr != nil {
|
||||||
|
handleError(ExitServerError, serr)
|
||||||
|
}
|
||||||
|
err = hc.SetEndpoints(eps)
|
||||||
|
if err != nil {
|
||||||
|
handleError(ExitServerError, err)
|
||||||
}
|
}
|
||||||
handleError(ExitServerError, err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if debug {
|
if debug {
|
||||||
fmt.Fprintf(os.Stderr, "got endpoints(%s) after sync\n", strings.Join(hc.Endpoints(), ","))
|
fmt.Fprintf(os.Stderr, "got endpoints(%s) after sync\n", strings.Join(hc.Endpoints(), ","))
|
||||||
@ -267,3 +277,44 @@ func newClient(c *cli.Context) (client.Client, error) {
|
|||||||
func contextWithTotalTimeout(c *cli.Context) (context.Context, context.CancelFunc) {
|
func contextWithTotalTimeout(c *cli.Context) (context.Context, context.CancelFunc) {
|
||||||
return context.WithTimeout(context.Background(), c.GlobalDuration("total-timeout"))
|
return context.WithTimeout(context.Background(), c.GlobalDuration("total-timeout"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// syncWithPeerAPI syncs cluster with peer API defined at
|
||||||
|
// https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311.
|
||||||
|
// This exists for backward compatibility with etcd 0.4.x.
|
||||||
|
func syncWithPeerAPI(c *cli.Context, ctx context.Context, knownPeers []string) ([]string, error) {
|
||||||
|
tr, err := getTransport(c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
body []byte
|
||||||
|
resp *http.Response
|
||||||
|
)
|
||||||
|
for _, p := range knownPeers {
|
||||||
|
var req *http.Request
|
||||||
|
req, err = http.NewRequest("GET", p+"/v2/peers", nil)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp, err = tr.RoundTrip(req)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
resp.Body.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
body, err = ioutil.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the peers API format: https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311
|
||||||
|
return strings.Split(string(body), ", "), nil
|
||||||
|
}
|
||||||
|
@ -31,7 +31,6 @@ import (
|
|||||||
systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
|
systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/netutil"
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
||||||
"github.com/coreos/etcd/discovery"
|
"github.com/coreos/etcd/discovery"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
@ -40,6 +39,7 @@ import (
|
|||||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
"github.com/coreos/etcd/pkg/cors"
|
"github.com/coreos/etcd/pkg/cors"
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
pkgioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||||
"github.com/coreos/etcd/pkg/osutil"
|
"github.com/coreos/etcd/pkg/osutil"
|
||||||
runtimeutil "github.com/coreos/etcd/pkg/runtime"
|
runtimeutil "github.com/coreos/etcd/pkg/runtime"
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
@ -240,15 +240,23 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|||||||
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
|
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
|
||||||
}
|
}
|
||||||
var l net.Listener
|
var l net.Listener
|
||||||
l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
|
l, err = net.Listen("tcp", u.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if fdLimit, err := runtimeutil.FDLimit(); err == nil {
|
if fdLimit, err := runtimeutil.FDLimit(); err == nil {
|
||||||
if fdLimit <= reservedInternalFDNum {
|
if fdLimit <= reservedInternalFDNum {
|
||||||
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
||||||
}
|
}
|
||||||
l = netutil.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do not wrap around this listener if TLS Info is set.
|
||||||
|
// HTTPS server expects TLS Conn created by TLSListener.
|
||||||
|
l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
urlStr := u.String()
|
urlStr := u.String()
|
||||||
@ -409,7 +417,7 @@ func startProxy(cfg *config) error {
|
|||||||
return clientURLs
|
return clientURLs
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ioutil.WriteFile(clusterfile+".bak", b, 0600)
|
err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Warningf("proxy: error on writing urls %s", err)
|
plog.Warningf("proxy: error on writing urls %s", err)
|
||||||
return clientURLs
|
return clientURLs
|
||||||
|
@ -26,6 +26,9 @@ import (
|
|||||||
// creating a new service goroutine for each. The service goroutines
|
// creating a new service goroutine for each. The service goroutines
|
||||||
// read requests and then call handler to reply to them.
|
// read requests and then call handler to reply to them.
|
||||||
func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
|
func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
|
||||||
|
// TODO: assert net.Listener type? Arbitrary listener might break HTTPS server which
|
||||||
|
// expect a TLS Conn type.
|
||||||
|
|
||||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
||||||
// TODO: add debug flag; enable logging when debug flag is set
|
// TODO: add debug flag; enable logging when debug flag is set
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
|
@ -174,12 +174,17 @@ type EtcdServer struct {
|
|||||||
// configuration is considered static for the lifetime of the EtcdServer.
|
// configuration is considered static for the lifetime of the EtcdServer.
|
||||||
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||||
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
|
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
|
||||||
|
|
||||||
var w *wal.WAL
|
var w *wal.WAL
|
||||||
var n raft.Node
|
var n raft.Node
|
||||||
var s *raft.MemoryStorage
|
var s *raft.MemoryStorage
|
||||||
var id types.ID
|
var id types.ID
|
||||||
var cl *cluster
|
var cl *cluster
|
||||||
|
|
||||||
|
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
|
||||||
|
return nil, fmt.Errorf("cannot access data directory: %v", terr)
|
||||||
|
}
|
||||||
|
|
||||||
// Run the migrations.
|
// Run the migrations.
|
||||||
dataVer, err := version.DetectDataDir(cfg.DataDir)
|
dataVer, err := version.DetectDataDir(cfg.DataDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -189,11 +194,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = os.MkdirAll(cfg.MemberDir(), privateDirMode)
|
|
||||||
if err != nil && err != os.ErrExist {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
haveWAL := wal.Exist(cfg.WALDir())
|
haveWAL := wal.Exist(cfg.WALDir())
|
||||||
ss := snap.New(cfg.SnapDir())
|
ss := snap.New(cfg.SnapDir())
|
||||||
|
|
||||||
@ -255,10 +255,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
cfg.PrintWithInitial()
|
cfg.PrintWithInitial()
|
||||||
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
|
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
|
||||||
case haveWAL:
|
case haveWAL:
|
||||||
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
||||||
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
||||||
}
|
}
|
||||||
@ -295,6 +291,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
||||||
|
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
||||||
|
}
|
||||||
|
|
||||||
sstats := &stats.ServerStats{
|
sstats := &stats.ServerStats{
|
||||||
Name: cfg.Name,
|
Name: cfg.Name,
|
||||||
ID: id.String(),
|
ID: id.String(),
|
||||||
|
@ -25,6 +25,8 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
privateFileMode = 0600
|
privateFileMode = 0600
|
||||||
|
// owner can make/remove files inside the directory
|
||||||
|
privateDirMode = 0700
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -55,3 +57,13 @@ func ReadDir(dirpath string) ([]string, error) {
|
|||||||
sort.Strings(names)
|
sort.Strings(names)
|
||||||
return names, nil
|
return names, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TouchDirAll is simliar to os.MkdirAll. It creates directories with 0700 permission if any directory
|
||||||
|
// does not exists. TouchDirAll also ensures the given directory is writable.
|
||||||
|
func TouchDirAll(dir string) error {
|
||||||
|
err := os.MkdirAll(dir, privateDirMode)
|
||||||
|
if err != nil && err != os.ErrExist {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return IsDirWriteable(dir)
|
||||||
|
}
|
||||||
|
41
pkg/ioutil/util.go
Normal file
41
pkg/ioutil/util.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package ioutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WriteAndSyncFile behaviors just like ioutil.WriteFile in standard library
|
||||||
|
// but calls Sync before closing the file. WriteAndSyncFile guarantees the data
|
||||||
|
// is synced if there is no error returned.
|
||||||
|
func WriteAndSyncFile(filename string, data []byte, perm os.FileMode) error {
|
||||||
|
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n, err := f.Write(data)
|
||||||
|
if err == nil && n < len(data) {
|
||||||
|
err = io.ErrShortWrite
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
err = f.Sync()
|
||||||
|
}
|
||||||
|
if err1 := f.Close(); err == nil {
|
||||||
|
err = err1
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
@ -21,17 +21,19 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewKeepAliveListener returns a listener that listens on the given address.
|
type keepAliveConn interface {
|
||||||
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
|
SetKeepAlive(bool) error
|
||||||
func NewKeepAliveListener(addr string, scheme string, info TLSInfo) (net.Listener, error) {
|
SetKeepAlivePeriod(d time.Duration) error
|
||||||
l, err := net.Listen("tcp", addr)
|
}
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// NewKeepAliveListener returns a listener that listens on the given address.
|
||||||
|
// Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil.
|
||||||
|
// Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake.
|
||||||
|
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
|
||||||
|
func NewKeepAliveListener(l net.Listener, scheme string, info TLSInfo) (net.Listener, error) {
|
||||||
if scheme == "https" {
|
if scheme == "https" {
|
||||||
if info.Empty() {
|
if info.Empty() {
|
||||||
return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", scheme+"://"+addr)
|
return nil, fmt.Errorf("cannot listen on TLS for given listener: KeyFile and CertFile are not presented")
|
||||||
}
|
}
|
||||||
cfg, err := info.ServerConfig()
|
cfg, err := info.ServerConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -53,13 +55,13 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tcpc := c.(*net.TCPConn)
|
kac := c.(keepAliveConn)
|
||||||
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
||||||
// default on linux: 30 + 8 * 30
|
// default on linux: 30 + 8 * 30
|
||||||
// default on osx: 30 + 8 * 75
|
// default on osx: 30 + 8 * 75
|
||||||
tcpc.SetKeepAlive(true)
|
kac.SetKeepAlive(true)
|
||||||
tcpc.SetKeepAlivePeriod(30 * time.Second)
|
kac.SetKeepAlivePeriod(30 * time.Second)
|
||||||
return tcpc, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
|
// A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
|
||||||
@ -75,12 +77,12 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tcpc := c.(*net.TCPConn)
|
kac := c.(keepAliveConn)
|
||||||
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
||||||
// default on linux: 30 + 8 * 30
|
// default on linux: 30 + 8 * 30
|
||||||
// default on osx: 30 + 8 * 75
|
// default on osx: 30 + 8 * 75
|
||||||
tcpc.SetKeepAlive(true)
|
kac.SetKeepAlive(true)
|
||||||
tcpc.SetKeepAlivePeriod(30 * time.Second)
|
kac.SetKeepAlivePeriod(30 * time.Second)
|
||||||
c = tls.Server(c, l.config)
|
c = tls.Server(c, l.config)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package transport
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
@ -25,7 +26,12 @@ import (
|
|||||||
// that accepts connections.
|
// that accepts connections.
|
||||||
// TODO: verify the keepalive option is set correctly
|
// TODO: verify the keepalive option is set correctly
|
||||||
func TestNewKeepAliveListener(t *testing.T) {
|
func TestNewKeepAliveListener(t *testing.T) {
|
||||||
ln, err := NewKeepAliveListener("127.0.0.1:0", "http", TLSInfo{})
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected listen error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ln, err = NewKeepAliveListener(ln, "http", TLSInfo{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
|
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
|
||||||
}
|
}
|
||||||
@ -38,6 +44,7 @@ func TestNewKeepAliveListener(t *testing.T) {
|
|||||||
conn.Close()
|
conn.Close()
|
||||||
ln.Close()
|
ln.Close()
|
||||||
|
|
||||||
|
ln, err = net.Listen("tcp", "127.0.0.1:0")
|
||||||
// tls
|
// tls
|
||||||
tmp, err := createTempFile([]byte("XXX"))
|
tmp, err := createTempFile([]byte("XXX"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -46,7 +53,7 @@ func TestNewKeepAliveListener(t *testing.T) {
|
|||||||
defer os.Remove(tmp)
|
defer os.Remove(tmp)
|
||||||
tlsInfo := TLSInfo{CertFile: tmp, KeyFile: tmp}
|
tlsInfo := TLSInfo{CertFile: tmp, KeyFile: tmp}
|
||||||
tlsInfo.parseFunc = fakeCertificateParserFunc(tls.Certificate{}, nil)
|
tlsInfo.parseFunc = fakeCertificateParserFunc(tls.Certificate{}, nil)
|
||||||
tlsln, err := NewKeepAliveListener("127.0.0.1:0", "https", tlsInfo)
|
tlsln, err := NewKeepAliveListener(ln, "https", tlsInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
|
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
|
||||||
}
|
}
|
||||||
@ -64,7 +71,12 @@ func TestNewKeepAliveListener(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestNewKeepAliveListenerTLSEmptyInfo(t *testing.T) {
|
func TestNewKeepAliveListenerTLSEmptyInfo(t *testing.T) {
|
||||||
_, err := NewListener("127.0.0.1:0", "https", TLSInfo{})
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected listen error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = NewKeepAliveListener(ln, "https", TLSInfo{})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("err = nil, want not presented error")
|
t.Errorf("err = nil, want not presented error")
|
||||||
}
|
}
|
||||||
|
@ -4,11 +4,17 @@
|
|||||||
|
|
||||||
// Package netutil provides network utility functions, complementing the more
|
// Package netutil provides network utility functions, complementing the more
|
||||||
// common ones in the net package.
|
// common ones in the net package.
|
||||||
package netutil
|
package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrNotTCP = errors.New("only tcp connections have keepalive")
|
||||||
)
|
)
|
||||||
|
|
||||||
// LimitListener returns a Listener that accepts at most n simultaneous
|
// LimitListener returns a Listener that accepts at most n simultaneous
|
||||||
@ -46,3 +52,19 @@ func (l *limitListenerConn) Close() error {
|
|||||||
l.releaseOnce.Do(l.release)
|
l.releaseOnce.Do(l.release)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
|
||||||
|
tcpc, ok := l.Conn.(*net.TCPConn)
|
||||||
|
if !ok {
|
||||||
|
return ErrNotTCP
|
||||||
|
}
|
||||||
|
return tcpc.SetKeepAlive(doKeepAlive)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error {
|
||||||
|
tcpc, ok := l.Conn.(*net.TCPConn)
|
||||||
|
if !ok {
|
||||||
|
return ErrNotTCP
|
||||||
|
}
|
||||||
|
return tcpc.SetKeepAlivePeriod(d)
|
||||||
|
}
|
@ -78,8 +78,9 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
|
|||||||
defer wh.mutex.Unlock()
|
defer wh.mutex.Unlock()
|
||||||
// If the event exists in the known history, append the EtcdIndex and return immediately
|
// If the event exists in the known history, append the EtcdIndex and return immediately
|
||||||
if event != nil {
|
if event != nil {
|
||||||
event.EtcdIndex = storeIndex
|
ne := event.Clone()
|
||||||
w.eventChan <- event
|
ne.EtcdIndex = storeIndex
|
||||||
|
w.eventChan <- ne
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "2.1.0"
|
MinClusterVersion = "2.1.0"
|
||||||
Version = "2.2.2"
|
Version = "2.2.4"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
GitSHA = "Not provided (use ./build instead of go build)"
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
|
Reference in New Issue
Block a user