Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
faeeb2fc75 | |||
d50c487132 | |||
b837feffe4 | |||
4d89640195 | |||
1292d453c3 | |||
ec20b381ed | |||
37cc3f5262 | |||
7f1940e5ed | |||
caccf8e5e6 | |||
ef65dfe2eb | |||
ff6c6916f2 | |||
3dfe8765d3 | |||
a4a52cb15d | |||
014970930a | |||
4628be982c | |||
ff55e5a188 | |||
bf0898266c | |||
b9d69f7698 |
@ -4,7 +4,6 @@ go_import_path: github.com/coreos/etcd
|
||||
sudo: false
|
||||
|
||||
go:
|
||||
- 1.5
|
||||
- 1.6
|
||||
- tip
|
||||
|
||||
@ -22,10 +21,6 @@ matrix:
|
||||
allow_failures:
|
||||
- go: tip
|
||||
exclude:
|
||||
- go: 1.5
|
||||
env: TARGET=arm
|
||||
- go: 1.5
|
||||
env: TARGET=ppc64le
|
||||
- go: 1.6
|
||||
env: TARGET=arm64
|
||||
- go: tip
|
||||
|
@ -1,10 +1,10 @@
|
||||
FROM alpine:latest
|
||||
|
||||
ADD bin/etcd /usr/local/bin/
|
||||
ADD bin/etcdctl /usr/local/bin/
|
||||
ADD etcd /usr/local/bin/
|
||||
ADD etcdctl /usr/local/bin/
|
||||
RUN mkdir -p /var/etcd/
|
||||
|
||||
EXPOSE 2379 2380
|
||||
|
||||
# Define default command.
|
||||
CMD ["/usr/local/bin/etcd"]
|
||||
# Define default entrypoint.
|
||||
ENTRYPOINT ["/usr/local/bin/etcd"]
|
||||
|
@ -25,7 +25,7 @@ curl -L http://localhost:2379/v3alpha/kv/range \
|
||||
|
||||
## Swagger
|
||||
|
||||
Generated [Swapper][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc].
|
||||
Generated [Swagger][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc].
|
||||
|
||||
[api-ref]: ./api_reference_v3.md
|
||||
[go-client]: https://github.com/coreos/etcd/tree/master/clientv3
|
||||
|
@ -11,7 +11,7 @@ The easiest way to get etcd is to use one of the pre-built release binaries whic
|
||||
## Build the latest version
|
||||
|
||||
For those wanting to try the very latest version, build etcd from the `master` branch.
|
||||
[Go](https://golang.org/) version 1.5+ is required to build the latest version of etcd.
|
||||
[Go](https://golang.org/) version 1.6+ (with HTTP2 support) is required to build the latest version of etcd.
|
||||
|
||||
Here are the commands to build an etcd binary from the `master` branch:
|
||||
|
||||
|
@ -40,7 +40,7 @@ See [etcdctl][etcdctl] for a simple command line client.
|
||||
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
|
||||
|
||||
For those wanting to try the very latest version, you can build the latest version of etcd from the `master` branch.
|
||||
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.5+ is required).
|
||||
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.6+ is required).
|
||||
All development occurs on `master`, including new features and bug fixes.
|
||||
Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
|
||||
|
||||
|
@ -37,6 +37,10 @@ var (
|
||||
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
|
||||
ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
|
||||
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
||||
|
||||
// oneShotCtxValue is set on a context using WithValue(&oneShotValue) so
|
||||
// that Do() will not retry a request
|
||||
oneShotCtxValue interface{}
|
||||
)
|
||||
|
||||
var DefaultRequestTimeout = 5 * time.Second
|
||||
@ -335,6 +339,7 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
||||
var body []byte
|
||||
var err error
|
||||
cerr := &ClusterError{}
|
||||
isOneShot := ctx.Value(&oneShotCtxValue) != nil
|
||||
|
||||
for i := pinned; i < leps+pinned; i++ {
|
||||
k := i % leps
|
||||
@ -348,6 +353,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
||||
if err == context.Canceled || err == context.DeadlineExceeded {
|
||||
return nil, nil, err
|
||||
}
|
||||
if isOneShot {
|
||||
return nil, nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode/100 == 5 {
|
||||
@ -358,6 +366,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
||||
default:
|
||||
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
|
||||
}
|
||||
if isOneShot {
|
||||
return nil, nil, cerr.Errors[0]
|
||||
}
|
||||
continue
|
||||
}
|
||||
if k != pinned {
|
||||
|
134
client/integration/client_test.go
Normal file
134
client/integration/client_test.go
Normal file
@ -0,0 +1,134 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
)
|
||||
|
||||
// TestV2NoRetryEOF tests destructive api calls won't retry on a disconnection.
|
||||
func TestV2NoRetryEOF(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
// generate an EOF response; specify address so appears first in sorted ep list
|
||||
lEOF := integration.NewListenerWithAddr(t, fmt.Sprintf("eof:123.%d.sock", os.Getpid()))
|
||||
defer lEOF.Close()
|
||||
tries := uint32(0)
|
||||
go func() {
|
||||
for {
|
||||
conn, err := lEOF.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
atomic.AddUint32(&tries, 1)
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
eofURL := integration.UrlScheme + "://" + lEOF.Addr().String()
|
||||
cli := integration.MustNewHTTPClient(t, []string{eofURL, eofURL}, nil)
|
||||
kapi := client.NewKeysAPI(cli)
|
||||
for i, f := range noRetryList(kapi) {
|
||||
startTries := atomic.LoadUint32(&tries)
|
||||
if err := f(); err == nil {
|
||||
t.Errorf("#%d: expected EOF error, got nil", i)
|
||||
}
|
||||
endTries := atomic.LoadUint32(&tries)
|
||||
if startTries+1 != endTries {
|
||||
t.Errorf("#%d: expected 1 try, got %d", i, endTries-startTries)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestV2NoRetryNoLeader tests destructive api calls won't retry if given an error code.
|
||||
func TestV2NoRetryNoLeader(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
lHttp := integration.NewListenerWithAddr(t, fmt.Sprintf("errHttp:123.%d.sock", os.Getpid()))
|
||||
eh := &errHandler{errCode: http.StatusServiceUnavailable}
|
||||
srv := httptest.NewUnstartedServer(eh)
|
||||
defer lHttp.Close()
|
||||
defer srv.Close()
|
||||
srv.Listener = lHttp
|
||||
go srv.Start()
|
||||
lHttpURL := integration.UrlScheme + "://" + lHttp.Addr().String()
|
||||
|
||||
cli := integration.MustNewHTTPClient(t, []string{lHttpURL, lHttpURL}, nil)
|
||||
kapi := client.NewKeysAPI(cli)
|
||||
// test error code
|
||||
for i, f := range noRetryList(kapi) {
|
||||
reqs := eh.reqs
|
||||
if err := f(); err == nil || !strings.Contains(err.Error(), "no leader") {
|
||||
t.Errorf("#%d: expected \"no leader\", got %v", i, err)
|
||||
}
|
||||
if eh.reqs != reqs+1 {
|
||||
t.Errorf("#%d: expected 1 request, got %d", i, eh.reqs-reqs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused.
|
||||
func TestV2RetryRefuse(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
cl := integration.NewCluster(t, 1)
|
||||
cl.Launch(t)
|
||||
defer cl.Terminate(t)
|
||||
// test connection refused; expect no error failover
|
||||
cli := integration.MustNewHTTPClient(t, []string{integration.UrlScheme + "://refuseconn:123", cl.URL(0)}, nil)
|
||||
kapi := client.NewKeysAPI(cli)
|
||||
if _, err := kapi.Set(context.Background(), "/delkey", "def", nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i, f := range noRetryList(kapi) {
|
||||
if err := f(); err != nil {
|
||||
t.Errorf("#%d: unexpected retry failure (%v)", i, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type errHandler struct {
|
||||
errCode int
|
||||
reqs int
|
||||
}
|
||||
|
||||
func (eh *errHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
req.Body.Close()
|
||||
eh.reqs++
|
||||
w.WriteHeader(eh.errCode)
|
||||
}
|
||||
|
||||
func noRetryList(kapi client.KeysAPI) []func() error {
|
||||
return []func() error{
|
||||
func() error {
|
||||
opts := &client.SetOptions{PrevExist: client.PrevNoExist}
|
||||
_, err := kapi.Set(context.Background(), "/setkey", "bar", opts)
|
||||
return err
|
||||
},
|
||||
func() error {
|
||||
_, err := kapi.Delete(context.Background(), "/delkey", nil)
|
||||
return err
|
||||
},
|
||||
}
|
||||
}
|
20
client/integration/main_test.go
Normal file
20
client/integration/main_test.go
Normal file
@ -0,0 +1,20 @@
|
||||
// Copyright 2013 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
v := m.Run()
|
||||
if v == 0 && testutil.CheckLeakedGoroutine() {
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(v)
|
||||
}
|
@ -337,7 +337,11 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions
|
||||
act.Dir = opts.Dir
|
||||
}
|
||||
|
||||
resp, body, err := k.client.Do(ctx, act)
|
||||
doCtx := ctx
|
||||
if act.PrevExist == PrevNoExist {
|
||||
doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue)
|
||||
}
|
||||
resp, body, err := k.client.Do(doCtx, act)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -385,7 +389,8 @@ func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOption
|
||||
act.Recursive = opts.Recursive
|
||||
}
|
||||
|
||||
resp, body, err := k.client.Do(ctx, act)
|
||||
doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue)
|
||||
resp, body, err := k.client.Do(doCtx, act)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -15,7 +15,9 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) {
|
||||
t.Fatal("failed to receive update in one second")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMirrorSyncBase(t *testing.T) {
|
||||
cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(nil)
|
||||
|
||||
cli := cluster.Client(0)
|
||||
ctx := context.TODO()
|
||||
|
||||
keyCh := make(chan string)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < 50; i++ {
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
for key := range keyCh {
|
||||
if _, err := cli.Put(ctx, key, "test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < 2000; i++ {
|
||||
keyCh <- fmt.Sprintf("test%d", i)
|
||||
}
|
||||
|
||||
close(keyCh)
|
||||
wg.Wait()
|
||||
|
||||
syncer := mirror.NewSyncer(cli, "test", 0)
|
||||
respCh, errCh := syncer.SyncBase(ctx)
|
||||
|
||||
count := 0
|
||||
|
||||
for resp := range respCh {
|
||||
count = count + len(resp.Kvs)
|
||||
if !resp.More {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for err := range errCh {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
|
||||
if count != 2000 {
|
||||
t.Errorf("unexpected kv count: %d", count)
|
||||
}
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
|
||||
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
|
||||
// We then range from the prefix to the next prefix if exists. Or we will
|
||||
// range from the prefix to the end if the next prefix does not exists.
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
|
||||
key = s.prefix
|
||||
}
|
||||
|
||||
|
@ -182,6 +182,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
|
||||
}
|
||||
}
|
||||
|
||||
// GetPrefixRangeEnd gets the range end of the prefix.
|
||||
// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
|
||||
func GetPrefixRangeEnd(prefix string) string {
|
||||
return string(getPrefix([]byte(prefix)))
|
||||
}
|
||||
|
||||
func getPrefix(key []byte) []byte {
|
||||
end := make([]byte, len(key))
|
||||
copy(end, key)
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
@ -71,7 +72,7 @@ func runCapabilityLoop(s *etcdserver.EtcdServer) {
|
||||
enableMapMu.Lock()
|
||||
enabledMap = capabilityMaps[pv.String()]
|
||||
enableMapMu.Unlock()
|
||||
plog.Infof("enabled capabilities for version %s", pv)
|
||||
plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -86,11 +86,11 @@ type serverWatchStream struct {
|
||||
watchStream mvcc.WatchStream
|
||||
ctrlStream chan *pb.WatchResponse
|
||||
|
||||
// mu protects progress, prevKV
|
||||
mu sync.Mutex
|
||||
// progress tracks the watchID that stream might need to send
|
||||
// progress to.
|
||||
progress map[mvcc.WatchID]bool
|
||||
// mu protects progress
|
||||
mu sync.Mutex
|
||||
|
||||
// closec indicates the stream is closed.
|
||||
closec chan struct{}
|
||||
@ -171,7 +171,9 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
}
|
||||
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
|
||||
if id != -1 && creq.ProgressNotify {
|
||||
sws.mu.Lock()
|
||||
sws.progress[id] = true
|
||||
sws.mu.Unlock()
|
||||
}
|
||||
wr := &pb.WatchResponse{
|
||||
Header: sws.newResponseHeader(wsrev),
|
||||
@ -199,9 +201,11 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
sws.mu.Unlock()
|
||||
}
|
||||
}
|
||||
// TODO: do we need to return error back to client?
|
||||
default:
|
||||
panic("not implemented")
|
||||
// we probably should not shutdown the entire stream when
|
||||
// receive an valid command.
|
||||
// so just do nothing instead.
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -296,12 +300,14 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
delete(pending, wid)
|
||||
}
|
||||
case <-progressTicker.C:
|
||||
sws.mu.Lock()
|
||||
for id, ok := range sws.progress {
|
||||
if ok {
|
||||
sws.watchStream.RequestProgress(id)
|
||||
}
|
||||
sws.progress[id] = true
|
||||
}
|
||||
sws.mu.Unlock()
|
||||
case <-sws.closec:
|
||||
return
|
||||
}
|
||||
|
@ -357,7 +357,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
cl.SetStore(st)
|
||||
cl.SetBackend(be)
|
||||
cl.Recover()
|
||||
if cl.Version() != nil && cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
||||
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
||||
os.RemoveAll(bepath)
|
||||
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
||||
}
|
||||
@ -1170,8 +1170,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
}
|
||||
plog.Panicf("unexpected create snapshot error %v", err)
|
||||
}
|
||||
// commit v3 storage because WAL file before snapshot index
|
||||
// could be removed after SaveSnap.
|
||||
// commit kv to write metadata (for example: consistent index) to disk.
|
||||
s.KV().Commit()
|
||||
// SaveSnap saves the snapshot and releases the locked wal files
|
||||
// to the snapshot index.
|
||||
|
@ -39,6 +39,8 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
|
||||
plog.Panicf("store save should never fail: %v", err)
|
||||
}
|
||||
|
||||
// commit kv to write metadata(for example: consistent index).
|
||||
s.KV().Commit()
|
||||
dbsnap := s.be.Snapshot()
|
||||
// get a snapshot of v3 KV as readCloser
|
||||
rc := newSnapshotReaderCloser(dbsnap)
|
||||
|
@ -54,8 +54,8 @@ const (
|
||||
requestTimeout = 20 * time.Second
|
||||
|
||||
basePort = 21000
|
||||
urlScheme = "unix"
|
||||
urlSchemeTLS = "unixs"
|
||||
UrlScheme = "unix"
|
||||
UrlSchemeTLS = "unixs"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -96,9 +96,9 @@ func init() {
|
||||
|
||||
func schemeFromTLSInfo(tls *transport.TLSInfo) string {
|
||||
if tls == nil {
|
||||
return urlScheme
|
||||
return UrlScheme
|
||||
}
|
||||
return urlSchemeTLS
|
||||
return UrlSchemeTLS
|
||||
}
|
||||
|
||||
func (c *cluster) fillClusterForMembers() error {
|
||||
@ -257,7 +257,7 @@ func (c *cluster) addMember(t *testing.T) {
|
||||
}
|
||||
|
||||
func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
|
||||
cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
|
||||
cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
|
||||
ma := client.NewMembersAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := ma.Add(ctx, peerURL); err != nil {
|
||||
@ -277,7 +277,7 @@ func (c *cluster) AddMember(t *testing.T) {
|
||||
|
||||
func (c *cluster) RemoveMember(t *testing.T, id uint64) {
|
||||
// send remove request to the cluster
|
||||
cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
|
||||
cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
|
||||
ma := client.NewMembersAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
|
||||
@ -312,7 +312,7 @@ func (c *cluster) Terminate(t *testing.T) {
|
||||
|
||||
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
|
||||
for _, u := range c.URLs() {
|
||||
cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
|
||||
cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
|
||||
ma := client.NewMembersAPI(cc)
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
@ -391,10 +391,10 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
|
||||
func newLocalListener(t *testing.T) net.Listener {
|
||||
c := atomic.AddInt64(&localListenCount, 1)
|
||||
addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid())
|
||||
return newListenerWithAddr(t, addr)
|
||||
return NewListenerWithAddr(t, addr)
|
||||
}
|
||||
|
||||
func newListenerWithAddr(t *testing.T, addr string) net.Listener {
|
||||
func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
|
||||
l, err := transport.NewUnixListener(addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -614,7 +614,7 @@ func (m *member) Launch() error {
|
||||
}
|
||||
|
||||
func (m *member) WaitOK(t *testing.T) {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
|
||||
cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
@ -678,12 +678,12 @@ func (m *member) Restart(t *testing.T) error {
|
||||
plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr)
|
||||
newPeerListeners := make([]net.Listener, 0)
|
||||
for _, ln := range m.PeerListeners {
|
||||
newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
|
||||
newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
|
||||
}
|
||||
m.PeerListeners = newPeerListeners
|
||||
newClientListeners := make([]net.Listener, 0)
|
||||
for _, ln := range m.ClientListeners {
|
||||
newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
|
||||
newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
|
||||
}
|
||||
m.ClientListeners = newClientListeners
|
||||
|
||||
@ -708,7 +708,7 @@ func (m *member) Terminate(t *testing.T) {
|
||||
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
|
||||
}
|
||||
|
||||
func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
|
||||
func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
|
||||
cfgtls := transport.TLSInfo{}
|
||||
if tls != nil {
|
||||
cfgtls = *tls
|
||||
|
@ -67,7 +67,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
|
||||
dc.Launch(t)
|
||||
defer dc.Terminate(t)
|
||||
// init discovery token space
|
||||
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
|
||||
dcc := MustNewHTTPClient(t, dc.URLs(), nil)
|
||||
dkapi := client.NewKeysAPI(dcc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
|
||||
@ -90,7 +90,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
|
||||
dc.Launch(t)
|
||||
defer dc.Terminate(t)
|
||||
// init discovery token space
|
||||
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
|
||||
dcc := MustNewHTTPClient(t, dc.URLs(), nil)
|
||||
dkapi := client.NewKeysAPI(dcc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
|
||||
@ -157,7 +157,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
|
||||
func TestForceNewCluster(t *testing.T) {
|
||||
c := NewCluster(t, 3)
|
||||
c.Launch(t)
|
||||
cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||
cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
resp, err := kapi.Create(ctx, "/foo", "bar")
|
||||
@ -184,7 +184,7 @@ func TestForceNewCluster(t *testing.T) {
|
||||
c.waitLeader(t, c.Members[:1])
|
||||
|
||||
// use new http client to init new connection
|
||||
cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||
cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||
kapi = client.NewKeysAPI(cc)
|
||||
// ensure force restart keep the old data, and new cluster can make progress
|
||||
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
|
||||
@ -273,7 +273,7 @@ func TestIssue2904(t *testing.T) {
|
||||
c.Members[1].Stop(t)
|
||||
|
||||
// send remove member-1 request to the cluster.
|
||||
cc := mustNewHTTPClient(t, c.URLs(), nil)
|
||||
cc := MustNewHTTPClient(t, c.URLs(), nil)
|
||||
ma := client.NewMembersAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
// the proposal is not committed because member 1 is stopped, but the
|
||||
@ -337,7 +337,7 @@ func TestIssue3699(t *testing.T) {
|
||||
c.waitLeader(t, c.Members)
|
||||
|
||||
// try to participate in cluster
|
||||
cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
|
||||
cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
|
||||
@ -350,7 +350,7 @@ func TestIssue3699(t *testing.T) {
|
||||
// a random key first, and check the new key could be got from all client urls
|
||||
// of the cluster.
|
||||
func clusterMustProgress(t *testing.T, membs []*member) {
|
||||
cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil)
|
||||
cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
key := fmt.Sprintf("foo%d", rand.Int())
|
||||
@ -362,7 +362,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
|
||||
|
||||
for i, m := range membs {
|
||||
u := m.URL()
|
||||
mcc := mustNewHTTPClient(t, []string{u}, nil)
|
||||
mcc := MustNewHTTPClient(t, []string{u}, nil)
|
||||
mkapi := client.NewKeysAPI(mcc)
|
||||
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
|
||||
|
@ -93,7 +93,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
|
||||
resps := make([]*client.Response, 120)
|
||||
var err error
|
||||
for i := 0; i < 120; i++ {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||
cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
key := fmt.Sprintf("foo%d", i)
|
||||
@ -108,7 +108,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
|
||||
|
||||
m.WaitOK(t)
|
||||
for i := 0; i < 120; i++ {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||
cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
key := fmt.Sprintf("foo%d", i)
|
||||
|
1
test
1
test
@ -64,6 +64,7 @@ function integration_tests {
|
||||
intpid="$!"
|
||||
wait $e2epid
|
||||
wait $intpid
|
||||
go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration
|
||||
go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
|
||||
go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
|
||||
go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "2.3.0"
|
||||
Version = "3.0.0"
|
||||
Version = "3.0.2"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
|
19
wal/wal.go
19
wal/wal.go
@ -129,15 +129,22 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := os.RemoveAll(dirpath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// rename of directory with locked files doesn't work on windows; close
|
||||
// the WAL to release the locks so the directory can be renamed
|
||||
w.Close()
|
||||
if err := os.Rename(tmpdirpath, dirpath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
|
||||
return w, nil
|
||||
// reopen and relock
|
||||
newWAL, oerr := Open(dirpath, walpb.Snapshot{})
|
||||
if oerr != nil {
|
||||
return nil, oerr
|
||||
}
|
||||
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||
newWAL.Close()
|
||||
return nil, err
|
||||
}
|
||||
return newWAL, nil
|
||||
}
|
||||
|
||||
// Open opens the WAL at the given snap.
|
||||
|
Reference in New Issue
Block a user