Compare commits
17 Commits
Author | SHA1 | Date | |
---|---|---|---|
bdee27b19e | |||
7f684641a3 | |||
d54cf26bed | |||
d3e73aadab | |||
e340928988 | |||
e6ffe22e16 | |||
05b564a394 | |||
cb779b2305 | |||
22c3208fb3 | |||
e44372e430 | |||
05a90bc1e5 | |||
6751727809 | |||
916106c3a2 | |||
e0c7768f94 | |||
0fb2d5d4d3 | |||
fc61fc7c7a | |||
09b81bad15 |
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@ -129,10 +129,6 @@
|
||||
"ImportPath": "golang.org/x/net/context",
|
||||
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/net/netutil",
|
||||
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
|
||||
},
|
||||
{
|
||||
"ImportPath": "golang.org/x/oauth2",
|
||||
"Rev": "3046bc76d6dfd7d3707f6640f85e42d9c4050f50"
|
||||
|
103
Godeps/_workspace/src/golang.org/x/net/netutil/listen_test.go
generated
vendored
103
Godeps/_workspace/src/golang.org/x/net/netutil/listen_test.go
generated
vendored
@ -1,103 +0,0 @@
|
||||
// Copyright 2013 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// +build go1.3
|
||||
|
||||
// (We only run this test on Go 1.3 because the HTTP client timeout behavior
|
||||
// was bad in previous releases, causing occasional deadlocks.)
|
||||
|
||||
package netutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestLimitListener(t *testing.T) {
|
||||
const (
|
||||
max = 5
|
||||
num = 200
|
||||
)
|
||||
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Listen: %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
l = LimitListener(l, max)
|
||||
|
||||
var open int32
|
||||
go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if n := atomic.AddInt32(&open, 1); n > max {
|
||||
t.Errorf("%d open connections, want <= %d", n, max)
|
||||
}
|
||||
defer atomic.AddInt32(&open, -1)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
fmt.Fprint(w, "some body")
|
||||
}))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var failed int32
|
||||
for i := 0; i < num; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c := http.Client{Timeout: 3 * time.Second}
|
||||
r, err := c.Get("http://" + l.Addr().String())
|
||||
if err != nil {
|
||||
t.Logf("Get: %v", err)
|
||||
atomic.AddInt32(&failed, 1)
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
io.Copy(ioutil.Discard, r.Body)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// We expect some Gets to fail as the kernel's accept queue is filled,
|
||||
// but most should succeed.
|
||||
if failed >= num/2 {
|
||||
t.Errorf("too many Gets failed: %v", failed)
|
||||
}
|
||||
}
|
||||
|
||||
type errorListener struct {
|
||||
net.Listener
|
||||
}
|
||||
|
||||
func (errorListener) Accept() (net.Conn, error) {
|
||||
return nil, errFake
|
||||
}
|
||||
|
||||
var errFake = errors.New("fake error from errorListener")
|
||||
|
||||
// This used to hang.
|
||||
func TestLimitListenerError(t *testing.T) {
|
||||
donec := make(chan bool, 1)
|
||||
go func() {
|
||||
const n = 2
|
||||
ll := LimitListener(errorListener{}, n)
|
||||
for i := 0; i < n+1; i++ {
|
||||
_, err := ll.Accept()
|
||||
if err != errFake {
|
||||
t.Fatalf("Accept error = %v; want errFake", err)
|
||||
}
|
||||
}
|
||||
donec <- true
|
||||
}()
|
||||
select {
|
||||
case <-donec:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout. deadlock?")
|
||||
}
|
||||
}
|
@ -162,6 +162,11 @@ type Client interface {
|
||||
// this may differ from the initial Endpoints provided in the Config.
|
||||
Endpoints() []string
|
||||
|
||||
// SetEndpoints sets the set of API endpoints used by Client to resolve
|
||||
// HTTP requests. If the given endpoints are not valid, an error will be
|
||||
// returned
|
||||
SetEndpoints(eps []string) error
|
||||
|
||||
httpClient
|
||||
}
|
||||
|
||||
@ -176,7 +181,7 @@ func New(cfg Config) (Client, error) {
|
||||
password: cfg.Password,
|
||||
}
|
||||
}
|
||||
if err := c.reset(cfg.Endpoints); err != nil {
|
||||
if err := c.SetEndpoints(cfg.Endpoints); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
@ -219,7 +224,7 @@ type httpClusterClient struct {
|
||||
rand *rand.Rand
|
||||
}
|
||||
|
||||
func (c *httpClusterClient) reset(eps []string) error {
|
||||
func (c *httpClusterClient) SetEndpoints(eps []string) error {
|
||||
if len(eps) == 0 {
|
||||
return ErrNoEndpoints
|
||||
}
|
||||
@ -341,7 +346,7 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.reset(eps)
|
||||
return c.SetEndpoints(eps)
|
||||
}
|
||||
|
||||
func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
|
||||
@ -378,9 +383,12 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
hctx, hcancel := context.WithCancel(ctx)
|
||||
var hctx context.Context
|
||||
var hcancel context.CancelFunc
|
||||
if c.headerTimeout > 0 {
|
||||
hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
|
||||
} else {
|
||||
hctx, hcancel = context.WithCancel(ctx)
|
||||
}
|
||||
defer hcancel()
|
||||
|
||||
|
@ -705,7 +705,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
|
||||
clientFactory: cf,
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
}
|
||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
||||
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error during setup: %#v", err)
|
||||
}
|
||||
@ -728,7 +728,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
|
||||
t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
|
||||
}
|
||||
|
||||
err = hc.reset([]string{"http://127.0.0.1:4009"})
|
||||
err = hc.SetEndpoints([]string{"http://127.0.0.1:4009"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error during reset: %#v", err)
|
||||
}
|
||||
@ -749,7 +749,7 @@ func TestHTTPClusterClientSyncFail(t *testing.T) {
|
||||
clientFactory: cf,
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
}
|
||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
||||
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error during setup: %#v", err)
|
||||
}
|
||||
@ -783,7 +783,7 @@ func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
|
||||
clientFactory: cf,
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
}
|
||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
||||
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error during setup: %#v", err)
|
||||
}
|
||||
@ -805,7 +805,7 @@ func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
|
||||
clientFactory: cf,
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
}
|
||||
err := hc.reset([]string{"http://127.0.0.1:2379"})
|
||||
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error during setup: %#v", err)
|
||||
}
|
||||
@ -838,7 +838,7 @@ func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
|
||||
clientFactory: cf,
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
}
|
||||
err := hc.reset([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
|
||||
err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error during setup: %#v", err)
|
||||
}
|
||||
@ -867,7 +867,7 @@ func TestHTTPClusterClientResetFail(t *testing.T) {
|
||||
|
||||
for i, tt := range tests {
|
||||
hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
|
||||
err := hc.reset(tt)
|
||||
err := hc.SetEndpoints(tt)
|
||||
if err == nil {
|
||||
t.Errorf("#%d: expected non-nil error", i)
|
||||
}
|
||||
@ -879,7 +879,7 @@ func TestHTTPClusterClientResetPinRandom(t *testing.T) {
|
||||
pinNum := 0
|
||||
for i := 0; i < round; i++ {
|
||||
hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
|
||||
err := hc.reset([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
|
||||
err := hc.SetEndpoints([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: reset error (%v)", i, err)
|
||||
}
|
||||
|
@ -108,8 +108,13 @@ func handleClusterHealth(c *cli.Context) {
|
||||
}
|
||||
|
||||
if !forever {
|
||||
break
|
||||
if health {
|
||||
os.Exit(ExitSuccess)
|
||||
} else {
|
||||
os.Exit(ExitClusterNotHealthy)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("\nnext check after 10 second...\n\n")
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ const (
|
||||
ExitBadConnection
|
||||
ExitBadAuth
|
||||
ExitServerError
|
||||
ExitClusterNotHealthy
|
||||
)
|
||||
|
||||
func handleError(code int, err error) {
|
||||
|
@ -16,7 +16,6 @@ package command
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
||||
@ -43,15 +42,9 @@ func updatedirCommandFunc(c *cli.Context, ki client.KeysAPI) {
|
||||
handleError(ExitBadArgs, errors.New("key required"))
|
||||
}
|
||||
key := c.Args()[0]
|
||||
value, err := argOrStdin(c.Args(), os.Stdin, 1)
|
||||
if err != nil {
|
||||
handleError(ExitBadArgs, errors.New("value required"))
|
||||
}
|
||||
|
||||
ttl := c.Int("ttl")
|
||||
|
||||
ctx, cancel := contextWithTotalTimeout(c)
|
||||
_, err = ki.Set(ctx, key, value, &client.SetOptions{TTL: time.Duration(ttl) * time.Second, Dir: true, PrevExist: client.PrevExist})
|
||||
_, err := ki.Set(ctx, key, "", &client.SetOptions{TTL: time.Duration(ttl) * time.Second, Dir: true, PrevExist: client.PrevExist})
|
||||
cancel()
|
||||
if err != nil {
|
||||
handleError(ExitServerError, err)
|
||||
|
@ -203,9 +203,19 @@ func mustNewClient(c *cli.Context) client.Client {
|
||||
if err == client.ErrNoEndpoints {
|
||||
fmt.Fprintf(os.Stderr, "etcd cluster has no published client endpoints.\n")
|
||||
fmt.Fprintf(os.Stderr, "Try '--no-sync' if you want to access non-published client endpoints(%s).\n", strings.Join(hc.Endpoints(), ","))
|
||||
handleError(ExitServerError, err)
|
||||
}
|
||||
|
||||
// fail-back to try sync cluster with peer API. this is for making etcdctl work with etcd 0.4.x.
|
||||
// TODO: remove this when we deprecate the support for etcd 0.4.
|
||||
eps, serr := syncWithPeerAPI(c, ctx, hc.Endpoints())
|
||||
if serr != nil {
|
||||
handleError(ExitServerError, serr)
|
||||
}
|
||||
err = hc.SetEndpoints(eps)
|
||||
if err != nil {
|
||||
handleError(ExitServerError, err)
|
||||
}
|
||||
handleError(ExitServerError, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if debug {
|
||||
fmt.Fprintf(os.Stderr, "got endpoints(%s) after sync\n", strings.Join(hc.Endpoints(), ","))
|
||||
@ -267,3 +277,44 @@ func newClient(c *cli.Context) (client.Client, error) {
|
||||
func contextWithTotalTimeout(c *cli.Context) (context.Context, context.CancelFunc) {
|
||||
return context.WithTimeout(context.Background(), c.GlobalDuration("total-timeout"))
|
||||
}
|
||||
|
||||
// syncWithPeerAPI syncs cluster with peer API defined at
|
||||
// https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311.
|
||||
// This exists for backward compatibility with etcd 0.4.x.
|
||||
func syncWithPeerAPI(c *cli.Context, ctx context.Context, knownPeers []string) ([]string, error) {
|
||||
tr, err := getTransport(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
body []byte
|
||||
resp *http.Response
|
||||
)
|
||||
for _, p := range knownPeers {
|
||||
var req *http.Request
|
||||
req, err = http.NewRequest("GET", p+"/v2/peers", nil)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
resp, err = tr.RoundTrip(req)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
resp.Body.Close()
|
||||
continue
|
||||
}
|
||||
body, err = ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse the peers API format: https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311
|
||||
return strings.Split(string(body), ", "), nil
|
||||
}
|
||||
|
@ -31,7 +31,6 @@ import (
|
||||
systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/netutil"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
|
||||
"github.com/coreos/etcd/discovery"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
@ -40,6 +39,7 @@ import (
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
pkgioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/pkg/osutil"
|
||||
runtimeutil "github.com/coreos/etcd/pkg/runtime"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
@ -240,15 +240,23 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
|
||||
}
|
||||
var l net.Listener
|
||||
l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
|
||||
l, err = net.Listen("tcp", u.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if fdLimit, err := runtimeutil.FDLimit(); err == nil {
|
||||
if fdLimit <= reservedInternalFDNum {
|
||||
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
||||
}
|
||||
l = netutil.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
||||
l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
||||
}
|
||||
|
||||
// Do not wrap around this listener if TLS Info is set.
|
||||
// HTTPS server expects TLS Conn created by TLSListener.
|
||||
l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
urlStr := u.String()
|
||||
@ -409,7 +417,7 @@ func startProxy(cfg *config) error {
|
||||
return clientURLs
|
||||
}
|
||||
|
||||
err = ioutil.WriteFile(clusterfile+".bak", b, 0600)
|
||||
err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600)
|
||||
if err != nil {
|
||||
plog.Warningf("proxy: error on writing urls %s", err)
|
||||
return clientURLs
|
||||
|
@ -26,6 +26,9 @@ import (
|
||||
// creating a new service goroutine for each. The service goroutines
|
||||
// read requests and then call handler to reply to them.
|
||||
func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
|
||||
// TODO: assert net.Listener type? Arbitrary listener might break HTTPS server which
|
||||
// expect a TLS Conn type.
|
||||
|
||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
||||
// TODO: add debug flag; enable logging when debug flag is set
|
||||
srv := &http.Server{
|
||||
|
@ -174,12 +174,17 @@ type EtcdServer struct {
|
||||
// configuration is considered static for the lifetime of the EtcdServer.
|
||||
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
|
||||
|
||||
var w *wal.WAL
|
||||
var n raft.Node
|
||||
var s *raft.MemoryStorage
|
||||
var id types.ID
|
||||
var cl *cluster
|
||||
|
||||
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
|
||||
return nil, fmt.Errorf("cannot access data directory: %v", terr)
|
||||
}
|
||||
|
||||
// Run the migrations.
|
||||
dataVer, err := version.DetectDataDir(cfg.DataDir)
|
||||
if err != nil {
|
||||
@ -189,11 +194,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = os.MkdirAll(cfg.MemberDir(), privateDirMode)
|
||||
if err != nil && err != os.ErrExist {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
haveWAL := wal.Exist(cfg.WALDir())
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
|
||||
@ -255,10 +255,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
cfg.PrintWithInitial()
|
||||
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
|
||||
case haveWAL:
|
||||
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
||||
}
|
||||
@ -295,6 +291,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||
}
|
||||
|
||||
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
||||
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
||||
}
|
||||
|
||||
sstats := &stats.ServerStats{
|
||||
Name: cfg.Name,
|
||||
ID: id.String(),
|
||||
|
@ -25,6 +25,8 @@ import (
|
||||
|
||||
const (
|
||||
privateFileMode = 0600
|
||||
// owner can make/remove files inside the directory
|
||||
privateDirMode = 0700
|
||||
)
|
||||
|
||||
var (
|
||||
@ -55,3 +57,13 @@ func ReadDir(dirpath string) ([]string, error) {
|
||||
sort.Strings(names)
|
||||
return names, nil
|
||||
}
|
||||
|
||||
// TouchDirAll is simliar to os.MkdirAll. It creates directories with 0700 permission if any directory
|
||||
// does not exists. TouchDirAll also ensures the given directory is writable.
|
||||
func TouchDirAll(dir string) error {
|
||||
err := os.MkdirAll(dir, privateDirMode)
|
||||
if err != nil && err != os.ErrExist {
|
||||
return err
|
||||
}
|
||||
return IsDirWriteable(dir)
|
||||
}
|
||||
|
41
pkg/ioutil/util.go
Normal file
41
pkg/ioutil/util.go
Normal file
@ -0,0 +1,41 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package ioutil
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
// WriteAndSyncFile behaviors just like ioutil.WriteFile in standard library
|
||||
// but calls Sync before closing the file. WriteAndSyncFile guarantees the data
|
||||
// is synced if there is no error returned.
|
||||
func WriteAndSyncFile(filename string, data []byte, perm os.FileMode) error {
|
||||
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := f.Write(data)
|
||||
if err == nil && n < len(data) {
|
||||
err = io.ErrShortWrite
|
||||
}
|
||||
if err == nil {
|
||||
err = f.Sync()
|
||||
}
|
||||
if err1 := f.Close(); err == nil {
|
||||
err = err1
|
||||
}
|
||||
return err
|
||||
}
|
@ -21,17 +21,19 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// NewKeepAliveListener returns a listener that listens on the given address.
|
||||
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
|
||||
func NewKeepAliveListener(addr string, scheme string, info TLSInfo) (net.Listener, error) {
|
||||
l, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
type keepAliveConn interface {
|
||||
SetKeepAlive(bool) error
|
||||
SetKeepAlivePeriod(d time.Duration) error
|
||||
}
|
||||
|
||||
// NewKeepAliveListener returns a listener that listens on the given address.
|
||||
// Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil.
|
||||
// Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake.
|
||||
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
|
||||
func NewKeepAliveListener(l net.Listener, scheme string, info TLSInfo) (net.Listener, error) {
|
||||
if scheme == "https" {
|
||||
if info.Empty() {
|
||||
return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", scheme+"://"+addr)
|
||||
return nil, fmt.Errorf("cannot listen on TLS for given listener: KeyFile and CertFile are not presented")
|
||||
}
|
||||
cfg, err := info.ServerConfig()
|
||||
if err != nil {
|
||||
@ -53,13 +55,13 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tcpc := c.(*net.TCPConn)
|
||||
kac := c.(keepAliveConn)
|
||||
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
||||
// default on linux: 30 + 8 * 30
|
||||
// default on osx: 30 + 8 * 75
|
||||
tcpc.SetKeepAlive(true)
|
||||
tcpc.SetKeepAlivePeriod(30 * time.Second)
|
||||
return tcpc, nil
|
||||
kac.SetKeepAlive(true)
|
||||
kac.SetKeepAlivePeriod(30 * time.Second)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
|
||||
@ -75,12 +77,12 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tcpc := c.(*net.TCPConn)
|
||||
kac := c.(keepAliveConn)
|
||||
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
|
||||
// default on linux: 30 + 8 * 30
|
||||
// default on osx: 30 + 8 * 75
|
||||
tcpc.SetKeepAlive(true)
|
||||
tcpc.SetKeepAlivePeriod(30 * time.Second)
|
||||
kac.SetKeepAlive(true)
|
||||
kac.SetKeepAlivePeriod(30 * time.Second)
|
||||
c = tls.Server(c, l.config)
|
||||
return
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package transport
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
@ -25,7 +26,12 @@ import (
|
||||
// that accepts connections.
|
||||
// TODO: verify the keepalive option is set correctly
|
||||
func TestNewKeepAliveListener(t *testing.T) {
|
||||
ln, err := NewKeepAliveListener("127.0.0.1:0", "http", TLSInfo{})
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected listen error: %v", err)
|
||||
}
|
||||
|
||||
ln, err = NewKeepAliveListener(ln, "http", TLSInfo{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
|
||||
}
|
||||
@ -38,6 +44,7 @@ func TestNewKeepAliveListener(t *testing.T) {
|
||||
conn.Close()
|
||||
ln.Close()
|
||||
|
||||
ln, err = net.Listen("tcp", "127.0.0.1:0")
|
||||
// tls
|
||||
tmp, err := createTempFile([]byte("XXX"))
|
||||
if err != nil {
|
||||
@ -46,7 +53,7 @@ func TestNewKeepAliveListener(t *testing.T) {
|
||||
defer os.Remove(tmp)
|
||||
tlsInfo := TLSInfo{CertFile: tmp, KeyFile: tmp}
|
||||
tlsInfo.parseFunc = fakeCertificateParserFunc(tls.Certificate{}, nil)
|
||||
tlsln, err := NewKeepAliveListener("127.0.0.1:0", "https", tlsInfo)
|
||||
tlsln, err := NewKeepAliveListener(ln, "https", tlsInfo)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
|
||||
}
|
||||
@ -64,7 +71,12 @@ func TestNewKeepAliveListener(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewKeepAliveListenerTLSEmptyInfo(t *testing.T) {
|
||||
_, err := NewListener("127.0.0.1:0", "https", TLSInfo{})
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected listen error: %v", err)
|
||||
}
|
||||
|
||||
_, err = NewKeepAliveListener(ln, "https", TLSInfo{})
|
||||
if err == nil {
|
||||
t.Errorf("err = nil, want not presented error")
|
||||
}
|
||||
|
@ -4,11 +4,17 @@
|
||||
|
||||
// Package netutil provides network utility functions, complementing the more
|
||||
// common ones in the net package.
|
||||
package netutil
|
||||
package transport
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNotTCP = errors.New("only tcp connections have keepalive")
|
||||
)
|
||||
|
||||
// LimitListener returns a Listener that accepts at most n simultaneous
|
||||
@ -46,3 +52,19 @@ func (l *limitListenerConn) Close() error {
|
||||
l.releaseOnce.Do(l.release)
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
|
||||
tcpc, ok := l.Conn.(*net.TCPConn)
|
||||
if !ok {
|
||||
return ErrNotTCP
|
||||
}
|
||||
return tcpc.SetKeepAlive(doKeepAlive)
|
||||
}
|
||||
|
||||
func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error {
|
||||
tcpc, ok := l.Conn.(*net.TCPConn)
|
||||
if !ok {
|
||||
return ErrNotTCP
|
||||
}
|
||||
return tcpc.SetKeepAlivePeriod(d)
|
||||
}
|
@ -78,8 +78,9 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
|
||||
defer wh.mutex.Unlock()
|
||||
// If the event exists in the known history, append the EtcdIndex and return immediately
|
||||
if event != nil {
|
||||
event.EtcdIndex = storeIndex
|
||||
w.eventChan <- event
|
||||
ne := event.Clone()
|
||||
ne.EtcdIndex = storeIndex
|
||||
w.eventChan <- ne
|
||||
return w, nil
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "2.1.0"
|
||||
Version = "2.2.2"
|
||||
Version = "2.2.4"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
|
Reference in New Issue
Block a user