Compare commits


18 Commits

SHA1 Message Date
faeeb2fc75 version: bump to v3.0.2 2016-07-08 11:45:18 -07:00
d50c487132 v3rpc: lock progress and prevKV map correctly 2016-07-08 10:16:10 -07:00
b837feffe4 client/integration: test v2 client one shot operations 2016-07-07 17:30:09 -07:00
4d89640195 client: make set/delete one shot operations
Old behavior would retry set and delete even if there's an error. This
can lead to the client returning an error for deleting twice, instead
of returning an error for an indeterminate state.

Fixes #5832
2016-07-07 17:30:04 -07:00
1292d453c3 clientv3: fix sync base
It is not correct to use WithPrefix. Range end will change in every
internal batch.
2016-07-07 14:21:43 -07:00
ec20b381ed clientv3: add public function to get prefix range end 2016-07-07 14:21:41 -07:00
37cc3f5262 Dockerfile: use 'ENTRYPOINT' instead of 'CMD'
use entrypoint, so people can specify flags to etcd
without providing the binary.

Signed-off-by: Secret <haichuang221@163.com>
2016-07-05 11:40:47 -07:00
7f1940e5ed etcdserver: commit before sending snapshot 2016-07-05 11:06:54 -07:00
caccf8e5e6 v3rpc: do not panic on user error for watch 2016-07-05 11:06:35 -07:00
ef65dfe2eb wal: release wal locks before renaming directory on init
Fixes #5852
2016-07-05 11:05:51 -07:00
ff6c6916f2 etcdserver/api: print only major.minor version API
Before

2016-07-01 14:57:50.927170 I | api: enabled capabilities for version 3.0.0

After

2016-07-01 14:57:50.927170 I | api: enabled capabilities for version 3.0
2016-07-01 15:19:53 -07:00
3dfe8765d3 version: bump to v3.0.1+git 2016-07-01 14:53:20 -07:00
a4a52cb15d version: bump to v3.0.1 2016-07-01 13:58:37 -07:00
014970930a *: test, docs with go1.6+
etcd v3 uses http/2, which doesn't work well with go1.5
2016-07-01 11:59:37 -07:00
4628be982c Documentation: fix typo in api_grpc_gateway.md 2016-07-01 11:59:35 -07:00
ff55e5a188 etcdserver: exit on missing backend only if semver is >= 3.0.0 2016-07-01 11:59:32 -07:00
bf0898266c release: fix Dockerfile etcd binary paths
release script uses binary files in 'release/image-docker',
not the ones in "bin/". Tested with v3.0.0 release.
2016-06-30 12:27:34 -07:00
b9d69f7698 version: bump to v3.0.0+git 2016-06-30 11:37:05 -07:00
22 changed files with 294 additions and 53 deletions

View File

@@ -4,7 +4,6 @@ go_import_path: github.com/coreos/etcd
 sudo: false
 go:
-- 1.5
 - 1.6
 - tip
@@ -22,10 +21,6 @@ matrix:
   allow_failures:
   - go: tip
   exclude:
-  - go: 1.5
-    env: TARGET=arm
-  - go: 1.5
-    env: TARGET=ppc64le
   - go: 1.6
     env: TARGET=arm64
   - go: tip

View File

@@ -1,10 +1,10 @@
 FROM alpine:latest
-ADD bin/etcd /usr/local/bin/
-ADD bin/etcdctl /usr/local/bin/
+ADD etcd /usr/local/bin/
+ADD etcdctl /usr/local/bin/
 RUN mkdir -p /var/etcd/
 EXPOSE 2379 2380
-# Define default command.
-CMD ["/usr/local/bin/etcd"]
+# Define default entrypoint.
+ENTRYPOINT ["/usr/local/bin/etcd"]

View File

@@ -25,7 +25,7 @@ curl -L http://localhost:2379/v3alpha/kv/range \
 ## Swagger
-Generated [Swapper][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc].
+Generated [Swagger][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc].
 [api-ref]: ./api_reference_v3.md
 [go-client]: https://github.com/coreos/etcd/tree/master/clientv3

View File

@@ -11,7 +11,7 @@ The easiest way to get etcd is to use one of the pre-built release binaries whic
 ## Build the latest version
 For those wanting to try the very latest version, build etcd from the `master` branch.
-[Go](https://golang.org/) version 1.5+ is required to build the latest version of etcd.
+[Go](https://golang.org/) version 1.6+ (with HTTP2 support) is required to build the latest version of etcd.
 Here are the commands to build an etcd binary from the `master` branch:

View File

@@ -40,7 +40,7 @@ See [etcdctl][etcdctl] for a simple command line client.
 The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
 For those wanting to try the very latest version, you can build the latest version of etcd from the `master` branch.
-You will first need [*Go*](https://golang.org/) installed on your machine (version 1.5+ is required).
+You will first need [*Go*](https://golang.org/) installed on your machine (version 1.6+ is required).
 All development occurs on `master`, including new features and bug fixes.
 Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.

View File

@@ -37,6 +37,10 @@ var (
 	ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
 	ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
 	errTooManyRedirectChecks = errors.New("client: too many redirect checks")
+
+	// oneShotCtxValue is set on a context using WithValue(&oneShotValue) so
+	// that Do() will not retry a request
+	oneShotCtxValue interface{}
 )
 var DefaultRequestTimeout = 5 * time.Second
@@ -335,6 +339,7 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
 	var body []byte
 	var err error
 	cerr := &ClusterError{}
+	isOneShot := ctx.Value(&oneShotCtxValue) != nil
 	for i := pinned; i < leps+pinned; i++ {
 		k := i % leps
@@ -348,6 +353,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
 			if err == context.Canceled || err == context.DeadlineExceeded {
 				return nil, nil, err
 			}
+			if isOneShot {
+				return nil, nil, err
+			}
 			continue
 		}
 		if resp.StatusCode/100 == 5 {
@@ -358,6 +366,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
 			default:
 				cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
 			}
+			if isOneShot {
+				return nil, nil, cerr.Errors[0]
+			}
 			continue
 		}
 		if k != pinned {
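For context, a minimal sketch of the one-shot pattern this diff introduces: the address of a package-private variable acts as a collision-free context key, and the endpoint retry loop bails out on the first failure when the flag is set. All names below (oneShotKey, doOnce, send) are illustrative, not etcd's API.

package main

import (
	"context"
	"errors"
	"fmt"
)

// oneShotKey's address is the context key; using a pointer avoids
// colliding with keys set by other packages.
var oneShotKey int

func doOnce(ctx context.Context, endpoints []string) error {
	isOneShot := ctx.Value(&oneShotKey) != nil
	var lastErr error
	for _, ep := range endpoints {
		if lastErr = send(ep); lastErr == nil {
			return nil
		}
		if isOneShot {
			// destructive call: surface the error instead of
			// retrying against the next endpoint
			return lastErr
		}
	}
	return lastErr
}

func send(ep string) error { return errors.New("unreachable: " + ep) }

func main() {
	ctx := context.WithValue(context.Background(), &oneShotKey, &oneShotKey)
	fmt.Println(doOnce(ctx, []string{"a", "b"})) // fails after one try, no failover
}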

View File

@@ -0,0 +1,134 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"strings"
	"sync/atomic"
	"testing"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/client"
	"github.com/coreos/etcd/integration"
	"github.com/coreos/etcd/pkg/testutil"
)

// TestV2NoRetryEOF tests destructive api calls won't retry on a disconnection.
func TestV2NoRetryEOF(t *testing.T) {
	defer testutil.AfterTest(t)
	// generate an EOF response; specify address so appears first in sorted ep list
	lEOF := integration.NewListenerWithAddr(t, fmt.Sprintf("eof:123.%d.sock", os.Getpid()))
	defer lEOF.Close()
	tries := uint32(0)
	go func() {
		for {
			conn, err := lEOF.Accept()
			if err != nil {
				return
			}
			atomic.AddUint32(&tries, 1)
			conn.Close()
		}
	}()
	eofURL := integration.UrlScheme + "://" + lEOF.Addr().String()
	cli := integration.MustNewHTTPClient(t, []string{eofURL, eofURL}, nil)
	kapi := client.NewKeysAPI(cli)
	for i, f := range noRetryList(kapi) {
		startTries := atomic.LoadUint32(&tries)
		if err := f(); err == nil {
			t.Errorf("#%d: expected EOF error, got nil", i)
		}
		endTries := atomic.LoadUint32(&tries)
		if startTries+1 != endTries {
			t.Errorf("#%d: expected 1 try, got %d", i, endTries-startTries)
		}
	}
}

// TestV2NoRetryNoLeader tests destructive api calls won't retry if given an error code.
func TestV2NoRetryNoLeader(t *testing.T) {
	defer testutil.AfterTest(t)
	lHttp := integration.NewListenerWithAddr(t, fmt.Sprintf("errHttp:123.%d.sock", os.Getpid()))
	eh := &errHandler{errCode: http.StatusServiceUnavailable}
	srv := httptest.NewUnstartedServer(eh)
	defer lHttp.Close()
	defer srv.Close()
	srv.Listener = lHttp
	go srv.Start()
	lHttpURL := integration.UrlScheme + "://" + lHttp.Addr().String()
	cli := integration.MustNewHTTPClient(t, []string{lHttpURL, lHttpURL}, nil)
	kapi := client.NewKeysAPI(cli)
	// test error code
	for i, f := range noRetryList(kapi) {
		reqs := eh.reqs
		if err := f(); err == nil || !strings.Contains(err.Error(), "no leader") {
			t.Errorf("#%d: expected \"no leader\", got %v", i, err)
		}
		if eh.reqs != reqs+1 {
			t.Errorf("#%d: expected 1 request, got %d", i, eh.reqs-reqs)
		}
	}
}

// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused.
func TestV2RetryRefuse(t *testing.T) {
	defer testutil.AfterTest(t)
	cl := integration.NewCluster(t, 1)
	cl.Launch(t)
	defer cl.Terminate(t)
	// test connection refused; expect no error failover
	cli := integration.MustNewHTTPClient(t, []string{integration.UrlScheme + "://refuseconn:123", cl.URL(0)}, nil)
	kapi := client.NewKeysAPI(cli)
	if _, err := kapi.Set(context.Background(), "/delkey", "def", nil); err != nil {
		t.Fatal(err)
	}
	for i, f := range noRetryList(kapi) {
		if err := f(); err != nil {
			t.Errorf("#%d: unexpected retry failure (%v)", i, err)
		}
	}
}

type errHandler struct {
	errCode int
	reqs    int
}

func (eh *errHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	req.Body.Close()
	eh.reqs++
	w.WriteHeader(eh.errCode)
}

func noRetryList(kapi client.KeysAPI) []func() error {
	return []func() error{
		func() error {
			opts := &client.SetOptions{PrevExist: client.PrevNoExist}
			_, err := kapi.Set(context.Background(), "/setkey", "bar", opts)
			return err
		},
		func() error {
			_, err := kapi.Delete(context.Background(), "/delkey", nil)
			return err
		},
	}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package integration

import (
	"os"
	"testing"

	"github.com/coreos/etcd/pkg/testutil"
)

func TestMain(m *testing.M) {
	v := m.Run()
	if v == 0 && testutil.CheckLeakedGoroutine() {
		os.Exit(1)
	}
	os.Exit(v)
}

View File

@@ -337,7 +337,11 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions
 		act.Dir = opts.Dir
 	}
-	resp, body, err := k.client.Do(ctx, act)
+	doCtx := ctx
+	if act.PrevExist == PrevNoExist {
+		doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue)
+	}
+	resp, body, err := k.client.Do(doCtx, act)
 	if err != nil {
 		return nil, err
 	}
@@ -385,7 +389,8 @@ func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOption
 		act.Recursive = opts.Recursive
 	}
-	resp, body, err := k.client.Do(ctx, act)
+	doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue)
+	resp, body, err := k.client.Do(doCtx, act)
 	if err != nil {
 		return nil, err
 	}

View File

@@ -15,7 +15,9 @@
 package integration
 import (
+	"fmt"
 	"reflect"
+	"sync"
 	"testing"
 	"time"
@@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) {
 		t.Fatal("failed to receive update in one second")
 	}
 }
+
+func TestMirrorSyncBase(t *testing.T) {
+	cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1})
+	defer cluster.Terminate(nil)
+
+	cli := cluster.Client(0)
+	ctx := context.TODO()
+
+	keyCh := make(chan string)
+	var wg sync.WaitGroup
+
+	for i := 0; i < 50; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for key := range keyCh {
+				if _, err := cli.Put(ctx, key, "test"); err != nil {
+					t.Fatal(err)
+				}
+			}
+		}()
+	}
+
+	for i := 0; i < 2000; i++ {
+		keyCh <- fmt.Sprintf("test%d", i)
+	}
+
+	close(keyCh)
+	wg.Wait()
+
+	syncer := mirror.NewSyncer(cli, "test", 0)
+	respCh, errCh := syncer.SyncBase(ctx)
+
+	count := 0
+	for resp := range respCh {
+		count = count + len(resp.Kvs)
+		if !resp.More {
+			break
+		}
+	}
+
+	for err := range errCh {
+		t.Fatalf("unexpected error %v", err)
+	}
+
+	if count != 2000 {
+		t.Errorf("unexpected kv count: %d", count)
+	}
+}

View File

@@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
 		// If len(s.prefix) != 0, we will sync key-value space with given prefix.
 		// We then range from the prefix to the next prefix if exists. Or we will
 		// range from the prefix to the end if the next prefix does not exists.
-		opts = append(opts, clientv3.WithPrefix())
+		opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
 		key = s.prefix
 	}

View File

@@ -182,6 +182,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
 	}
 }
+// GetPrefixRangeEnd gets the range end of the prefix.
+// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
+func GetPrefixRangeEnd(prefix string) string {
+	return string(getPrefix([]byte(prefix)))
+}
+
 func getPrefix(key []byte) []byte {
 	end := make([]byte, len(key))
 	copy(end, key)
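Why a fixed range end fixes sync base (commit 1292d453c3 above): WithPrefix derives the range end from the current start key, so when SyncBase pages through batches the recomputed end drifts outside the original prefix; pinning the end with GetPrefixRangeEnd keeps every batch inside it. A sketch of the computation, assuming it mirrors the getPrefix logic above (prefixRangeEnd is an illustrative name, not etcd's):

package main

import "fmt"

// prefixRangeEnd returns the smallest key greater than every key with
// the given prefix, by incrementing the last byte that is not 0xff.
func prefixRangeEnd(prefix string) string {
	end := []byte(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return string(end[:i+1])
		}
	}
	// all bytes are 0xff: no next prefix, range to end of keyspace
	return "\x00"
}

func main() {
	// [ "test", "tesu" ) covers exactly the keys prefixed with "test"
	fmt.Printf("%q\n", prefixRangeEnd("test")) // "tesu"
}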

View File

@@ -19,6 +19,7 @@ import (
 	"time"
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/version"
 	"github.com/coreos/go-semver/semver"
 	"github.com/coreos/pkg/capnslog"
 )
@@ -71,7 +72,7 @@ func runCapabilityLoop(s *etcdserver.EtcdServer) {
 		enableMapMu.Lock()
 		enabledMap = capabilityMaps[pv.String()]
 		enableMapMu.Unlock()
-		plog.Infof("enabled capabilities for version %s", pv)
+		plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
 	}
 }
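The logged string shrinks from "3.0.0" to "3.0" because version.Cluster keeps only the major.minor part. A hedged reimplementation for illustration (clusterVersion is a hypothetical name, not etcd's version.Cluster itself):

package main

import (
	"fmt"
	"strings"
)

// clusterVersion trims a semver string to "major.minor".
func clusterVersion(v string) string {
	parts := strings.SplitN(v, ".", 3)
	if len(parts) < 2 {
		return v
	}
	return parts[0] + "." + parts[1]
}

func main() {
	fmt.Println(clusterVersion("3.0.0")) // "3.0"
}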

View File

@@ -86,11 +86,11 @@ type serverWatchStream struct {
 	watchStream mvcc.WatchStream
 	ctrlStream chan *pb.WatchResponse
+	// mu protects progress, prevKV
+	mu sync.Mutex
 	// progress tracks the watchID that stream might need to send
 	// progress to.
 	progress map[mvcc.WatchID]bool
-	// mu protects progress
-	mu sync.Mutex
 	// closec indicates the stream is closed.
 	closec chan struct{}
@@ -171,7 +171,9 @@ func (sws *serverWatchStream) recvLoop() error {
 		}
 		id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
 		if id != -1 && creq.ProgressNotify {
+			sws.mu.Lock()
 			sws.progress[id] = true
+			sws.mu.Unlock()
 		}
 		wr := &pb.WatchResponse{
 			Header: sws.newResponseHeader(wsrev),
@@ -199,9 +201,11 @@ func (sws *serverWatchStream) recvLoop() error {
 				sws.mu.Unlock()
 			}
 		}
-	// TODO: do we need to return error back to client?
 	default:
-		panic("not implemented")
+		// we probably should not shutdown the entire stream when
+		// receive an invalid command.
+		// so just do nothing instead.
+		continue
 	}
 	}
 	}
@@ -296,12 +300,14 @@ func (sws *serverWatchStream) sendLoop() {
 			delete(pending, wid)
 		}
 	case <-progressTicker.C:
+		sws.mu.Lock()
 		for id, ok := range sws.progress {
 			if ok {
 				sws.watchStream.RequestProgress(id)
 			}
 			sws.progress[id] = true
 		}
+		sws.mu.Unlock()
	case <-sws.closec:
		return
	}
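The race being fixed: recvLoop writes watch IDs into the progress map while sendLoop's ticker ranges over the same map, and a concurrent map read and write panics the Go runtime. A minimal sketch of the locking discipline, with illustrative types (stream and watchID are not etcd's):

package main

import "sync"

type watchID int64

type stream struct {
	mu       sync.Mutex // guards progress on both loops
	progress map[watchID]bool
}

// enableProgress is the recvLoop side: register a watcher for progress updates.
func (s *stream) enableProgress(id watchID) {
	s.mu.Lock()
	s.progress[id] = true
	s.mu.Unlock()
}

// tickProgress is the sendLoop side: on each tick, request progress for
// every idle watcher, holding the lock across the whole iteration.
func (s *stream) tickProgress(request func(watchID)) {
	s.mu.Lock()
	for id, ok := range s.progress {
		if ok {
			request(id)
		}
		s.progress[id] = true
	}
	s.mu.Unlock()
}

func main() {
	s := &stream{progress: make(map[watchID]bool)}
	go s.enableProgress(1)            // writer goroutine
	s.tickProgress(func(watchID) {}) // concurrent reader: safe under mu
}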

View File

@@ -357,7 +357,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	cl.SetStore(st)
 	cl.SetBackend(be)
 	cl.Recover()
-	if cl.Version() != nil && cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
+	if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
 		os.RemoveAll(bepath)
 		return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
 	}
@@ -1170,8 +1170,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		}
 		plog.Panicf("unexpected create snapshot error %v", err)
 	}
-	// commit v3 storage because WAL file before snapshot index
-	// could be removed after SaveSnap.
+	// commit kv to write metadata (for example: consistent index) to disk.
 	s.KV().Commit()
 	// SaveSnap saves the snapshot and releases the locked wal files
 	// to the snapshot index.
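The missing-backend fix above hinges on the comparison direction: the database file is mandatory only for clusters at version 3.0.0 or newer, which go-semver expresses as !LessThan. A quick self-contained check (the version strings are arbitrary examples):

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

func main() {
	v3 := semver.Version{Major: 3}
	cur := semver.New("3.0.2")
	// ">= 3.0.0" is spelled !LessThan; the dropped "!" was the bug
	fmt.Println(!cur.LessThan(v3)) // true: v3 cluster, backend must exist
	old := semver.New("2.3.7")
	fmt.Println(!old.LessThan(v3)) // false: v2 cluster, no backend expected
}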

View File

@@ -39,6 +39,8 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
 		plog.Panicf("store save should never fail: %v", err)
 	}
+	// commit kv to write metadata(for example: consistent index).
+	s.KV().Commit()
 	dbsnap := s.be.Snapshot()
 	// get a snapshot of v3 KV as readCloser
 	rc := newSnapshotReaderCloser(dbsnap)

View File

@@ -54,8 +54,8 @@ const (
 	requestTimeout = 20 * time.Second
 	basePort = 21000
-	urlScheme = "unix"
-	urlSchemeTLS = "unixs"
+	UrlScheme = "unix"
+	UrlSchemeTLS = "unixs"
 )
 var (
@@ -96,9 +96,9 @@ func init() {
 func schemeFromTLSInfo(tls *transport.TLSInfo) string {
 	if tls == nil {
-		return urlScheme
+		return UrlScheme
 	}
-	return urlSchemeTLS
+	return UrlSchemeTLS
 }
 func (c *cluster) fillClusterForMembers() error {
@@ -257,7 +257,7 @@ func (c *cluster) addMember(t *testing.T) {
 }
 func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
-	cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
+	cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
 	ma := client.NewMembersAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if _, err := ma.Add(ctx, peerURL); err != nil {
@@ -277,7 +277,7 @@ func (c *cluster) AddMember(t *testing.T) {
 func (c *cluster) RemoveMember(t *testing.T, id uint64) {
 	// send remove request to the cluster
-	cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
+	cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
 	ma := client.NewMembersAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
@@ -312,7 +312,7 @@ func (c *cluster) Terminate(t *testing.T) {
 func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
 	for _, u := range c.URLs() {
-		cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
+		cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
 		ma := client.NewMembersAPI(cc)
 		for {
 			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
@@ -391,10 +391,10 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
 func newLocalListener(t *testing.T) net.Listener {
 	c := atomic.AddInt64(&localListenCount, 1)
 	addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid())
-	return newListenerWithAddr(t, addr)
+	return NewListenerWithAddr(t, addr)
 }
-func newListenerWithAddr(t *testing.T, addr string) net.Listener {
+func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
 	l, err := transport.NewUnixListener(addr)
 	if err != nil {
 		t.Fatal(err)
@@ -614,7 +614,7 @@ func (m *member) Launch() error {
 }
 func (m *member) WaitOK(t *testing.T) {
-	cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
+	cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
 	kapi := client.NewKeysAPI(cc)
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
@@ -678,12 +678,12 @@ func (m *member) Restart(t *testing.T) error {
 	plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr)
 	newPeerListeners := make([]net.Listener, 0)
 	for _, ln := range m.PeerListeners {
-		newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
+		newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
 	}
 	m.PeerListeners = newPeerListeners
 	newClientListeners := make([]net.Listener, 0)
 	for _, ln := range m.ClientListeners {
-		newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
+		newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
 	}
 	m.ClientListeners = newClientListeners
@@ -708,7 +708,7 @@ func (m *member) Terminate(t *testing.T) {
 	plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
 }
-func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
+func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
 	cfgtls := transport.TLSInfo{}
 	if tls != nil {
 		cfgtls = *tls

View File

@@ -67,7 +67,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
 	dc.Launch(t)
 	defer dc.Terminate(t)
 	// init discovery token space
-	dcc := mustNewHTTPClient(t, dc.URLs(), nil)
+	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
 	dkapi := client.NewKeysAPI(dcc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
@@ -90,7 +90,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
 	dc.Launch(t)
 	defer dc.Terminate(t)
 	// init discovery token space
-	dcc := mustNewHTTPClient(t, dc.URLs(), nil)
+	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
 	dkapi := client.NewKeysAPI(dcc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
@@ -157,7 +157,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
 func TestForceNewCluster(t *testing.T) {
 	c := NewCluster(t, 3)
 	c.Launch(t)
-	cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
+	cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
 	kapi := client.NewKeysAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	resp, err := kapi.Create(ctx, "/foo", "bar")
@@ -184,7 +184,7 @@ func TestForceNewCluster(t *testing.T) {
 	c.waitLeader(t, c.Members[:1])
 	// use new http client to init new connection
-	cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
+	cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
 	kapi = client.NewKeysAPI(cc)
 	// ensure force restart keep the old data, and new cluster can make progress
 	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
@@ -273,7 +273,7 @@ func TestIssue2904(t *testing.T) {
 	c.Members[1].Stop(t)
 	// send remove member-1 request to the cluster.
-	cc := mustNewHTTPClient(t, c.URLs(), nil)
+	cc := MustNewHTTPClient(t, c.URLs(), nil)
 	ma := client.NewMembersAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	// the proposal is not committed because member 1 is stopped, but the
@@ -337,7 +337,7 @@ func TestIssue3699(t *testing.T) {
 	c.waitLeader(t, c.Members)
 	// try to participate in cluster
-	cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
+	cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
 	kapi := client.NewKeysAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
@@ -350,7 +350,7 @@
 // a random key first, and check the new key could be got from all client urls
 // of the cluster.
 func clusterMustProgress(t *testing.T, membs []*member) {
-	cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil)
+	cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
 	kapi := client.NewKeysAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 	key := fmt.Sprintf("foo%d", rand.Int())
@@ -362,7 +362,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
 	for i, m := range membs {
 		u := m.URL()
-		mcc := mustNewHTTPClient(t, []string{u}, nil)
+		mcc := MustNewHTTPClient(t, []string{u}, nil)
 		mkapi := client.NewKeysAPI(mcc)
 		mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
 		if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {

View File

@@ -93,7 +93,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
 	resps := make([]*client.Response, 120)
 	var err error
 	for i := 0; i < 120; i++ {
-		cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
+		cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
 		kapi := client.NewKeysAPI(cc)
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 		key := fmt.Sprintf("foo%d", i)
@@ -108,7 +108,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
 	m.WaitOK(t)
 	for i := 0; i < 120; i++ {
-		cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
+		cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
 		kapi := client.NewKeysAPI(cc)
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 		key := fmt.Sprintf("foo%d", i)

test
View File

@@ -64,6 +64,7 @@ function integration_tests {
 	intpid="$!"
 	wait $e2epid
 	wait $intpid
+	go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration
 	go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
 	go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
 	go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}

View File

@@ -29,7 +29,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "2.3.0"
-	Version = "3.0.0"
+	Version = "3.0.2"
 	// Git SHA Value will be set during build
 	GitSHA = "Not provided (use ./build instead of go build)"

View File

@@ -129,15 +129,22 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 		return nil, err
 	}
-	if err := os.RemoveAll(dirpath); err != nil {
-		return nil, err
-	}
+	// rename of directory with locked files doesn't work on windows; close
+	// the WAL to release the locks so the directory can be renamed
+	w.Close()
 	if err := os.Rename(tmpdirpath, dirpath); err != nil {
 		return nil, err
 	}
-	w.fp = newFilePipeline(w.dir, segmentSizeBytes)
-	return w, nil
+	// reopen and relock
+	newWAL, oerr := Open(dirpath, walpb.Snapshot{})
+	if oerr != nil {
+		return nil, oerr
+	}
+	if _, _, _, err := newWAL.ReadAll(); err != nil {
+		newWAL.Close()
+		return nil, err
+	}
+	return newWAL, nil
 }
 // Open opens the WAL at the given snap.
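The shape of this fix as a generic pattern: on Windows, open file handles lock their parent directory, so a staged directory must be fully closed before it can be renamed into place, then reopened under the final path. A standalone sketch with hypothetical names (createAtomically is not the wal package's API):

package main

import (
	"log"
	"os"
	"path/filepath"
)

// createAtomically stages a file in tmpdir, releases all handles,
// renames the directory into place, then reopens the file.
func createAtomically(tmpdir, dir string) error {
	f, err := os.Create(filepath.Join(tmpdir, "0000.tmp"))
	if err != nil {
		return err
	}
	// close before the rename: a held handle would lock the
	// directory on Windows and make os.Rename fail
	if err := f.Close(); err != nil {
		return err
	}
	if err := os.Rename(tmpdir, dir); err != nil {
		return err
	}
	// reopen under the final path to resume work
	nf, err := os.Open(filepath.Join(dir, "0000.tmp"))
	if err != nil {
		return err
	}
	return nf.Close()
}

func main() {
	base, err := os.MkdirTemp("", "waldemo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(base)
	tmp := filepath.Join(base, "wal.tmp")
	if err := os.Mkdir(tmp, 0700); err != nil {
		log.Fatal(err)
	}
	if err := createAtomically(tmp, filepath.Join(base, "wal")); err != nil {
		log.Fatal(err)
	}
}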