Compare commits

...

17 Commits

Author SHA1 Message Date
bdee27b19e *: bump to v2.2.4 2016-01-13 14:07:39 -08:00
7f684641a3 Godeps: remove golang.org/x/net/netutil
Now using our own LimitListener to support KeepAlives.
2016-01-13 13:32:38 -08:00
d54cf26bed etcdmain: support keep alive listeners on limit listener connections
Fixes #4171
2016-01-13 13:32:26 -08:00
d3e73aadab etcdmain: tls listener MUST be at the outer layer of all listeners
go HTTP library uses type assertion to determine if a connection
is a TLS connection. If we wrapper TLS Listener with any customized
Listener that can create customized Conn, HTTPs will be broken.

This commit fixes the issue.
2016-01-13 13:32:08 -08:00
e340928988 etcdctl: ignore value in updatedir command
Fixes coreos#4145.
client.KeysAPI ignores value if SetOptions.Dir is true.
2016-01-13 13:31:49 -08:00
e6ffe22e16 *: bump to v2.2.3+git 2015-12-30 13:54:57 -08:00
05b564a394 *: bump to v2.2.3 2015-12-30 13:41:16 -08:00
cb779b2305 etcdctl: fix syncWithPeerAPI by breaking the loop when there is no error 2015-12-30 11:24:27 -08:00
22c3208fb3 etcdserver: always check if the data dir is writable before starting etcd 2015-12-30 10:22:52 -08:00
e44372e430 etcdsever: avoid creating member dir before finishing validate bootstrap
This commit fixes the issue of creating member dir before validating
the configuration. When member dir exists, it indicates the local etcd
process is a valid etcd member. So we should only create member dir
after we finish configuration validation, joining validation or
discovery validation.
2015-12-30 10:19:51 -08:00
05a90bc1e5 etcdmain: fix incomplete proxy config file
etcd might generate incomplete proxy config file after a power failure.
It is because we use ioutil.WriteFile. And iotuile.WriteFile does
not call Sync before closing the file.
2015-12-22 12:30:39 -08:00
6751727809 etcdctl: support basic operations with etcd 0.4.
For CoreOS users, they will get a updated version of etcdctl without updating
the etcd server version. And the users cannot really control this behavior.
We do not want to suddenly break them without enough communication.

So we still want the most basic opeartions like get, set, watch of etcdctl2 work
with etcd 0.4. This patches solve the incompability issue.
2015-12-22 12:29:16 -08:00
916106c3a2 client: support reset Endpoints.
ResetEndpoints is useful when the there is a scheduled cluster
changes or when manually manage the cluster without auto-sync
enabled.
2015-12-22 12:25:59 -08:00
e0c7768f94 store: fix data race when modify event in watchHub.
The event got from watchHub should be considered as readonly.
To modify it, we first need to get a clone of it or there might
be a data race.
2015-12-14 14:08:39 -08:00
0fb2d5d4d3 client: fix goroutine leak in unreleased context
If headerTimeout is not zero then two context are created but only one is released.
cancelCtx in this case is never released which leads to goroutine leak inside it.
2015-12-14 13:58:43 -08:00
fc61fc7c7a etcdctl: cluster health exit with non-zero when cluster is unhealthy 2015-12-14 13:58:10 -08:00
09b81bad15 *: bump to v2.2.2+git 2015-11-19 14:24:59 -08:00
18 changed files with 217 additions and 165 deletions

4
Godeps/Godeps.json generated
View File

@ -129,10 +129,6 @@
"ImportPath": "golang.org/x/net/context",
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
},
{
"ImportPath": "golang.org/x/net/netutil",
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
},
{
"ImportPath": "golang.org/x/oauth2",
"Rev": "3046bc76d6dfd7d3707f6640f85e42d9c4050f50"

View File

@ -1,103 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.3
// (We only run this test on Go 1.3 because the HTTP client timeout behavior
// was bad in previous releases, causing occasional deadlocks.)
package netutil
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestLimitListener(t *testing.T) {
const (
max = 5
num = 200
)
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen: %v", err)
}
defer l.Close()
l = LimitListener(l, max)
var open int32
go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if n := atomic.AddInt32(&open, 1); n > max {
t.Errorf("%d open connections, want <= %d", n, max)
}
defer atomic.AddInt32(&open, -1)
time.Sleep(10 * time.Millisecond)
fmt.Fprint(w, "some body")
}))
var wg sync.WaitGroup
var failed int32
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c := http.Client{Timeout: 3 * time.Second}
r, err := c.Get("http://" + l.Addr().String())
if err != nil {
t.Logf("Get: %v", err)
atomic.AddInt32(&failed, 1)
return
}
defer r.Body.Close()
io.Copy(ioutil.Discard, r.Body)
}()
}
wg.Wait()
// We expect some Gets to fail as the kernel's accept queue is filled,
// but most should succeed.
if failed >= num/2 {
t.Errorf("too many Gets failed: %v", failed)
}
}
type errorListener struct {
net.Listener
}
func (errorListener) Accept() (net.Conn, error) {
return nil, errFake
}
var errFake = errors.New("fake error from errorListener")
// This used to hang.
func TestLimitListenerError(t *testing.T) {
donec := make(chan bool, 1)
go func() {
const n = 2
ll := LimitListener(errorListener{}, n)
for i := 0; i < n+1; i++ {
_, err := ll.Accept()
if err != errFake {
t.Fatalf("Accept error = %v; want errFake", err)
}
}
donec <- true
}()
select {
case <-donec:
case <-time.After(5 * time.Second):
t.Fatal("timeout. deadlock?")
}
}

View File

@ -162,6 +162,11 @@ type Client interface {
// this may differ from the initial Endpoints provided in the Config.
Endpoints() []string
// SetEndpoints sets the set of API endpoints used by Client to resolve
// HTTP requests. If the given endpoints are not valid, an error will be
// returned
SetEndpoints(eps []string) error
httpClient
}
@ -176,7 +181,7 @@ func New(cfg Config) (Client, error) {
password: cfg.Password,
}
}
if err := c.reset(cfg.Endpoints); err != nil {
if err := c.SetEndpoints(cfg.Endpoints); err != nil {
return nil, err
}
return c, nil
@ -219,7 +224,7 @@ type httpClusterClient struct {
rand *rand.Rand
}
func (c *httpClusterClient) reset(eps []string) error {
func (c *httpClusterClient) SetEndpoints(eps []string) error {
if len(eps) == 0 {
return ErrNoEndpoints
}
@ -341,7 +346,7 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
return nil
}
return c.reset(eps)
return c.SetEndpoints(eps)
}
func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
@ -378,9 +383,12 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
return nil, nil, err
}
hctx, hcancel := context.WithCancel(ctx)
var hctx context.Context
var hcancel context.CancelFunc
if c.headerTimeout > 0 {
hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
} else {
hctx, hcancel = context.WithCancel(ctx)
}
defer hcancel()

View File

@ -705,7 +705,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
clientFactory: cf,
rand: rand.New(rand.NewSource(0)),
}
err := hc.reset([]string{"http://127.0.0.1:2379"})
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
if err != nil {
t.Fatalf("unexpected error during setup: %#v", err)
}
@ -728,7 +728,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
}
err = hc.reset([]string{"http://127.0.0.1:4009"})
err = hc.SetEndpoints([]string{"http://127.0.0.1:4009"})
if err != nil {
t.Fatalf("unexpected error during reset: %#v", err)
}
@ -749,7 +749,7 @@ func TestHTTPClusterClientSyncFail(t *testing.T) {
clientFactory: cf,
rand: rand.New(rand.NewSource(0)),
}
err := hc.reset([]string{"http://127.0.0.1:2379"})
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
if err != nil {
t.Fatalf("unexpected error during setup: %#v", err)
}
@ -783,7 +783,7 @@ func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
clientFactory: cf,
rand: rand.New(rand.NewSource(0)),
}
err := hc.reset([]string{"http://127.0.0.1:2379"})
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
if err != nil {
t.Fatalf("unexpected error during setup: %#v", err)
}
@ -805,7 +805,7 @@ func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
clientFactory: cf,
rand: rand.New(rand.NewSource(0)),
}
err := hc.reset([]string{"http://127.0.0.1:2379"})
err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
if err != nil {
t.Fatalf("unexpected error during setup: %#v", err)
}
@ -838,7 +838,7 @@ func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
clientFactory: cf,
rand: rand.New(rand.NewSource(0)),
}
err := hc.reset([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
if err != nil {
t.Fatalf("unexpected error during setup: %#v", err)
}
@ -867,7 +867,7 @@ func TestHTTPClusterClientResetFail(t *testing.T) {
for i, tt := range tests {
hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
err := hc.reset(tt)
err := hc.SetEndpoints(tt)
if err == nil {
t.Errorf("#%d: expected non-nil error", i)
}
@ -879,7 +879,7 @@ func TestHTTPClusterClientResetPinRandom(t *testing.T) {
pinNum := 0
for i := 0; i < round; i++ {
hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
err := hc.reset([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
err := hc.SetEndpoints([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
if err != nil {
t.Fatalf("#%d: reset error (%v)", i, err)
}

View File

@ -108,8 +108,13 @@ func handleClusterHealth(c *cli.Context) {
}
if !forever {
break
if health {
os.Exit(ExitSuccess)
} else {
os.Exit(ExitClusterNotHealthy)
}
}
fmt.Printf("\nnext check after 10 second...\n\n")
time.Sleep(10 * time.Second)
}

View File

@ -27,6 +27,7 @@ const (
ExitBadConnection
ExitBadAuth
ExitServerError
ExitClusterNotHealthy
)
func handleError(code int, err error) {

View File

@ -16,7 +16,6 @@ package command
import (
"errors"
"os"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
@ -43,15 +42,9 @@ func updatedirCommandFunc(c *cli.Context, ki client.KeysAPI) {
handleError(ExitBadArgs, errors.New("key required"))
}
key := c.Args()[0]
value, err := argOrStdin(c.Args(), os.Stdin, 1)
if err != nil {
handleError(ExitBadArgs, errors.New("value required"))
}
ttl := c.Int("ttl")
ctx, cancel := contextWithTotalTimeout(c)
_, err = ki.Set(ctx, key, value, &client.SetOptions{TTL: time.Duration(ttl) * time.Second, Dir: true, PrevExist: client.PrevExist})
_, err := ki.Set(ctx, key, "", &client.SetOptions{TTL: time.Duration(ttl) * time.Second, Dir: true, PrevExist: client.PrevExist})
cancel()
if err != nil {
handleError(ExitServerError, err)

View File

@ -203,9 +203,19 @@ func mustNewClient(c *cli.Context) client.Client {
if err == client.ErrNoEndpoints {
fmt.Fprintf(os.Stderr, "etcd cluster has no published client endpoints.\n")
fmt.Fprintf(os.Stderr, "Try '--no-sync' if you want to access non-published client endpoints(%s).\n", strings.Join(hc.Endpoints(), ","))
handleError(ExitServerError, err)
}
// fail-back to try sync cluster with peer API. this is for making etcdctl work with etcd 0.4.x.
// TODO: remove this when we deprecate the support for etcd 0.4.
eps, serr := syncWithPeerAPI(c, ctx, hc.Endpoints())
if serr != nil {
handleError(ExitServerError, serr)
}
err = hc.SetEndpoints(eps)
if err != nil {
handleError(ExitServerError, err)
}
handleError(ExitServerError, err)
os.Exit(1)
}
if debug {
fmt.Fprintf(os.Stderr, "got endpoints(%s) after sync\n", strings.Join(hc.Endpoints(), ","))
@ -267,3 +277,44 @@ func newClient(c *cli.Context) (client.Client, error) {
func contextWithTotalTimeout(c *cli.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), c.GlobalDuration("total-timeout"))
}
// syncWithPeerAPI syncs cluster with peer API defined at
// https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311.
// This exists for backward compatibility with etcd 0.4.x.
func syncWithPeerAPI(c *cli.Context, ctx context.Context, knownPeers []string) ([]string, error) {
tr, err := getTransport(c)
if err != nil {
return nil, err
}
var (
body []byte
resp *http.Response
)
for _, p := range knownPeers {
var req *http.Request
req, err = http.NewRequest("GET", p+"/v2/peers", nil)
if err != nil {
continue
}
resp, err = tr.RoundTrip(req)
if err != nil {
continue
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
continue
}
body, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err == nil {
break
}
}
if err != nil {
return nil, err
}
// Parse the peers API format: https://github.com/coreos/etcd/blob/v0.4.9/server/server.go#L311
return strings.Split(string(body), ", "), nil
}

View File

@ -31,7 +31,6 @@ import (
systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/netutil"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver"
@ -40,6 +39,7 @@ import (
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/fileutil"
pkgioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/osutil"
runtimeutil "github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/transport"
@ -240,15 +240,23 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
}
var l net.Listener
l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
l, err = net.Listen("tcp", u.Host)
if err != nil {
return nil, err
}
if fdLimit, err := runtimeutil.FDLimit(); err == nil {
if fdLimit <= reservedInternalFDNum {
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
}
l = netutil.LimitListener(l, int(fdLimit-reservedInternalFDNum))
l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
}
// Do not wrap around this listener if TLS Info is set.
// HTTPS server expects TLS Conn created by TLSListener.
l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo)
if err != nil {
return nil, err
}
urlStr := u.String()
@ -409,7 +417,7 @@ func startProxy(cfg *config) error {
return clientURLs
}
err = ioutil.WriteFile(clusterfile+".bak", b, 0600)
err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600)
if err != nil {
plog.Warningf("proxy: error on writing urls %s", err)
return clientURLs

View File

@ -26,6 +26,9 @@ import (
// creating a new service goroutine for each. The service goroutines
// read requests and then call handler to reply to them.
func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
// TODO: assert net.Listener type? Arbitrary listener might break HTTPS server which
// expect a TLS Conn type.
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
// TODO: add debug flag; enable logging when debug flag is set
srv := &http.Server{

View File

@ -174,12 +174,17 @@ type EtcdServer struct {
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
var w *wal.WAL
var n raft.Node
var s *raft.MemoryStorage
var id types.ID
var cl *cluster
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
// Run the migrations.
dataVer, err := version.DetectDataDir(cfg.DataDir)
if err != nil {
@ -189,11 +194,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
return nil, err
}
err = os.MkdirAll(cfg.MemberDir(), privateDirMode)
if err != nil && err != os.ErrExist {
return nil, err
}
haveWAL := wal.Exist(cfg.WALDir())
ss := snap.New(cfg.SnapDir())
@ -255,10 +255,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
cfg.PrintWithInitial()
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
case haveWAL:
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
return nil, fmt.Errorf("cannot write to data directory: %v", err)
}
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err)
}
@ -295,6 +291,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
return nil, fmt.Errorf("unsupported bootstrap config")
}
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
return nil, fmt.Errorf("cannot access member directory: %v", terr)
}
sstats := &stats.ServerStats{
Name: cfg.Name,
ID: id.String(),

View File

@ -25,6 +25,8 @@ import (
const (
privateFileMode = 0600
// owner can make/remove files inside the directory
privateDirMode = 0700
)
var (
@ -55,3 +57,13 @@ func ReadDir(dirpath string) ([]string, error) {
sort.Strings(names)
return names, nil
}
// TouchDirAll is simliar to os.MkdirAll. It creates directories with 0700 permission if any directory
// does not exists. TouchDirAll also ensures the given directory is writable.
func TouchDirAll(dir string) error {
err := os.MkdirAll(dir, privateDirMode)
if err != nil && err != os.ErrExist {
return err
}
return IsDirWriteable(dir)
}

41
pkg/ioutil/util.go Normal file
View File

@ -0,0 +1,41 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ioutil
import (
"io"
"os"
)
// WriteAndSyncFile behaviors just like ioutil.WriteFile in standard library
// but calls Sync before closing the file. WriteAndSyncFile guarantees the data
// is synced if there is no error returned.
func WriteAndSyncFile(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if err == nil && n < len(data) {
err = io.ErrShortWrite
}
if err == nil {
err = f.Sync()
}
if err1 := f.Close(); err == nil {
err = err1
}
return err
}

View File

@ -21,17 +21,19 @@ import (
"time"
)
// NewKeepAliveListener returns a listener that listens on the given address.
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
func NewKeepAliveListener(addr string, scheme string, info TLSInfo) (net.Listener, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
type keepAliveConn interface {
SetKeepAlive(bool) error
SetKeepAlivePeriod(d time.Duration) error
}
// NewKeepAliveListener returns a listener that listens on the given address.
// Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil.
// Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake.
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
func NewKeepAliveListener(l net.Listener, scheme string, info TLSInfo) (net.Listener, error) {
if scheme == "https" {
if info.Empty() {
return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", scheme+"://"+addr)
return nil, fmt.Errorf("cannot listen on TLS for given listener: KeyFile and CertFile are not presented")
}
cfg, err := info.ServerConfig()
if err != nil {
@ -53,13 +55,13 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) {
if err != nil {
return nil, err
}
tcpc := c.(*net.TCPConn)
kac := c.(keepAliveConn)
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
// default on linux: 30 + 8 * 30
// default on osx: 30 + 8 * 75
tcpc.SetKeepAlive(true)
tcpc.SetKeepAlivePeriod(30 * time.Second)
return tcpc, nil
kac.SetKeepAlive(true)
kac.SetKeepAlivePeriod(30 * time.Second)
return c, nil
}
// A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
@ -75,12 +77,12 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) {
if err != nil {
return
}
tcpc := c.(*net.TCPConn)
kac := c.(keepAliveConn)
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
// default on linux: 30 + 8 * 30
// default on osx: 30 + 8 * 75
tcpc.SetKeepAlive(true)
tcpc.SetKeepAlivePeriod(30 * time.Second)
kac.SetKeepAlive(true)
kac.SetKeepAlivePeriod(30 * time.Second)
c = tls.Server(c, l.config)
return
}

View File

@ -16,6 +16,7 @@ package transport
import (
"crypto/tls"
"net"
"net/http"
"os"
"testing"
@ -25,7 +26,12 @@ import (
// that accepts connections.
// TODO: verify the keepalive option is set correctly
func TestNewKeepAliveListener(t *testing.T) {
ln, err := NewKeepAliveListener("127.0.0.1:0", "http", TLSInfo{})
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("unexpected listen error: %v", err)
}
ln, err = NewKeepAliveListener(ln, "http", TLSInfo{})
if err != nil {
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
}
@ -38,6 +44,7 @@ func TestNewKeepAliveListener(t *testing.T) {
conn.Close()
ln.Close()
ln, err = net.Listen("tcp", "127.0.0.1:0")
// tls
tmp, err := createTempFile([]byte("XXX"))
if err != nil {
@ -46,7 +53,7 @@ func TestNewKeepAliveListener(t *testing.T) {
defer os.Remove(tmp)
tlsInfo := TLSInfo{CertFile: tmp, KeyFile: tmp}
tlsInfo.parseFunc = fakeCertificateParserFunc(tls.Certificate{}, nil)
tlsln, err := NewKeepAliveListener("127.0.0.1:0", "https", tlsInfo)
tlsln, err := NewKeepAliveListener(ln, "https", tlsInfo)
if err != nil {
t.Fatalf("unexpected NewKeepAliveListener error: %v", err)
}
@ -64,7 +71,12 @@ func TestNewKeepAliveListener(t *testing.T) {
}
func TestNewKeepAliveListenerTLSEmptyInfo(t *testing.T) {
_, err := NewListener("127.0.0.1:0", "https", TLSInfo{})
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("unexpected listen error: %v", err)
}
_, err = NewKeepAliveListener(ln, "https", TLSInfo{})
if err == nil {
t.Errorf("err = nil, want not presented error")
}

View File

@ -4,11 +4,17 @@
// Package netutil provides network utility functions, complementing the more
// common ones in the net package.
package netutil
package transport
import (
"errors"
"net"
"sync"
"time"
)
var (
ErrNotTCP = errors.New("only tcp connections have keepalive")
)
// LimitListener returns a Listener that accepts at most n simultaneous
@ -46,3 +52,19 @@ func (l *limitListenerConn) Close() error {
l.releaseOnce.Do(l.release)
return err
}
func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
tcpc, ok := l.Conn.(*net.TCPConn)
if !ok {
return ErrNotTCP
}
return tcpc.SetKeepAlive(doKeepAlive)
}
func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error {
tcpc, ok := l.Conn.(*net.TCPConn)
if !ok {
return ErrNotTCP
}
return tcpc.SetKeepAlivePeriod(d)
}

View File

@ -78,8 +78,9 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
defer wh.mutex.Unlock()
// If the event exists in the known history, append the EtcdIndex and return immediately
if event != nil {
event.EtcdIndex = storeIndex
w.eventChan <- event
ne := event.Clone()
ne.EtcdIndex = storeIndex
w.eventChan <- ne
return w, nil
}

View File

@ -27,7 +27,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.1.0"
Version = "2.2.2"
Version = "2.2.4"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"