Compare commits

...

43 Commits

Author SHA1 Message Date
24a90baff8 version: bump to v3.0.3 2016-07-15 11:26:14 -07:00
6b7891d5f1 integration: add FailFast(false) to failing tests 2016-07-14 19:01:17 -07:00
129b271ff8 clientv3: use grpc.FailFast(false) for all calls 2016-07-14 19:00:46 -07:00
a11ee983c4 vendor: update grpc
Fixes #5871
2016-07-14 18:47:02 -07:00
bec58d5f58 integration: test grpc error equivalence with Error() 2016-07-14 18:47:00 -07:00
4b6f9b79e6 rpctypes: test error equivalence with Error()
grpc.Errorf() now returns *rpcError, which makes comparisons shallow.
2016-07-14 18:46:58 -07:00
f7ec7f025b embed: only get initial cluster setting if the member is not init 2016-07-14 13:01:29 -07:00
34c76a47c1 Revert "Dockerfile: use 'ENTRYPOINT' instead of 'CMD'" 2016-07-14 12:24:06 -07:00
525653ff51 raft: do not change RecentActive when resetState for progress 2016-07-12 09:59:42 -07:00
a647b79038 etcdserver: fix TestSnap 2016-07-11 13:59:12 -07:00
9bc1d08753 etcdctl: only takes 127.0.0.1:2379 as default endpoint 2016-07-11 13:41:53 -07:00
6a79bda691 e2e: add basic upgrade tests 2016-07-11 13:41:50 -07:00
1edfcd6859 test: add upgrade test flag 2016-07-11 13:41:47 -07:00
f51fdbccec version: bump to v3.0.2+git 2016-07-08 12:09:09 -07:00
faeeb2fc75 version: bump to v3.0.2 2016-07-08 11:45:18 -07:00
d50c487132 v3rpc: lock progress and prevKV map correctly 2016-07-08 10:16:10 -07:00
b837feffe4 client/integration: test v2 client one shot operations 2016-07-07 17:30:09 -07:00
4d89640195 client: make set/delete one shot operations
Old behavior would retry set and delete even if there's an error. This
can lead to the client returning an error for deleting twice, instead
of returning an error for an interdeterminate state.

Fixes #5832
2016-07-07 17:30:04 -07:00
1292d453c3 clientv3: fix sync base
It is not correct to use WithPrefix. Range end will change in every
internal batch.
2016-07-07 14:21:43 -07:00
ec20b381ed clientv3: add public function to get prefix range end 2016-07-07 14:21:41 -07:00
37cc3f5262 Dockerfile: use 'ENTRYPOINT' instead of 'CMD'
use entrypoint, so people can specify flags to etcd
without providing the binary.

Signed-off-by: Secret <haichuang221@163.com>
2016-07-05 11:40:47 -07:00
7f1940e5ed etcdserver: commit before sending snapshot 2016-07-05 11:06:54 -07:00
caccf8e5e6 v3rpc: do not panic on user error for watch 2016-07-05 11:06:35 -07:00
ef65dfe2eb wal: release wal locks before renaming directory on init
Fixes #5852
2016-07-05 11:05:51 -07:00
ff6c6916f2 etcdserver/api: print only major.minor version API
Before

2016-07-01 14:57:50.927170 I | api: enabled capabilities for version 3.0.0

After

2016-07-01 14:57:50.927170 I | api: enabled capabilities for version 3.0
2016-07-01 15:19:53 -07:00
3dfe8765d3 version: bump to v3.0.1+git 2016-07-01 14:53:20 -07:00
a4a52cb15d version: bump to v3.0.1 2016-07-01 13:58:37 -07:00
014970930a *: test, docs with go1.6+
etcd v3 uses http/2, which doesn't work well with go1.5
2016-07-01 11:59:37 -07:00
4628be982c Documentation: fix typo in api_grpc_gateway.md 2016-07-01 11:59:35 -07:00
ff55e5a188 etcdserver: exit on missing backend only if semver is >= 3.0.0 2016-07-01 11:59:32 -07:00
bf0898266c release: fix Dockerfile etcd binary paths
release script uses binary files in 'release/image-docker',
not the ones in "bin/". Tested with v3.0.0 release.
2016-06-30 12:27:34 -07:00
b9d69f7698 version: bump to v3.0.0+git 2016-06-30 11:37:05 -07:00
6f48bda7ac version: bump to v3.0.0 2016-06-30 10:04:59 -07:00
316534e09e *: remove beta from docs 2016-06-30 10:04:34 -07:00
3cecbdb464 hack: install goreman in tls-setup example 2016-06-30 09:33:19 -07:00
62f11e43ee hack: add tls-setup example generated certs to gitignore 2016-06-30 09:33:12 -07:00
064c1585ee Merge pull request #5822 from raoofm/patch-9
Doc: fix typo in dev-guide.md
2016-06-30 09:06:32 -07:00
15300a1eb8 Doc: fix typo in dev-guide.md 2016-06-30 10:36:50 -04:00
58dd047ee4 ctlv3: make flags, commands formats consistent
1. Capitalize first letter
2. Remove period at the end

(followed the pattern in linux coreutil man page)
2016-06-29 16:16:56 -07:00
4b42ea6cd7 clientv3: only use closeErr on watch when donec is closed
Fixes #5800
2016-06-28 17:48:44 -07:00
53c27ae621 benchmark: fix Compact request 2016-06-28 14:15:32 -07:00
269de67bde mvcc: do not hash consistent index 2016-06-28 12:29:36 -07:00
8bbccf1047 clientv3, ctl3, clientv3/integration: add compact response to compact 2016-06-28 12:29:32 -07:00
84 changed files with 1038 additions and 396 deletions

1
.gitignore vendored
View File

@ -10,3 +10,4 @@
/hack/insta-discovery/.env /hack/insta-discovery/.env
*.test *.test
tools/functional-tester/docker/bin tools/functional-tester/docker/bin
hack/tls-setup/certs

View File

@ -4,7 +4,6 @@ go_import_path: github.com/coreos/etcd
sudo: false sudo: false
go: go:
- 1.5
- 1.6 - 1.6
- tip - tip
@ -22,10 +21,6 @@ matrix:
allow_failures: allow_failures:
- go: tip - go: tip
exclude: exclude:
- go: 1.5
env: TARGET=arm
- go: 1.5
env: TARGET=ppc64le
- go: 1.6 - go: 1.6
env: TARGET=arm64 env: TARGET=arm64
- go: tip - go: tip

View File

@ -1,7 +1,7 @@
FROM alpine:latest FROM alpine:latest
ADD bin/etcd /usr/local/bin/ ADD etcd /usr/local/bin/
ADD bin/etcdctl /usr/local/bin/ ADD etcdctl /usr/local/bin/
RUN mkdir -p /var/etcd/ RUN mkdir -p /var/etcd/
EXPOSE 2379 2380 EXPOSE 2379 2380

View File

@ -25,7 +25,7 @@ curl -L http://localhost:2379/v3alpha/kv/range \
## Swagger ## Swagger
Generated [Swapper][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc]. Generated [Swagger][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc].
[api-ref]: ./api_reference_v3.md [api-ref]: ./api_reference_v3.md
[go-client]: https://github.com/coreos/etcd/tree/master/clientv3 [go-client]: https://github.com/coreos/etcd/tree/master/clientv3

View File

@ -4,5 +4,5 @@ For the most part, the etcd project is stable, but we are still moving fast! We
## The current experimental API/features are: ## The current experimental API/features are:
- v3 auth API: expect to be stale in 3.1 release - v3 auth API: expect to be stable in 3.1 release
- etcd gateway: expect to be stable in 3.1 release - etcd gateway: expect to be stable in 3.1 release

View File

@ -11,7 +11,7 @@ The easiest way to get etcd is to use one of the pre-built release binaries whic
## Build the latest version ## Build the latest version
For those wanting to try the very latest version, build etcd from the `master` branch. For those wanting to try the very latest version, build etcd from the `master` branch.
[Go](https://golang.org/) version 1.5+ is required to build the latest version of etcd. [Go](https://golang.org/) version 1.6+ (with HTTP2 support) is required to build the latest version of etcd.
Here are the commands to build an etcd binary from the `master` branch: Here are the commands to build an etcd binary from the `master` branch:

View File

@ -8,7 +8,7 @@ In order to expose the etcd API to clients outside of Docker host, use the host
``` ```
# For each machine # For each machine
ETCD_VERSION=v3.0.0-beta.0 ETCD_VERSION=v3.0.0
TOKEN=my-etcd-token TOKEN=my-etcd-token
CLUSTER_STATE=new CLUSTER_STATE=new
NAME_1=etcd-node-0 NAME_1=etcd-node-0

View File

@ -40,7 +40,7 @@ See [etcdctl][etcdctl] for a simple command line client.
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release]. The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
For those wanting to try the very latest version, you can build the latest version of etcd from the `master` branch. For those wanting to try the very latest version, you can build the latest version of etcd from the `master` branch.
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.5+ is required). You will first need [*Go*](https://golang.org/) installed on your machine (version 1.6+ is required).
All development occurs on `master`, including new features and bug fixes. All development occurs on `master`, including new features and bug fixes.
Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide. Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.

View File

@ -37,6 +37,10 @@ var (
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured") ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available") ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
errTooManyRedirectChecks = errors.New("client: too many redirect checks") errTooManyRedirectChecks = errors.New("client: too many redirect checks")
// oneShotCtxValue is set on a context using WithValue(&oneShotValue) so
// that Do() will not retry a request
oneShotCtxValue interface{}
) )
var DefaultRequestTimeout = 5 * time.Second var DefaultRequestTimeout = 5 * time.Second
@ -335,6 +339,7 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
var body []byte var body []byte
var err error var err error
cerr := &ClusterError{} cerr := &ClusterError{}
isOneShot := ctx.Value(&oneShotCtxValue) != nil
for i := pinned; i < leps+pinned; i++ { for i := pinned; i < leps+pinned; i++ {
k := i % leps k := i % leps
@ -348,6 +353,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
if err == context.Canceled || err == context.DeadlineExceeded { if err == context.Canceled || err == context.DeadlineExceeded {
return nil, nil, err return nil, nil, err
} }
if isOneShot {
return nil, nil, err
}
continue continue
} }
if resp.StatusCode/100 == 5 { if resp.StatusCode/100 == 5 {
@ -358,6 +366,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
default: default:
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode))) cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
} }
if isOneShot {
return nil, nil, cerr.Errors[0]
}
continue continue
} }
if k != pinned { if k != pinned {

View File

@ -0,0 +1,134 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"sync/atomic"
"testing"
"golang.org/x/net/context"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)
// TestV2NoRetryEOF tests destructive api calls won't retry on a disconnection.
func TestV2NoRetryEOF(t *testing.T) {
defer testutil.AfterTest(t)
// generate an EOF response; specify address so appears first in sorted ep list
lEOF := integration.NewListenerWithAddr(t, fmt.Sprintf("eof:123.%d.sock", os.Getpid()))
defer lEOF.Close()
tries := uint32(0)
go func() {
for {
conn, err := lEOF.Accept()
if err != nil {
return
}
atomic.AddUint32(&tries, 1)
conn.Close()
}
}()
eofURL := integration.UrlScheme + "://" + lEOF.Addr().String()
cli := integration.MustNewHTTPClient(t, []string{eofURL, eofURL}, nil)
kapi := client.NewKeysAPI(cli)
for i, f := range noRetryList(kapi) {
startTries := atomic.LoadUint32(&tries)
if err := f(); err == nil {
t.Errorf("#%d: expected EOF error, got nil", i)
}
endTries := atomic.LoadUint32(&tries)
if startTries+1 != endTries {
t.Errorf("#%d: expected 1 try, got %d", i, endTries-startTries)
}
}
}
// TestV2NoRetryNoLeader tests destructive api calls won't retry if given an error code.
func TestV2NoRetryNoLeader(t *testing.T) {
defer testutil.AfterTest(t)
lHttp := integration.NewListenerWithAddr(t, fmt.Sprintf("errHttp:123.%d.sock", os.Getpid()))
eh := &errHandler{errCode: http.StatusServiceUnavailable}
srv := httptest.NewUnstartedServer(eh)
defer lHttp.Close()
defer srv.Close()
srv.Listener = lHttp
go srv.Start()
lHttpURL := integration.UrlScheme + "://" + lHttp.Addr().String()
cli := integration.MustNewHTTPClient(t, []string{lHttpURL, lHttpURL}, nil)
kapi := client.NewKeysAPI(cli)
// test error code
for i, f := range noRetryList(kapi) {
reqs := eh.reqs
if err := f(); err == nil || !strings.Contains(err.Error(), "no leader") {
t.Errorf("#%d: expected \"no leader\", got %v", i, err)
}
if eh.reqs != reqs+1 {
t.Errorf("#%d: expected 1 request, got %d", i, eh.reqs-reqs)
}
}
}
// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused.
func TestV2RetryRefuse(t *testing.T) {
defer testutil.AfterTest(t)
cl := integration.NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
// test connection refused; expect no error failover
cli := integration.MustNewHTTPClient(t, []string{integration.UrlScheme + "://refuseconn:123", cl.URL(0)}, nil)
kapi := client.NewKeysAPI(cli)
if _, err := kapi.Set(context.Background(), "/delkey", "def", nil); err != nil {
t.Fatal(err)
}
for i, f := range noRetryList(kapi) {
if err := f(); err != nil {
t.Errorf("#%d: unexpected retry failure (%v)", i, err)
}
}
}
type errHandler struct {
errCode int
reqs int
}
func (eh *errHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req.Body.Close()
eh.reqs++
w.WriteHeader(eh.errCode)
}
func noRetryList(kapi client.KeysAPI) []func() error {
return []func() error{
func() error {
opts := &client.SetOptions{PrevExist: client.PrevNoExist}
_, err := kapi.Set(context.Background(), "/setkey", "bar", opts)
return err
},
func() error {
_, err := kapi.Delete(context.Background(), "/delkey", nil)
return err
},
}
}

View File

@ -0,0 +1,20 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package integration
import (
"os"
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}

View File

@ -337,7 +337,11 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions
act.Dir = opts.Dir act.Dir = opts.Dir
} }
resp, body, err := k.client.Do(ctx, act) doCtx := ctx
if act.PrevExist == PrevNoExist {
doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue)
}
resp, body, err := k.client.Do(doCtx, act)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -385,7 +389,8 @@ func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOption
act.Recursive = opts.Recursive act.Recursive = opts.Recursive
} }
resp, body, err := k.client.Do(ctx, act) doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue)
resp, body, err := k.client.Do(doCtx, act)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -115,52 +115,52 @@ func NewAuth(c *Client) Auth {
} }
func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false))
return (*AuthEnableResponse)(resp), toErr(ctx, err) return (*AuthEnableResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false))
return (*AuthDisableResponse)(resp), toErr(ctx, err) return (*AuthDisableResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthUserAddResponse)(resp), toErr(ctx, err) return (*AuthUserAddResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false))
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false))
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}) resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, grpc.FailFast(false))
return (*AuthUserGetResponse)(resp), toErr(ctx, err) return (*AuthUserGetResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}) resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, grpc.FailFast(false))
return (*AuthUserListResponse)(resp), toErr(ctx, err) return (*AuthUserListResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false))
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false))
return (*AuthRoleAddResponse)(resp), toErr(ctx, err) return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
} }
@ -170,27 +170,27 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran
RangeEnd: []byte(rangeEnd), RangeEnd: []byte(rangeEnd),
PermType: authpb.Permission_Type(permType), PermType: authpb.Permission_Type(permType),
} }
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}) resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, grpc.FailFast(false))
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}) resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, grpc.FailFast(false))
return (*AuthRoleGetResponse)(resp), toErr(ctx, err) return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}) resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, grpc.FailFast(false))
return (*AuthRoleListResponse)(resp), toErr(ctx, err) return (*AuthRoleListResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}) resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false))
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
} }
func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false))
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
} }
@ -208,7 +208,7 @@ type authenticator struct {
} }
func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthenticateResponse)(resp), toErr(ctx, err) return (*AuthenticateResponse)(resp), toErr(ctx, err)
} }

View File

@ -17,6 +17,7 @@ package clientv3
import ( import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
type ( type (
@ -51,7 +52,7 @@ func NewCluster(c *Client) Cluster {
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
r := &pb.MemberAddRequest{PeerURLs: peerAddrs} r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
resp, err := c.remote.MemberAdd(ctx, r) resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
return (*MemberAddResponse)(resp), nil return (*MemberAddResponse)(resp), nil
} }
@ -63,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
r := &pb.MemberRemoveRequest{ID: id} r := &pb.MemberRemoveRequest{ID: id}
resp, err := c.remote.MemberRemove(ctx, r) resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
return (*MemberRemoveResponse)(resp), nil return (*MemberRemoveResponse)(resp), nil
} }
@ -77,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
// it is safe to retry on update. // it is safe to retry on update.
for { for {
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
resp, err := c.remote.MemberUpdate(ctx, r) resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
return (*MemberUpdateResponse)(resp), nil return (*MemberUpdateResponse)(resp), nil
} }
@ -90,7 +91,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
// it is safe to retry on list. // it is safe to retry on list.
for { for {
resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}) resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false))
if err == nil { if err == nil {
return (*MemberListResponse)(resp), nil return (*MemberListResponse)(resp), nil
} }

View File

@ -210,7 +210,7 @@ func ExampleKV_compact() {
compRev := resp.Header.Revision // specify compact revision of your choice compRev := resp.Header.Revision // specify compact revision of your choice
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
err = cli.Compact(ctx, compRev) _, err = cli.Compact(ctx, compRev)
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View File

@ -470,17 +470,17 @@ func TestKVCompactError(t *testing.T) {
t.Fatalf("couldn't put 'foo' (%v)", err) t.Fatalf("couldn't put 'foo' (%v)", err)
} }
} }
err := kv.Compact(ctx, 6) _, err := kv.Compact(ctx, 6)
if err != nil { if err != nil {
t.Fatalf("couldn't compact 6 (%v)", err) t.Fatalf("couldn't compact 6 (%v)", err)
} }
err = kv.Compact(ctx, 6) _, err = kv.Compact(ctx, 6)
if err != rpctypes.ErrCompacted { if err != rpctypes.ErrCompacted {
t.Fatalf("expected %v, got %v", rpctypes.ErrCompacted, err) t.Fatalf("expected %v, got %v", rpctypes.ErrCompacted, err)
} }
err = kv.Compact(ctx, 100) _, err = kv.Compact(ctx, 100)
if err != rpctypes.ErrFutureRev { if err != rpctypes.ErrFutureRev {
t.Fatalf("expected %v, got %v", rpctypes.ErrFutureRev, err) t.Fatalf("expected %v, got %v", rpctypes.ErrFutureRev, err)
} }
@ -501,11 +501,11 @@ func TestKVCompact(t *testing.T) {
} }
} }
err := kv.Compact(ctx, 7) _, err := kv.Compact(ctx, 7)
if err != nil { if err != nil {
t.Fatalf("couldn't compact kv space (%v)", err) t.Fatalf("couldn't compact kv space (%v)", err)
} }
err = kv.Compact(ctx, 7) _, err = kv.Compact(ctx, 7)
if err == nil || err != rpctypes.ErrCompacted { if err == nil || err != rpctypes.ErrCompacted {
t.Fatalf("error got %v, want %v", err, rpctypes.ErrCompacted) t.Fatalf("error got %v, want %v", err, rpctypes.ErrCompacted)
} }
@ -525,7 +525,7 @@ func TestKVCompact(t *testing.T) {
t.Fatalf("wchan got %v, expected closed", wr) t.Fatalf("wchan got %v, expected closed", wr)
} }
err = kv.Compact(ctx, 1000) _, err = kv.Compact(ctx, 1000)
if err == nil || err != rpctypes.ErrFutureRev { if err == nil || err != rpctypes.ErrFutureRev {
t.Fatalf("error got %v, want %v", err, rpctypes.ErrFutureRev) t.Fatalf("error got %v, want %v", err, rpctypes.ErrFutureRev)
} }

View File

@ -15,7 +15,9 @@
package integration package integration
import ( import (
"fmt"
"reflect" "reflect"
"sync"
"testing" "testing"
"time" "time"
@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) {
t.Fatal("failed to receive update in one second") t.Fatal("failed to receive update in one second")
} }
} }
func TestMirrorSyncBase(t *testing.T) {
cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(nil)
cli := cluster.Client(0)
ctx := context.TODO()
keyCh := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for key := range keyCh {
if _, err := cli.Put(ctx, key, "test"); err != nil {
t.Fatal(err)
}
}
}()
}
for i := 0; i < 2000; i++ {
keyCh <- fmt.Sprintf("test%d", i)
}
close(keyCh)
wg.Wait()
syncer := mirror.NewSyncer(cli, "test", 0)
respCh, errCh := syncer.SyncBase(ctx)
count := 0
for resp := range respCh {
count = count + len(resp.Kvs)
if !resp.More {
break
}
}
for err := range errCh {
t.Fatalf("unexpected error %v", err)
}
if count != 2000 {
t.Errorf("unexpected kv count: %d", count)
}
}

View File

@ -375,7 +375,7 @@ func TestWatchResumeCompacted(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
if err := kv.Compact(context.TODO(), 3); err != nil { if _, err := kv.Compact(context.TODO(), 3); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -414,7 +414,7 @@ func TestWatchCompactRevision(t *testing.T) {
w := clientv3.NewWatcher(clus.RandClient()) w := clientv3.NewWatcher(clus.RandClient())
defer w.Close() defer w.Close()
if err := kv.Compact(context.TODO(), 4); err != nil { if _, err := kv.Compact(context.TODO(), 4); err != nil {
t.Fatal(err) t.Fatal(err)
} }
wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2)) wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2))

View File

@ -17,13 +17,15 @@ package clientv3
import ( import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
type ( type (
PutResponse pb.PutResponse CompactResponse pb.CompactionResponse
GetResponse pb.RangeResponse PutResponse pb.PutResponse
DeleteResponse pb.DeleteRangeResponse GetResponse pb.RangeResponse
TxnResponse pb.TxnResponse DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
) )
type KV interface { type KV interface {
@ -47,7 +49,7 @@ type KV interface {
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev. // Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) error Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction. // Do applies a single Op on KV without a transaction.
// Do is useful when declaring operations to be issued at a later time // Do is useful when declaring operations to be issued at a later time
@ -98,11 +100,12 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
return r.del, toErr(ctx, err) return r.del, toErr(ctx, err)
} }
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) error { func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
if _, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()); err != nil { resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), grpc.FailFast(false))
return toErr(ctx, err) if err != nil {
return nil, toErr(ctx, err)
} }
return nil return (*CompactResponse)(resp), err
} }
func (kv *kv) Txn(ctx context.Context) Txn { func (kv *kv) Txn(ctx context.Context) Txn {
@ -148,21 +151,21 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
} }
resp, err = kv.remote.Range(ctx, r) resp, err = kv.remote.Range(ctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil return OpResponse{get: (*GetResponse)(resp)}, nil
} }
case tPut: case tPut:
var resp *pb.PutResponse var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
resp, err = kv.remote.Put(ctx, r) resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil return OpResponse{put: (*PutResponse)(resp)}, nil
} }
case tDeleteRange: case tDeleteRange:
var resp *pb.DeleteRangeResponse var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
resp, err = kv.remote.DeleteRange(ctx, r) resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil return OpResponse{del: (*DeleteResponse)(resp)}, nil
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
type ( type (
@ -129,7 +130,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
for { for {
r := &pb.LeaseGrantRequest{TTL: ttl} r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.remote.LeaseGrant(cctx, r) resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
gresp := &LeaseGrantResponse{ gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(), ResponseHeader: resp.GetHeader(),
@ -155,7 +156,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
for { for {
r := &pb.LeaseRevokeRequest{ID: int64(id)} r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.remote.LeaseRevoke(cctx, r) resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false))
if err == nil { if err == nil {
return (*LeaseRevokeResponse)(resp), nil return (*LeaseRevokeResponse)(resp), nil
@ -261,7 +262,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
cctx, cancel := context.WithCancel(ctx) cctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
stream, err := l.remote.LeaseKeepAlive(cctx) stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
if err != nil { if err != nil {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
@ -418,7 +419,7 @@ func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
func (l *lessor) newStream() error { func (l *lessor) newStream() error {
sctx, cancel := context.WithCancel(l.stopCtx) sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx) stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
if err != nil { if err != nil {
cancel() cancel()
return toErr(sctx, err) return toErr(sctx, err)

View File

@ -19,6 +19,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
type ( type (
@ -67,7 +68,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
Alarm: pb.AlarmType_NONE, // all Alarm: pb.AlarmType_NONE, // all
} }
for { for {
resp, err := m.remote.Alarm(ctx, req) resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
if err == nil { if err == nil {
return (*AlarmResponse)(resp), nil return (*AlarmResponse)(resp), nil
} }
@ -100,7 +101,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
return &ret, nil return &ret, nil
} }
resp, err := m.remote.Alarm(ctx, req) resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
if err == nil { if err == nil {
return (*AlarmResponse)(resp), nil return (*AlarmResponse)(resp), nil
} }
@ -114,7 +115,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
} }
defer conn.Close() defer conn.Close()
remote := pb.NewMaintenanceClient(conn) remote := pb.NewMaintenanceClient(conn)
resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false))
if err != nil { if err != nil {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
@ -128,7 +129,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
} }
defer conn.Close() defer conn.Close()
remote := pb.NewMaintenanceClient(conn) remote := pb.NewMaintenanceClient(conn)
resp, err := remote.Status(ctx, &pb.StatusRequest{}) resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false))
if err != nil { if err != nil {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
@ -136,7 +137,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
} }
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}) ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false))
if err != nil { if err != nil {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }

View File

@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
// If len(s.prefix) != 0, we will sync key-value space with given prefix. // If len(s.prefix) != 0, we will sync key-value space with given prefix.
// We then range from the prefix to the next prefix if exists. Or we will // We then range from the prefix to the next prefix if exists. Or we will
// range from the prefix to the end if the next prefix does not exists. // range from the prefix to the end if the next prefix does not exists.
opts = append(opts, clientv3.WithPrefix()) opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
key = s.prefix key = s.prefix
} }

View File

@ -182,6 +182,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
} }
} }
// GetPrefixRangeEnd gets the range end of the prefix.
// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
func GetPrefixRangeEnd(prefix string) string {
return string(getPrefix([]byte(prefix)))
}
func getPrefix(key []byte) []byte { func getPrefix(key []byte) []byte {
end := make([]byte, len(key)) end := make([]byte, len(key))
copy(end, key) copy(end, key)

View File

@ -19,6 +19,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
// Txn is the interface that wraps mini-transactions. // Txn is the interface that wraps mini-transactions.
@ -152,7 +153,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
func (txn *txn) commit() (*TxnResponse, error) { func (txn *txn) commit() (*TxnResponse, error) {
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
resp, err := txn.kv.remote.Txn(txn.ctx, r) resp, err := txn.kv.remote.Txn(txn.ctx, r, grpc.FailFast(false))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -23,6 +23,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
mvccpb "github.com/coreos/etcd/mvcc/mvccpb" mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
) )
const ( const (
@ -505,6 +506,7 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
// serveStream forwards watch responses from run() to the subscriber // serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) { func (w *watchGrpcStream) serveStream(ws *watcherStream) {
var closeErr error
emptyWr := &WatchResponse{} emptyWr := &WatchResponse{}
wrs := []*WatchResponse{} wrs := []*WatchResponse{}
resuming := false resuming := false
@ -569,13 +571,14 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
} }
case <-w.donec: case <-w.donec:
closing = true closing = true
closeErr = w.closeErr
case <-ws.initReq.ctx.Done(): case <-ws.initReq.ctx.Done():
closing = true closing = true
} }
} }
// try to send off close error // try to send off close error
if w.closeErr != nil { if closeErr != nil {
select { select {
case ws.outc <- WatchResponse{closeErr: w.closeErr}: case ws.outc <- WatchResponse{closeErr: w.closeErr}:
case <-w.donec: case <-w.donec:
@ -621,7 +624,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
return nil, err return nil, err
default: default:
} }
if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil { if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
break break
} }
if isHaltErr(w.ctx, err) { if isHaltErr(w.ctx, err) {

27
cmd/Godeps/Godeps.json generated
View File

@ -234,39 +234,48 @@
}, },
{ {
"ImportPath": "google.golang.org/grpc", "ImportPath": "google.golang.org/grpc",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/codes", "ImportPath": "google.golang.org/grpc/codes",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/credentials", "ImportPath": "google.golang.org/grpc/credentials",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/grpclog", "ImportPath": "google.golang.org/grpc/grpclog",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/internal", "ImportPath": "google.golang.org/grpc/internal",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/metadata", "ImportPath": "google.golang.org/grpc/metadata",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/naming", "ImportPath": "google.golang.org/grpc/naming",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/peer", "ImportPath": "google.golang.org/grpc/peer",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "google.golang.org/grpc/transport", "ImportPath": "google.golang.org/grpc/transport",
"Rev": "e78224b060cf3215247b7be455f80ea22e469b66" "Comment": "v1.0.0-6-g02fca89",
"Rev": "02fca896ff5f50c6bbbee0860345a49344b37a03"
}, },
{ {
"ImportPath": "gopkg.in/cheggaaa/pb.v1", "ImportPath": "gopkg.in/cheggaaa/pb.v1",

View File

@ -40,7 +40,6 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming" "google.golang.org/grpc/naming"
"google.golang.org/grpc/transport"
) )
// Address represents a server the client connects to. // Address represents a server the client connects to.
@ -94,10 +93,10 @@ type Balancer interface {
// instead of blocking. // instead of blocking.
// //
// The function returns put which is called once the rpc has completed or failed. // The function returns put which is called once the rpc has completed or failed.
// put can collect and report RPC stats to a remote load balancer. gRPC internals // put can collect and report RPC stats to a remote load balancer.
// will try to call this again if err is non-nil (unless err is ErrClientConnClosing).
// //
// TODO: Add other non-recoverable errors? // This function should only return the errors Balancer cannot recover by itself.
// gRPC internals will fail the RPC if an error is returned.
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
// Notify returns a channel that is used by gRPC internals to watch the addresses // Notify returns a channel that is used by gRPC internals to watch the addresses
// gRPC needs to connect. The addresses might be from a name resolver or remote // gRPC needs to connect. The addresses might be from a name resolver or remote
@ -139,35 +138,40 @@ func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r} return &roundRobin{r: r}
} }
type addrInfo struct {
addr Address
connected bool
}
type roundRobin struct { type roundRobin struct {
r naming.Resolver r naming.Resolver
w naming.Watcher w naming.Watcher
open []Address // all the addresses the client should potentially connect addrs []*addrInfo // all the addresses the client should potentially connect
mu sync.Mutex mu sync.Mutex
addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
connected []Address // all the connected addresses next int // index of the next address to return for Get()
next int // index of the next address to return for Get() waitCh chan struct{} // the channel to block when there is no connected address available
waitCh chan struct{} // the channel to block when there is no connected address available done bool // The Balancer is closed.
done bool // The Balancer is closed.
} }
func (rr *roundRobin) watchAddrUpdates() error { func (rr *roundRobin) watchAddrUpdates() error {
updates, err := rr.w.Next() updates, err := rr.w.Next()
if err != nil { if err != nil {
grpclog.Println("grpc: the naming watcher stops working due to %v.", err) grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
return err return err
} }
rr.mu.Lock() rr.mu.Lock()
defer rr.mu.Unlock() defer rr.mu.Unlock()
for _, update := range updates { for _, update := range updates {
addr := Address{ addr := Address{
Addr: update.Addr, Addr: update.Addr,
Metadata: update.Metadata,
} }
switch update.Op { switch update.Op {
case naming.Add: case naming.Add:
var exist bool var exist bool
for _, v := range rr.open { for _, v := range rr.addrs {
if addr == v { if addr == v.addr {
exist = true exist = true
grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr) grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
break break
@ -176,12 +180,12 @@ func (rr *roundRobin) watchAddrUpdates() error {
if exist { if exist {
continue continue
} }
rr.open = append(rr.open, addr) rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete: case naming.Delete:
for i, v := range rr.open { for i, v := range rr.addrs {
if v == addr { if addr == v.addr {
copy(rr.open[i:], rr.open[i+1:]) copy(rr.addrs[i:], rr.addrs[i+1:])
rr.open = rr.open[:len(rr.open)-1] rr.addrs = rr.addrs[:len(rr.addrs)-1]
break break
} }
} }
@ -189,9 +193,11 @@ func (rr *roundRobin) watchAddrUpdates() error {
grpclog.Println("Unknown update.Op ", update.Op) grpclog.Println("Unknown update.Op ", update.Op)
} }
} }
// Make a copy of rr.open and write it onto rr.addrCh so that gRPC internals gets notified. // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
open := make([]Address, len(rr.open), len(rr.open)) open := make([]Address, len(rr.addrs))
copy(open, rr.open) for i, v := range rr.addrs {
open[i] = v.addr
}
if rr.done { if rr.done {
return ErrClientConnClosing return ErrClientConnClosing
} }
@ -202,7 +208,9 @@ func (rr *roundRobin) watchAddrUpdates() error {
func (rr *roundRobin) Start(target string) error { func (rr *roundRobin) Start(target string) error {
if rr.r == nil { if rr.r == nil {
// If there is no name resolver installed, it is not needed to // If there is no name resolver installed, it is not needed to
// do name resolution. In this case, rr.addrCh stays nil. // do name resolution. In this case, target is added into rr.addrs
// as the only address available and rr.addrCh stays nil.
rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil return nil
} }
w, err := rr.r.Resolve(target) w, err := rr.r.Resolve(target)
@ -221,38 +229,41 @@ func (rr *roundRobin) Start(target string) error {
return nil return nil
} }
// Up appends addr to the end of rr.connected and sends notification if there // Up sets the connected state of addr and sends notification if there are pending
// are pending Get() calls. // Get() calls.
func (rr *roundRobin) Up(addr Address) func(error) { func (rr *roundRobin) Up(addr Address) func(error) {
rr.mu.Lock() rr.mu.Lock()
defer rr.mu.Unlock() defer rr.mu.Unlock()
for _, a := range rr.connected { var cnt int
if a == addr { for _, a := range rr.addrs {
return nil if a.addr == addr {
if a.connected {
return nil
}
a.connected = true
}
if a.connected {
cnt++
} }
} }
rr.connected = append(rr.connected, addr) // addr is only one which is connected. Notify the Get() callers who are blocking.
if len(rr.connected) == 1 { if cnt == 1 && rr.waitCh != nil {
// addr is only one available. Notify the Get() callers who are blocking. close(rr.waitCh)
if rr.waitCh != nil { rr.waitCh = nil
close(rr.waitCh)
rr.waitCh = nil
}
} }
return func(err error) { return func(err error) {
rr.down(addr, err) rr.down(addr, err)
} }
} }
// down removes addr from rr.connected and moves the remaining addrs forward. // down unsets the connected state of addr.
func (rr *roundRobin) down(addr Address, err error) { func (rr *roundRobin) down(addr Address, err error) {
rr.mu.Lock() rr.mu.Lock()
defer rr.mu.Unlock() defer rr.mu.Unlock()
for i, a := range rr.connected { for _, a := range rr.addrs {
if a == addr { if addr == a.addr {
copy(rr.connected[i:], rr.connected[i+1:]) a.connected = false
rr.connected = rr.connected[:len(rr.connected)-1] break
return
} }
} }
} }
@ -266,17 +277,40 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
err = ErrClientConnClosing err = ErrClientConnClosing
return return
} }
if rr.next >= len(rr.connected) {
rr.next = 0 if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Has iterated all the possible address but none is connected.
break
}
}
} }
if len(rr.connected) > 0 { if !opts.BlockingWait {
addr = rr.connected[rr.next] if len(rr.addrs) == 0 {
rr.mu.Unlock()
err = fmt.Errorf("there is no address available")
return
}
// Returns the next addr on rr.addrs for failfast RPCs.
addr = rr.addrs[rr.next].addr
rr.next++ rr.next++
rr.mu.Unlock() rr.mu.Unlock()
return return
} }
// There is no address available. Wait on rr.waitCh. // Wait on rr.waitCh for non-failfast RPCs.
// TODO(zhaoq): Handle the case when opts.BlockingWait is false.
if rr.waitCh == nil { if rr.waitCh == nil {
ch = make(chan struct{}) ch = make(chan struct{})
rr.waitCh = ch rr.waitCh = ch
@ -287,7 +321,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
err = transport.ContextErr(ctx.Err()) err = ctx.Err()
return return
case <-ch: case <-ch:
rr.mu.Lock() rr.mu.Lock()
@ -296,24 +330,35 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
err = ErrClientConnClosing err = ErrClientConnClosing
return return
} }
if len(rr.connected) == 0 {
// The newly added addr got removed by Down() again. if len(rr.addrs) > 0 {
if rr.waitCh == nil { if rr.next >= len(rr.addrs) {
ch = make(chan struct{}) rr.next = 0
rr.waitCh = ch }
} else { next := rr.next
ch = rr.waitCh for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Has iterated all the possible address but none is connected.
break
}
} }
rr.mu.Unlock()
continue
} }
if rr.next >= len(rr.connected) { // The newly added addr got removed by Down() again.
rr.next = 0 if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
} }
addr = rr.connected[rr.next]
rr.next++
rr.mu.Unlock() rr.mu.Unlock()
return
} }
} }
} }

View File

@ -101,7 +101,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
// Invoke is called by generated code. Also users can call Invoke directly when it // Invoke is called by generated code. Also users can call Invoke directly when it
// is really needed in their use cases. // is really needed in their use cases.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) { func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
var c callInfo c := defaultCallInfo
for _, o := range opts { for _, o := range opts {
if err := o.before(&c); err != nil { if err := o.before(&c); err != nil {
return toRPCErr(err) return toRPCErr(err)
@ -155,19 +155,17 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
t, put, err = cc.getTransport(ctx, gopts) t, put, err = cc.getTransport(ctx, gopts)
if err != nil { if err != nil {
// TODO(zhaoq): Probably revisit the error handling. // TODO(zhaoq): Probably revisit the error handling.
if err == ErrClientConnClosing { if _, ok := err.(*rpcError); ok {
return Errorf(codes.FailedPrecondition, "%v", err) return err
} }
if _, ok := err.(transport.StreamError); ok { if err == errConnClosing {
return toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); ok {
if c.failFast { if c.failFast {
return toRPCErr(err) return Errorf(codes.Unavailable, "%v", errConnClosing)
} }
continue
} }
// All the remaining cases are treated as retryable. // All the other errors are treated as Internal errors.
continue return Errorf(codes.Internal, "%v", err)
} }
if c.traceInfo.tr != nil { if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)

View File

@ -218,27 +218,26 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
for _, opt := range opts { for _, opt := range opts {
opt(&cc.dopts) opt(&cc.dopts)
} }
// Set defaults.
if cc.dopts.codec == nil { if cc.dopts.codec == nil {
// Set the default codec.
cc.dopts.codec = protoCodec{} cc.dopts.codec = protoCodec{}
} }
if cc.dopts.bs == nil { if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig cc.dopts.bs = DefaultBackoffConfig
} }
if cc.dopts.balancer == nil {
cc.balancer = cc.dopts.balancer cc.dopts.balancer = RoundRobin(nil)
if cc.balancer == nil {
cc.balancer = RoundRobin(nil)
} }
if err := cc.balancer.Start(target); err != nil {
if err := cc.dopts.balancer.Start(target); err != nil {
return nil, err return nil, err
} }
var ( var (
ok bool ok bool
addrs []Address addrs []Address
) )
ch := cc.balancer.Notify() ch := cc.dopts.balancer.Notify()
if ch == nil { if ch == nil {
// There is no name resolver installed. // There is no name resolver installed.
addrs = append(addrs, Address{Addr: target}) addrs = append(addrs, Address{Addr: target})
@ -319,7 +318,6 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC server. // ClientConn represents a client connection to an RPC server.
type ClientConn struct { type ClientConn struct {
target string target string
balancer Balancer
authority string authority string
dopts dialOptions dopts dialOptions
@ -328,7 +326,7 @@ type ClientConn struct {
} }
func (cc *ClientConn) lbWatcher() { func (cc *ClientConn) lbWatcher() {
for addrs := range cc.balancer.Notify() { for addrs := range cc.dopts.balancer.Notify() {
var ( var (
add []Address // Addresses need to setup connections. add []Address // Addresses need to setup connections.
del []*addrConn // Connections need to tear down. del []*addrConn // Connections need to tear down.
@ -424,15 +422,14 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
} }
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
// TODO(zhaoq): Implement fail-fast logic. addr, put, err := cc.dopts.balancer.Get(ctx, opts)
addr, put, err := cc.balancer.Get(ctx, opts)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, toRPCErr(err)
} }
cc.mu.RLock() cc.mu.RLock()
if cc.conns == nil { if cc.conns == nil {
cc.mu.RUnlock() cc.mu.RUnlock()
return nil, nil, ErrClientConnClosing return nil, nil, toRPCErr(ErrClientConnClosing)
} }
ac, ok := cc.conns[addr] ac, ok := cc.conns[addr]
cc.mu.RUnlock() cc.mu.RUnlock()
@ -440,9 +437,9 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
if put != nil { if put != nil {
put() put()
} }
return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc") return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
} }
t, err := ac.wait(ctx) t, err := ac.wait(ctx, !opts.BlockingWait)
if err != nil { if err != nil {
if put != nil { if put != nil {
put() put()
@ -462,7 +459,7 @@ func (cc *ClientConn) Close() error {
conns := cc.conns conns := cc.conns
cc.conns = nil cc.conns = nil
cc.mu.Unlock() cc.mu.Unlock()
cc.balancer.Close() cc.dopts.balancer.Close()
for _, ac := range conns { for _, ac := range conns {
ac.tearDown(ErrClientConnClosing) ac.tearDown(ErrClientConnClosing)
} }
@ -610,7 +607,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
close(ac.ready) close(ac.ready)
ac.ready = nil ac.ready = nil
} }
ac.down = ac.cc.balancer.Up(ac.addr) ac.down = ac.cc.dopts.balancer.Up(ac.addr)
ac.mu.Unlock() ac.mu.Unlock()
return nil return nil
} }
@ -649,8 +646,9 @@ func (ac *addrConn) transportMonitor() {
} }
} }
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed. // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) { // iv) transport is in TransientFailure and the RPC is fail-fast.
func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
for { for {
ac.mu.Lock() ac.mu.Lock()
switch { switch {
@ -661,6 +659,9 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
ct := ac.transport ct := ac.transport
ac.mu.Unlock() ac.mu.Unlock()
return ct, nil return ct, nil
case ac.state == TransientFailure && failFast:
ac.mu.Unlock()
return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
default: default:
ready := ac.ready ready := ac.ready
if ready == nil { if ready == nil {
@ -670,7 +671,7 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
ac.mu.Unlock() ac.mu.Unlock()
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, transport.ContextErr(ctx.Err()) return nil, toRPCErr(ctx.Err())
// Wait until the new transport is ready or failed. // Wait until the new transport is ready or failed.
case <-ready: case <-ready:
} }

View File

@ -66,7 +66,7 @@ type PerRPCCredentials interface {
// TODO(zhaoq): Define the set of the qualified keys instead of leaving // TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string. // it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
// RequireTransportSecurity indicates whether the credentails requires // RequireTransportSecurity indicates whether the credentials requires
// transport security. // transport security.
RequireTransportSecurity() bool RequireTransportSecurity() bool
} }
@ -116,7 +116,7 @@ func (t TLSInfo) AuthType() string {
// tlsCreds is the credentials required for authenticating a connection using TLS. // tlsCreds is the credentials required for authenticating a connection using TLS.
type tlsCreds struct { type tlsCreds struct {
// TLS configuration // TLS configuration
config tls.Config config *tls.Config
} }
func (c tlsCreds) Info() ProtocolInfo { func (c tlsCreds) Info() ProtocolInfo {
@ -151,14 +151,16 @@ func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.D
errChannel <- timeoutError{} errChannel <- timeoutError{}
}) })
} }
// use local cfg to avoid clobbering ServerName if using multiple endpoints
cfg := *c.config
if c.config.ServerName == "" { if c.config.ServerName == "" {
colonPos := strings.LastIndex(addr, ":") colonPos := strings.LastIndex(addr, ":")
if colonPos == -1 { if colonPos == -1 {
colonPos = len(addr) colonPos = len(addr)
} }
c.config.ServerName = addr[:colonPos] cfg.ServerName = addr[:colonPos]
} }
conn := tls.Client(rawConn, &c.config) conn := tls.Client(rawConn, &cfg)
if timeout == 0 { if timeout == 0 {
err = conn.Handshake() err = conn.Handshake()
} else { } else {
@ -177,7 +179,7 @@ func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.D
} }
func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) { func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) {
conn := tls.Server(rawConn, &c.config) conn := tls.Server(rawConn, c.config)
if err := conn.Handshake(); err != nil { if err := conn.Handshake(); err != nil {
rawConn.Close() rawConn.Close()
return nil, nil, err return nil, nil, err
@ -187,7 +189,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
// NewTLS uses c to construct a TransportCredentials based on TLS. // NewTLS uses c to construct a TransportCredentials based on TLS.
func NewTLS(c *tls.Config) TransportCredentials { func NewTLS(c *tls.Config) TransportCredentials {
tc := &tlsCreds{*c} tc := &tlsCreds{c}
tc.config.NextProtos = alpnProtoStr tc.config.NextProtos = alpnProtoStr
return tc return tc
} }

View File

@ -141,6 +141,8 @@ type callInfo struct {
traceInfo traceInfo // in trace.go traceInfo traceInfo // in trace.go
} }
var defaultCallInfo = callInfo{failFast: true}
// CallOption configures a Call before it starts or extracts information from // CallOption configures a Call before it starts or extracts information from
// a Call after it completes. // a Call after it completes.
type CallOption interface { type CallOption interface {
@ -179,6 +181,19 @@ func Trailer(md *metadata.MD) CallOption {
}) })
} }
// FailFast configures the action to take when an RPC is attempted on broken
// connections or unreachable servers. If failfast is true, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will retry
// the call if it fails due to a transient error. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
return nil
})
}
// The format of the payload: compressed or not? // The format of the payload: compressed or not?
type payloadFormat uint8 type payloadFormat uint8
@ -319,7 +334,7 @@ type rpcError struct {
desc string desc string
} }
func (e rpcError) Error() string { func (e *rpcError) Error() string {
return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc) return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
} }
@ -329,7 +344,7 @@ func Code(err error) codes.Code {
if err == nil { if err == nil {
return codes.OK return codes.OK
} }
if e, ok := err.(rpcError); ok { if e, ok := err.(*rpcError); ok {
return e.code return e.code
} }
return codes.Unknown return codes.Unknown
@ -341,7 +356,7 @@ func ErrorDesc(err error) string {
if err == nil { if err == nil {
return "" return ""
} }
if e, ok := err.(rpcError); ok { if e, ok := err.(*rpcError); ok {
return e.desc return e.desc
} }
return err.Error() return err.Error()
@ -353,7 +368,7 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
if c == codes.OK { if c == codes.OK {
return nil return nil
} }
return rpcError{ return &rpcError{
code: c, code: c,
desc: fmt.Sprintf(format, a...), desc: fmt.Sprintf(format, a...),
} }
@ -362,18 +377,37 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
// toRPCErr converts an error into a rpcError. // toRPCErr converts an error into a rpcError.
func toRPCErr(err error) error { func toRPCErr(err error) error {
switch e := err.(type) { switch e := err.(type) {
case rpcError: case *rpcError:
return err return err
case transport.StreamError: case transport.StreamError:
return rpcError{ return &rpcError{
code: e.Code, code: e.Code,
desc: e.Desc, desc: e.Desc,
} }
case transport.ConnectionError: case transport.ConnectionError:
return rpcError{ return &rpcError{
code: codes.Internal, code: codes.Internal,
desc: e.Desc, desc: e.Desc,
} }
default:
switch err {
case context.DeadlineExceeded:
return &rpcError{
code: codes.DeadlineExceeded,
desc: err.Error(),
}
case context.Canceled:
return &rpcError{
code: codes.Canceled,
desc: err.Error(),
}
case ErrClientConnClosing:
return &rpcError{
code: codes.FailedPrecondition,
desc: err.Error(),
}
}
} }
return Errorf(codes.Unknown, "%v", err) return Errorf(codes.Unknown, "%v", err)
} }

View File

@ -82,6 +82,7 @@ type service struct {
server interface{} // the server for service methods server interface{} // the server for service methods
md map[string]*MethodDesc md map[string]*MethodDesc
sd map[string]*StreamDesc sd map[string]*StreamDesc
mdata interface{}
} }
// Server is a gRPC server to serve RPC requests. // Server is a gRPC server to serve RPC requests.
@ -231,6 +232,7 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
server: ss, server: ss,
md: make(map[string]*MethodDesc), md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc), sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
} }
for i := range sd.Methods { for i := range sd.Methods {
d := &sd.Methods[i] d := &sd.Methods[i]
@ -243,6 +245,52 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.m[sd.ServiceName] = srv s.m[sd.ServiceName] = srv
} }
// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
// Name is the method name only, without the service name or package name.
Name string
// IsClientStream indicates whether the RPC is a client streaming RPC.
IsClientStream bool
// IsServerStream indicates whether the RPC is a server streaming RPC.
IsServerStream bool
}
// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
type ServiceInfo struct {
Methods []MethodInfo
// Metadata is the metadata specified in ServiceDesc when registering service.
Metadata interface{}
}
// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]*ServiceInfo {
ret := make(map[string]*ServiceInfo)
for n, srv := range s.m {
methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
for m := range srv.md {
methods = append(methods, MethodInfo{
Name: m,
IsClientStream: false,
IsServerStream: false,
})
}
for m, d := range srv.sd {
methods = append(methods, MethodInfo{
Name: m,
IsClientStream: d.ClientStreams,
IsServerStream: d.ServerStreams,
})
}
ret[n] = &ServiceInfo{
Methods: methods,
Metadata: srv.mdata,
}
}
return ret
}
var ( var (
// ErrServerStopped indicates that the operation is now illegal because of // ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. // the server being stopped.
@ -272,9 +320,11 @@ func (s *Server) Serve(lis net.Listener) error {
s.lis[lis] = true s.lis[lis] = true
s.mu.Unlock() s.mu.Unlock()
defer func() { defer func() {
lis.Close()
s.mu.Lock() s.mu.Lock()
delete(s.lis, lis) if s.lis != nil && s.lis[lis] {
lis.Close()
delete(s.lis, lis)
}
s.mu.Unlock() s.mu.Unlock()
}() }()
for { for {
@ -529,7 +579,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
if appErr != nil { if appErr != nil {
if err, ok := appErr.(rpcError); ok { if err, ok := appErr.(*rpcError); ok {
statusCode = err.code statusCode = err.code
statusDesc = err.desc statusDesc = err.desc
} else { } else {
@ -614,7 +664,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler) appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
} }
if appErr != nil { if appErr != nil {
if err, ok := appErr.(rpcError); ok { if err, ok := appErr.(*rpcError); ok {
ss.statusCode = err.code ss.statusCode = err.code
ss.statusDesc = err.desc ss.statusDesc = err.desc
} else if err, ok := appErr.(transport.StreamError); ok { } else if err, ok := appErr.(transport.StreamError); ok {

View File

@ -102,16 +102,15 @@ type ClientStream interface {
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
var ( var (
t transport.ClientTransport t transport.ClientTransport
s *transport.Stream
err error err error
put func() put func()
) )
// TODO(zhaoq): CallOption is omitted. Add support when it is needed. c := defaultCallInfo
gopts := BalancerGetOptions{ for _, o := range opts {
BlockingWait: false, if err := o.before(&c); err != nil {
} return nil, toRPCErr(err)
t, put, err = cc.getTransport(ctx, gopts) }
if err != nil {
return nil, toRPCErr(err)
} }
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
@ -122,8 +121,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
callHdr.SendCompress = cc.dopts.cp.Type() callHdr.SendCompress = cc.dopts.cp.Type()
} }
cs := &clientStream{ cs := &clientStream{
opts: opts,
c: c,
desc: desc, desc: desc,
put: put,
codec: cc.dopts.codec, codec: cc.dopts.codec,
cp: cc.dopts.cp, cp: cc.dopts.cp,
dc: cc.dopts.dc, dc: cc.dopts.dc,
@ -142,11 +142,44 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false) cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
ctx = trace.NewContext(ctx, cs.trInfo.tr) ctx = trace.NewContext(ctx, cs.trInfo.tr)
} }
s, err := t.NewStream(ctx, callHdr) gopts := BalancerGetOptions{
if err != nil { BlockingWait: !c.failFast,
cs.finish(err)
return nil, toRPCErr(err)
} }
for {
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(*rpcError); ok {
return nil, err
}
if err == errConnClosing {
if c.failFast {
return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
}
continue
}
// All the other errors are treated as Internal errors.
return nil, Errorf(codes.Internal, "%v", err)
}
s, err = t.NewStream(ctx, callHdr)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok {
if c.failFast {
cs.finish(err)
return nil, toRPCErr(err)
}
continue
}
return nil, toRPCErr(err)
}
break
}
cs.put = put
cs.t = t cs.t = t
cs.s = s cs.s = s
cs.p = &parser{r: s} cs.p = &parser{r: s}
@ -167,6 +200,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream. // clientStream implements a client side Stream.
type clientStream struct { type clientStream struct {
opts []CallOption
c callInfo
t transport.ClientTransport t transport.ClientTransport
s *transport.Stream s *transport.Stream
p *parser p *parser
@ -312,15 +347,18 @@ func (cs *clientStream) closeTransportStream(err error) {
} }
func (cs *clientStream) finish(err error) { func (cs *clientStream) finish(err error) {
if !cs.tracing {
return
}
cs.mu.Lock() cs.mu.Lock()
defer cs.mu.Unlock() defer cs.mu.Unlock()
for _, o := range cs.opts {
o.after(&cs.c)
}
if cs.put != nil { if cs.put != nil {
cs.put() cs.put()
cs.put = nil cs.put = nil
} }
if !cs.tracing {
return
}
if cs.trInfo.tr != nil { if cs.trInfo.tr != nil {
if err == nil || err == io.EOF { if err == nil || err == io.EOF {
cs.trInfo.tr.LazyPrintf("RPC: [OK]") cs.trInfo.tr.LazyPrintf("RPC: [OK]")

View File

@ -312,7 +312,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
Addr: ht.RemoteAddr(), Addr: ht.RemoteAddr(),
} }
if req.TLS != nil { if req.TLS != nil {
pr.AuthInfo = credentials.TLSInfo{*req.TLS} pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
} }
ctx = metadata.NewContext(ctx, ht.headerMD) ctx = metadata.NewContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr) ctx = peer.NewContext(ctx, pr)

View File

@ -175,7 +175,10 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
} }
if initialWindowSize != defaultWindowSize { if initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)}) err = t.framer.writeSettings(true, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(initialWindowSize),
})
} else { } else {
err = t.framer.writeSettings(true) err = t.framer.writeSettings(true)
} }

View File

@ -100,10 +100,15 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
if maxStreams == 0 { if maxStreams == 0 {
maxStreams = math.MaxUint32 maxStreams = math.MaxUint32
} else { } else {
settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams}) settings = append(settings, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
Val: maxStreams,
})
} }
if initialWindowSize != defaultWindowSize { if initialWindowSize != defaultWindowSize {
settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)}) settings = append(settings, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(initialWindowSize)})
} }
if err := framer.writeSettings(true, settings...); err != nil { if err := framer.writeSettings(true, settings...); err != nil {
return nil, ConnectionErrorf("transport: %v", err) return nil, ConnectionErrorf("transport: %v", err)

View File

@ -52,7 +52,7 @@ func TestCtlV3Migrate(t *testing.T) {
for i := range epc.procs { for i := range epc.procs {
dataDirs[i] = epc.procs[i].cfg.dataDirPath dataDirs[i] = epc.procs[i].cfg.dataDirPath
} }
if err := epc.Stop(); err != nil { if err := epc.StopAll(); err != nil {
t.Fatalf("error closing etcd processes (%v)", err) t.Fatalf("error closing etcd processes (%v)", err)
} }
@ -74,7 +74,7 @@ func TestCtlV3Migrate(t *testing.T) {
for i := range epc.procs { for i := range epc.procs {
epc.procs[i].cfg.keepDataDir = true epc.procs[i].cfg.keepDataDir = true
} }
if err := epc.Restart(); err != nil { if err := epc.RestartAll(); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -0,0 +1,87 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"fmt"
"os"
"testing"
"time"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/testutil"
)
// TestReleaseUpgrade ensures that changes to master branch does not affect
// upgrade from latest etcd releases.
func TestReleaseUpgrade(t *testing.T) {
lastReleaseBinary := "../bin/etcd-last-release"
if !fileutil.Exist(lastReleaseBinary) {
t.Skipf("%q does not exist", lastReleaseBinary)
}
defer testutil.AfterTest(t)
copiedCfg := configNoTLS
copiedCfg.execPath = lastReleaseBinary
copiedCfg.snapCount = 3
epc, err := newEtcdProcessCluster(&copiedCfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()
os.Setenv("ETCDCTL_API", "3")
defer os.Unsetenv("ETCDCTL_API")
cx := ctlCtx{
t: t,
cfg: configNoTLS,
dialTimeout: 7 * time.Second,
quorum: true,
epc: epc,
}
var kvs []kv
for i := 0; i < 5; i++ {
kvs = append(kvs, kv{key: fmt.Sprintf("foo%d", i), val: "bar"})
}
for i := range kvs {
if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
cx.t.Fatalf("#%d: ctlV3Put error (%v)", i, err)
}
}
for i := range epc.procs {
if err := epc.procs[i].Stop(); err != nil {
t.Fatalf("#%d: error closing etcd process (%v)", i, err)
}
epc.procs[i].cfg.execPath = "../bin/etcd"
epc.procs[i].cfg.keepDataDir = true
if err := epc.procs[i].Restart(); err != nil {
t.Fatalf("error restarting etcd process (%v)", err)
}
for j := range kvs {
if err := ctlV3Get(cx, []string{kvs[j].key}, []kv{kvs[j]}...); err != nil {
cx.t.Fatalf("#%d-%d: ctlV3Get error (%v)", i, j, err)
}
}
}
}

View File

@ -21,6 +21,7 @@ import (
"os" "os"
"strings" "strings"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/expect" "github.com/coreos/etcd/pkg/expect"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
) )
@ -122,7 +123,8 @@ type etcdProcess struct {
} }
type etcdProcessConfig struct { type etcdProcessConfig struct {
args []string execPath string
args []string
dataDirPath string dataDirPath string
keepDataDir bool keepDataDir bool
@ -137,12 +139,16 @@ type etcdProcessConfig struct {
} }
type etcdProcessClusterConfig struct { type etcdProcessClusterConfig struct {
execPath string
dataDirPath string dataDirPath string
keepDataDir bool keepDataDir bool
clusterSize int clusterSize int
basePort int basePort int
proxySize int proxySize int
snapCount int // default is 10000
clientTLS clientConnType clientTLS clientConnType
isPeerTLS bool isPeerTLS bool
isPeerAutoTLS bool isPeerAutoTLS bool
@ -175,7 +181,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
} }
func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) { func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
if !fileutil.Exist("../bin/etcd") { if !fileutil.Exist(cfg.execPath) {
return nil, fmt.Errorf("could not find etcd binary") return nil, fmt.Errorf("could not find etcd binary")
} }
@ -185,7 +191,7 @@ func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
} }
} }
child, err := spawnCmd(append([]string{"../bin/etcd"}, cfg.args...)) child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -197,6 +203,13 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
cfg.basePort = etcdProcessBasePort cfg.basePort = etcdProcessBasePort
} }
if cfg.execPath == "" {
cfg.execPath = "../bin/etcd"
}
if cfg.snapCount == 0 {
cfg.snapCount = etcdserver.DefaultSnapCount
}
clientScheme := "http" clientScheme := "http"
if cfg.clientTLS == clientTLS { if cfg.clientTLS == clientTLS {
clientScheme = "https" clientScheme = "https"
@ -244,6 +257,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
"--initial-advertise-peer-urls", purl.String(), "--initial-advertise-peer-urls", purl.String(),
"--initial-cluster-token", cfg.initialToken, "--initial-cluster-token", cfg.initialToken,
"--data-dir", dataDirPath, "--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
} }
if cfg.forceNewCluster { if cfg.forceNewCluster {
args = append(args, "--force-new-cluster") args = append(args, "--force-new-cluster")
@ -256,6 +270,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
args = append(args, cfg.tlsArgs()...) args = append(args, cfg.tlsArgs()...)
etcdCfgs[i] = &etcdProcessConfig{ etcdCfgs[i] = &etcdProcessConfig{
execPath: cfg.execPath,
args: args, args: args,
dataDirPath: dataDirPath, dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir, keepDataDir: cfg.keepDataDir,
@ -281,6 +296,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
} }
args = append(args, cfg.tlsArgs()...) args = append(args, cfg.tlsArgs()...)
etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{ etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
execPath: cfg.execPath,
args: args, args: args,
dataDirPath: dataDirPath, dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir, keepDataDir: cfg.keepDataDir,
@ -351,7 +367,7 @@ func (epc *etcdProcessCluster) Start() (err error) {
return nil return nil
} }
func (epc *etcdProcessCluster) Restart() error { func (epc *etcdProcessCluster) RestartAll() error {
for i := range epc.procs { for i := range epc.procs {
proc, err := newEtcdProcess(epc.procs[i].cfg) proc, err := newEtcdProcess(epc.procs[i].cfg)
if err != nil { if err != nil {
@ -363,7 +379,29 @@ func (epc *etcdProcessCluster) Restart() error {
return epc.Start() return epc.Start()
} }
func (epc *etcdProcessCluster) Stop() (err error) { func (epr *etcdProcess) Restart() error {
proc, err := newEtcdProcess(epr.cfg)
if err != nil {
epr.Stop()
return err
}
*epr = *proc
readyStr := "enabled capabilities for version"
if proc.cfg.isProxy {
readyStr = "httpproxy: endpoints found"
}
if _, err = proc.proc.Expect(readyStr); err != nil {
epr.Stop()
return err
}
close(proc.donec)
return nil
}
func (epc *etcdProcessCluster) StopAll() (err error) {
for _, p := range epc.procs { for _, p := range epc.procs {
if p == nil { if p == nil {
continue continue
@ -380,8 +418,21 @@ func (epc *etcdProcessCluster) Stop() (err error) {
return err return err
} }
func (epr *etcdProcess) Stop() error {
if epr == nil {
return nil
}
if err := epr.proc.Stop(); err != nil {
return err
}
<-epr.donec
return nil
}
func (epc *etcdProcessCluster) Close() error { func (epc *etcdProcessCluster) Close() error {
err := epc.Stop() err := epc.StopAll()
for _, p := range epc.procs { for _, p := range epc.procs {
os.RemoveAll(p.cfg.dataDirPath) os.RemoveAll(p.cfg.dataDirPath)
} }

View File

@ -497,25 +497,25 @@ ENDPOINT STATUS does not support protobuf encoded output.
```bash ```bash
./etcdctl endpoint status ./etcdctl endpoint status
127.0.0.1:2379, 8211f1d0f64f3269, 3.0.0-beta.0+git, 25 kB, false, 2, 63 127.0.0.1:2379, 8211f1d0f64f3269, 3.0.0, 25 kB, false, 2, 63
127.0.0.1:22379, 91bc3c398fb3c146, 3.0.0-beta.0+git, 25 kB, false, 2, 63 127.0.0.1:22379, 91bc3c398fb3c146, 3.0.0, 25 kB, false, 2, 63
127.0.0.1:32379, fd422379fda50e48, 3.0.0-beta.0+git, 25 kB, true, 2, 63 127.0.0.1:32379, fd422379fda50e48, 3.0.0, 25 kB, true, 2, 63
``` ```
```bash ```bash
./etcdctl -w json endpoint status ./etcdctl -w json endpoint status
[{"Endpoint":"127.0.0.1:2379","Status":{"header":{"cluster_id":17237436991929493444,"member_id":9372538179322589801,"revision":2,"raft_term":2},"version":"2.3.0+git","dbSize":24576,"leader":18249187646912138824,"raftIndex":32623,"raftTerm":2}},{"Endpoint":"127.0.0.1:22379","Status":{"header":{"cluster_id":17237436991929493444,"member_id":10501334649042878790,"revision":2,"raft_term":2},"version":"2.3.0+git","dbSize":24576,"leader":18249187646912138824,"raftIndex":32623,"raftTerm":2}},{"Endpoint":"127.0.0.1:32379","Status":{"header":{"cluster_id":17237436991929493444,"member_id":18249187646912138824,"revision":2,"raft_term":2},"version":"2.3.0+git","dbSize":24576,"leader":18249187646912138824,"raftIndex":32623,"raftTerm":2}}] [{"Endpoint":"127.0.0.1:2379","Status":{"header":{"cluster_id":17237436991929493444,"member_id":9372538179322589801,"revision":2,"raft_term":2},"version":"3.0.0","dbSize":24576,"leader":18249187646912138824,"raftIndex":32623,"raftTerm":2}},{"Endpoint":"127.0.0.1:22379","Status":{"header":{"cluster_id":17237436991929493444,"member_id":10501334649042878790,"revision":2,"raft_term":2},"version":"3.0.0","dbSize":24576,"leader":18249187646912138824,"raftIndex":32623,"raftTerm":2}},{"Endpoint":"127.0.0.1:32379","Status":{"header":{"cluster_id":17237436991929493444,"member_id":18249187646912138824,"revision":2,"raft_term":2},"version":"3.0.0","dbSize":24576,"leader":18249187646912138824,"raftIndex":32623,"raftTerm":2}}]
``` ```
```bash ```bash
./etcdctl -w table endpoint status ./etcdctl -w table endpoint status
+-----------------+------------------+------------------+---------+-----------+-----------+------------+ +-----------------+------------------+---------+---------+-----------+-----------+------------+
| ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | RAFT TERM | RAFT INDEX | | ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | RAFT TERM | RAFT INDEX |
+-----------------+------------------+------------------+---------+-----------+-----------+------------+ +-----------------+------------------+---------+---------+-----------+-----------+------------+
| 127.0.0.1:2379 | 8211f1d0f64f3269 | 3.0.0-beta.0+git | 25 kB | false | 2 | 52 | | 127.0.0.1:2379 | 8211f1d0f64f3269 | 3.0.0 | 25 kB | false | 2 | 52 |
| 127.0.0.1:22379 | 91bc3c398fb3c146 | 3.0.0-beta.0+git | 25 kB | false | 2 | 52 | | 127.0.0.1:22379 | 91bc3c398fb3c146 | 3.0.0 | 25 kB | false | 2 | 52 |
| 127.0.0.1:32379 | fd422379fda50e48 | 3.0.0-beta.0+git | 25 kB | true | 2 | 52 | | 127.0.0.1:32379 | fd422379fda50e48 | 3.0.0 | 25 kB | true | 2 | 52 |
+-----------------+------------------+------------------+---------+-----------+-----------+------------+ +-----------------+------------------+---------+---------+-----------+-----------+------------+
``` ```
### LOCK \<lockname\> ### LOCK \<lockname\>

View File

@ -25,7 +25,7 @@ import (
func NewAlarmCommand() *cobra.Command { func NewAlarmCommand() *cobra.Command {
ac := &cobra.Command{ ac := &cobra.Command{
Use: "alarm <subcommand>", Use: "alarm <subcommand>",
Short: "alarm related command", Short: "Alarm related commands",
} }
ac.AddCommand(NewAlarmDisarmCommand()) ac.AddCommand(NewAlarmDisarmCommand())
@ -37,7 +37,7 @@ func NewAlarmCommand() *cobra.Command {
func NewAlarmDisarmCommand() *cobra.Command { func NewAlarmDisarmCommand() *cobra.Command {
cmd := cobra.Command{ cmd := cobra.Command{
Use: "disarm", Use: "disarm",
Short: "disarm all alarms", Short: "Disarms all alarms",
Run: alarmDisarmCommandFunc, Run: alarmDisarmCommandFunc,
} }
return &cmd return &cmd
@ -60,7 +60,7 @@ func alarmDisarmCommandFunc(cmd *cobra.Command, args []string) {
func NewAlarmListCommand() *cobra.Command { func NewAlarmListCommand() *cobra.Command {
cmd := cobra.Command{ cmd := cobra.Command{
Use: "list", Use: "list",
Short: "list all alarms", Short: "Lists all alarms",
Run: alarmListCommandFunc, Run: alarmListCommandFunc,
} }
return &cmd return &cmd

View File

@ -24,7 +24,7 @@ import (
func NewAuthCommand() *cobra.Command { func NewAuthCommand() *cobra.Command {
ac := &cobra.Command{ ac := &cobra.Command{
Use: "auth <enable or disable>", Use: "auth <enable or disable>",
Short: "Enable or disable authentication.", Short: "Enable or disable authentication",
} }
ac.AddCommand(newAuthEnableCommand()) ac.AddCommand(newAuthEnableCommand())
@ -36,7 +36,7 @@ func NewAuthCommand() *cobra.Command {
func newAuthEnableCommand() *cobra.Command { func newAuthEnableCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "enable", Use: "enable",
Short: "enable authentication", Short: "Enables authentication",
Run: authEnableCommandFunc, Run: authEnableCommandFunc,
} }
} }
@ -60,7 +60,7 @@ func authEnableCommandFunc(cmd *cobra.Command, args []string) {
func newAuthDisableCommand() *cobra.Command { func newAuthDisableCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "disable", Use: "disable",
Short: "disable authentication", Short: "Disables authentication",
Run: authDisableCommandFunc, Run: authDisableCommandFunc,
} }
} }

View File

@ -28,10 +28,10 @@ var compactPhysical bool
func NewCompactionCommand() *cobra.Command { func NewCompactionCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "compaction <revision>", Use: "compaction <revision>",
Short: "Compaction compacts the event history in etcd.", Short: "Compacts the event history in etcd",
Run: compactionCommandFunc, Run: compactionCommandFunc,
} }
cmd.Flags().BoolVar(&compactPhysical, "physical", false, "'true' to wait for compaction to physically remove all old revisions.") cmd.Flags().BoolVar(&compactPhysical, "physical", false, "'true' to wait for compaction to physically remove all old revisions")
return cmd return cmd
} }
@ -53,7 +53,7 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) {
c := mustClientFromCmd(cmd) c := mustClientFromCmd(cmd)
ctx, cancel := commandCtx(cmd) ctx, cancel := commandCtx(cmd)
cerr := c.Compact(ctx, rev, opts...) _, cerr := c.Compact(ctx, rev, opts...)
cancel() cancel()
if cerr != nil { if cerr != nil {
ExitWithError(ExitError, cerr) ExitWithError(ExitError, cerr)

View File

@ -25,7 +25,7 @@ import (
func NewDefragCommand() *cobra.Command { func NewDefragCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "defrag", Use: "defrag",
Short: "defrag defragments the storage of the etcd members with given endpoints.", Short: "Defragments the storage of the etcd members with given endpoints",
Run: defragCommandFunc, Run: defragCommandFunc,
} }
} }

View File

@ -29,7 +29,7 @@ var (
func NewDelCommand() *cobra.Command { func NewDelCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "del [options] <key> [range_end]", Use: "del [options] <key> [range_end]",
Short: "Removes the specified key or range of keys [key, range_end).", Short: "Removes the specified key or range of keys [key, range_end)",
Run: delCommandFunc, Run: delCommandFunc,
} }

View File

@ -33,7 +33,7 @@ var (
func NewElectCommand() *cobra.Command { func NewElectCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "elect <election-name> [proposal]", Use: "elect <election-name> [proposal]",
Short: "elect observes and participates in leader election", Short: "Observes and participates in leader election",
Run: electCommandFunc, Run: electCommandFunc,
} }
cmd.Flags().BoolVarP(&electListen, "listen", "l", false, "observation mode") cmd.Flags().BoolVarP(&electListen, "listen", "l", false, "observation mode")

View File

@ -28,8 +28,8 @@ import (
// NewEndpointCommand returns the cobra command for "endpoint". // NewEndpointCommand returns the cobra command for "endpoint".
func NewEndpointCommand() *cobra.Command { func NewEndpointCommand() *cobra.Command {
ec := &cobra.Command{ ec := &cobra.Command{
Use: "endpoint", Use: "endpoint <subcommand>",
Short: "endpoint is used to check endpoints.", Short: "Endpoint related commands",
} }
ec.AddCommand(newEpHealthCommand()) ec.AddCommand(newEpHealthCommand())
@ -41,7 +41,7 @@ func NewEndpointCommand() *cobra.Command {
func newEpHealthCommand() *cobra.Command { func newEpHealthCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "health", Use: "health",
Short: "health checks the healthiness of endpoints specified in `--endpoints` flag", Short: "Checks the healthiness of endpoints specified in `--endpoints` flag",
Run: epHealthCommandFunc, Run: epHealthCommandFunc,
} }
return cmd return cmd
@ -50,7 +50,7 @@ func newEpHealthCommand() *cobra.Command {
func newEpStatusCommand() *cobra.Command { func newEpStatusCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "status", Use: "status",
Short: "status prints out the status of endpoints specified in `--endpoints` flag", Short: "Prints out the status of endpoints specified in `--endpoints` flag",
Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint. Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint.
The items in the lists are endpoint, ID, version, db size, is leader, raft term, raft index. The items in the lists are endpoint, ID, version, db size, is leader, raft term, raft index.
`, `,

View File

@ -37,18 +37,18 @@ var (
func NewGetCommand() *cobra.Command { func NewGetCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "get [options] <key> [range_end]", Use: "get [options] <key> [range_end]",
Short: "Get gets the key or a range of keys.", Short: "Gets the key or a range of keys",
Run: getCommandFunc, Run: getCommandFunc,
} }
cmd.Flags().StringVar(&getConsistency, "consistency", "l", "Linearizable(l) or Serializable(s)") cmd.Flags().StringVar(&getConsistency, "consistency", "l", "Linearizable(l) or Serializable(s)")
cmd.Flags().StringVar(&getSortOrder, "order", "", "order of results; ASCEND or DESCEND") cmd.Flags().StringVar(&getSortOrder, "order", "", "Order of results; ASCEND or DESCEND")
cmd.Flags().StringVar(&getSortTarget, "sort-by", "", "sort target; CREATE, KEY, MODIFY, VALUE, or VERSION") cmd.Flags().StringVar(&getSortTarget, "sort-by", "", "Sort target; CREATE, KEY, MODIFY, VALUE, or VERSION")
cmd.Flags().Int64Var(&getLimit, "limit", 0, "maximum number of results") cmd.Flags().Int64Var(&getLimit, "limit", 0, "Maximum number of results")
cmd.Flags().BoolVar(&getPrefix, "prefix", false, "get keys with matching prefix") cmd.Flags().BoolVar(&getPrefix, "prefix", false, "Get keys with matching prefix")
cmd.Flags().BoolVar(&getFromKey, "from-key", false, "get keys that are greater than or equal to the given key") cmd.Flags().BoolVar(&getFromKey, "from-key", false, "Get keys that are greater than or equal to the given key")
cmd.Flags().Int64Var(&getRev, "rev", 0, "specify the kv revision") cmd.Flags().Int64Var(&getRev, "rev", 0, "Specify the kv revision")
cmd.Flags().BoolVar(&getKeysOnly, "keys-only", false, "get only the keys") cmd.Flags().BoolVar(&getKeysOnly, "keys-only", false, "Get only the keys")
return cmd return cmd
} }

View File

@ -26,8 +26,8 @@ import (
// NewLeaseCommand returns the cobra command for "lease". // NewLeaseCommand returns the cobra command for "lease".
func NewLeaseCommand() *cobra.Command { func NewLeaseCommand() *cobra.Command {
lc := &cobra.Command{ lc := &cobra.Command{
Use: "lease", Use: "lease <subcommand>",
Short: "lease is used to manage leases.", Short: "Lease related commands",
} }
lc.AddCommand(NewLeaseGrantCommand()) lc.AddCommand(NewLeaseGrantCommand())
@ -41,7 +41,7 @@ func NewLeaseCommand() *cobra.Command {
func NewLeaseGrantCommand() *cobra.Command { func NewLeaseGrantCommand() *cobra.Command {
lc := &cobra.Command{ lc := &cobra.Command{
Use: "grant <ttl>", Use: "grant <ttl>",
Short: "grant is used to create leases.", Short: "Creates leases",
Run: leaseGrantCommandFunc, Run: leaseGrantCommandFunc,
} }
@ -73,7 +73,7 @@ func leaseGrantCommandFunc(cmd *cobra.Command, args []string) {
func NewLeaseRevokeCommand() *cobra.Command { func NewLeaseRevokeCommand() *cobra.Command {
lc := &cobra.Command{ lc := &cobra.Command{
Use: "revoke <leaseID>", Use: "revoke <leaseID>",
Short: "revoke is used to revoke leases.", Short: "Revokes leases",
Run: leaseRevokeCommandFunc, Run: leaseRevokeCommandFunc,
} }
@ -105,7 +105,7 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
func NewLeaseKeepAliveCommand() *cobra.Command { func NewLeaseKeepAliveCommand() *cobra.Command {
lc := &cobra.Command{ lc := &cobra.Command{
Use: "keep-alive <leaseID>", Use: "keep-alive <leaseID>",
Short: "keep-alive is used to keep leases alive.", Short: "Keeps leases alive (renew)",
Run: leaseKeepAliveCommandFunc, Run: leaseKeepAliveCommandFunc,
} }

View File

@ -29,7 +29,7 @@ import (
func NewLockCommand() *cobra.Command { func NewLockCommand() *cobra.Command {
c := &cobra.Command{ c := &cobra.Command{
Use: "lock <lockname>", Use: "lock <lockname>",
Short: "lock acquires a named lock", Short: "Acquires a named lock",
Run: lockCommandFunc, Run: lockCommandFunc,
} }
return c return c

View File

@ -40,17 +40,17 @@ var (
func NewMakeMirrorCommand() *cobra.Command { func NewMakeMirrorCommand() *cobra.Command {
c := &cobra.Command{ c := &cobra.Command{
Use: "make-mirror [options] <destination>", Use: "make-mirror [options] <destination>",
Short: "make-mirror makes a mirror at the destination etcd cluster", Short: "Makes a mirror at the destination etcd cluster",
Run: makeMirrorCommandFunc, Run: makeMirrorCommandFunc,
} }
c.Flags().StringVar(&mmprefix, "prefix", "", "the key-value prefix to mirror") c.Flags().StringVar(&mmprefix, "prefix", "", "Key-value prefix to mirror")
// TODO: add dest-prefix to mirror a prefix to a different prefix in the destination cluster? // TODO: add dest-prefix to mirror a prefix to a different prefix in the destination cluster?
c.Flags().StringVar(&mmcert, "dest-cert", "", "identify secure client using this TLS certificate file for the destination cluster") c.Flags().StringVar(&mmcert, "dest-cert", "", "Identify secure client using this TLS certificate file for the destination cluster")
c.Flags().StringVar(&mmkey, "dest-key", "", "identify secure client using this TLS key file") c.Flags().StringVar(&mmkey, "dest-key", "", "Identify secure client using this TLS key file")
c.Flags().StringVar(&mmcacert, "dest-cacert", "", "verify certificates of TLS enabled secure servers using this CA bundle") c.Flags().StringVar(&mmcacert, "dest-cacert", "", "Verify certificates of TLS enabled secure servers using this CA bundle")
// TODO: secure by default when etcd enables secure gRPC by default. // TODO: secure by default when etcd enables secure gRPC by default.
c.Flags().BoolVar(&mminsecureTr, "dest-insecure-transport", true, "disable transport security for client connections") c.Flags().BoolVar(&mminsecureTr, "dest-insecure-transport", true, "Disable transport security for client connections")
return c return c
} }

View File

@ -27,8 +27,8 @@ var memberPeerURLs string
// NewMemberCommand returns the cobra command for "member". // NewMemberCommand returns the cobra command for "member".
func NewMemberCommand() *cobra.Command { func NewMemberCommand() *cobra.Command {
mc := &cobra.Command{ mc := &cobra.Command{
Use: "member", Use: "member <subcommand>",
Short: "member is used to manage membership in an etcd cluster.", Short: "Membership related commands",
} }
mc.AddCommand(NewMemberAddCommand()) mc.AddCommand(NewMemberAddCommand())
@ -43,7 +43,7 @@ func NewMemberCommand() *cobra.Command {
func NewMemberAddCommand() *cobra.Command { func NewMemberAddCommand() *cobra.Command {
cc := &cobra.Command{ cc := &cobra.Command{
Use: "add <memberName>", Use: "add <memberName>",
Short: "add is used to add a member into the cluster", Short: "Adds a member into the cluster",
Run: memberAddCommandFunc, Run: memberAddCommandFunc,
} }
@ -57,7 +57,7 @@ func NewMemberAddCommand() *cobra.Command {
func NewMemberRemoveCommand() *cobra.Command { func NewMemberRemoveCommand() *cobra.Command {
cc := &cobra.Command{ cc := &cobra.Command{
Use: "remove <memberID>", Use: "remove <memberID>",
Short: "remove is used to remove a member from the cluster", Short: "Removes a member from the cluster",
Run: memberRemoveCommandFunc, Run: memberRemoveCommandFunc,
} }
@ -69,7 +69,7 @@ func NewMemberRemoveCommand() *cobra.Command {
func NewMemberUpdateCommand() *cobra.Command { func NewMemberUpdateCommand() *cobra.Command {
cc := &cobra.Command{ cc := &cobra.Command{
Use: "update <memberID>", Use: "update <memberID>",
Short: "update is used to update a member in the cluster", Short: "Updates a member in the cluster",
Run: memberUpdateCommandFunc, Run: memberUpdateCommandFunc,
} }
@ -83,7 +83,7 @@ func NewMemberUpdateCommand() *cobra.Command {
func NewMemberListCommand() *cobra.Command { func NewMemberListCommand() *cobra.Command {
cc := &cobra.Command{ cc := &cobra.Command{
Use: "list", Use: "list",
Short: "list is used to list all members in the cluster", Short: "Lists all members in the cluster",
Long: `When --write-out is set to simple, this command prints out comma-separated member lists for each endpoint. Long: `When --write-out is set to simple, this command prints out comma-separated member lists for each endpoint.
The items in the lists are ID, Status, Name, Peer Addrs, Client Addrs. The items in the lists are ID, Status, Name, Peer Addrs, Client Addrs.
`, `,

View File

@ -51,13 +51,13 @@ var (
func NewMigrateCommand() *cobra.Command { func NewMigrateCommand() *cobra.Command {
mc := &cobra.Command{ mc := &cobra.Command{
Use: "migrate", Use: "migrate",
Short: "migrates keys in a v2 store to a mvcc store", Short: "Migrates keys in a v2 store to a mvcc store",
Run: migrateCommandFunc, Run: migrateCommandFunc,
} }
mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory.") mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory.") mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program.") mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
return mc return mc
} }

View File

@ -31,9 +31,9 @@ var (
func NewPutCommand() *cobra.Command { func NewPutCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "put [options] <key> <value> (<value> can also be given from stdin)", Use: "put [options] <key> <value> (<value> can also be given from stdin)",
Short: "Put puts the given key into the store.", Short: "Puts the given key into the store",
Long: ` Long: `
Put puts the given key into the store. Puts the given key into the store.
When <value> begins with '-', <value> is interpreted as a flag. When <value> begins with '-', <value> is interpreted as a flag.
Insert '--' for workaround: Insert '--' for workaround:

View File

@ -26,7 +26,7 @@ import (
func NewRoleCommand() *cobra.Command { func NewRoleCommand() *cobra.Command {
ac := &cobra.Command{ ac := &cobra.Command{
Use: "role <subcommand>", Use: "role <subcommand>",
Short: "role related command", Short: "Role related commands",
} }
ac.AddCommand(newRoleAddCommand()) ac.AddCommand(newRoleAddCommand())
@ -42,7 +42,7 @@ func NewRoleCommand() *cobra.Command {
func newRoleAddCommand() *cobra.Command { func newRoleAddCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "add <role name>", Use: "add <role name>",
Short: "add a new role", Short: "Adds a new role",
Run: roleAddCommandFunc, Run: roleAddCommandFunc,
} }
} }
@ -50,7 +50,7 @@ func newRoleAddCommand() *cobra.Command {
func newRoleDeleteCommand() *cobra.Command { func newRoleDeleteCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "delete <role name>", Use: "delete <role name>",
Short: "delete a role", Short: "Deletes a role",
Run: roleDeleteCommandFunc, Run: roleDeleteCommandFunc,
} }
} }
@ -58,7 +58,7 @@ func newRoleDeleteCommand() *cobra.Command {
func newRoleGetCommand() *cobra.Command { func newRoleGetCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "get <role name>", Use: "get <role name>",
Short: "get detailed information of a role", Short: "Gets detailed information of a role",
Run: roleGetCommandFunc, Run: roleGetCommandFunc,
} }
} }
@ -66,7 +66,7 @@ func newRoleGetCommand() *cobra.Command {
func newRoleListCommand() *cobra.Command { func newRoleListCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "list", Use: "list",
Short: "list up all roles", Short: "Lists all roles",
Run: roleListCommandFunc, Run: roleListCommandFunc,
} }
} }
@ -74,7 +74,7 @@ func newRoleListCommand() *cobra.Command {
func newRoleGrantPermissionCommand() *cobra.Command { func newRoleGrantPermissionCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "grant-permission <role name> <permission type> <key> [endkey]", Use: "grant-permission <role name> <permission type> <key> [endkey]",
Short: "grant a key to a role", Short: "Grants a key to a role",
Run: roleGrantPermissionCommandFunc, Run: roleGrantPermissionCommandFunc,
} }
} }
@ -82,7 +82,7 @@ func newRoleGrantPermissionCommand() *cobra.Command {
func newRoleRevokePermissionCommand() *cobra.Command { func newRoleRevokePermissionCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "revoke-permission <role name> <key> [endkey]", Use: "revoke-permission <role name> <key> [endkey]",
Short: "revoke a key from a role", Short: "Revokes a key from a role",
Run: roleRevokePermissionCommandFunc, Run: roleRevokePermissionCommandFunc,
} }
} }

View File

@ -58,8 +58,8 @@ var (
// NewSnapshotCommand returns the cobra command for "snapshot". // NewSnapshotCommand returns the cobra command for "snapshot".
func NewSnapshotCommand() *cobra.Command { func NewSnapshotCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "snapshot", Use: "snapshot <subcommand>",
Short: "snapshot manages etcd node snapshots.", Short: "Manages etcd node snapshots",
} }
cmd.AddCommand(NewSnapshotSaveCommand()) cmd.AddCommand(NewSnapshotSaveCommand())
cmd.AddCommand(NewSnapshotRestoreCommand()) cmd.AddCommand(NewSnapshotRestoreCommand())
@ -70,7 +70,7 @@ func NewSnapshotCommand() *cobra.Command {
func NewSnapshotSaveCommand() *cobra.Command { func NewSnapshotSaveCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "save <filename>", Use: "save <filename>",
Short: "save stores an etcd node backend snapshot to a given file.", Short: "Stores an etcd node backend snapshot to a given file",
Run: snapshotSaveCommandFunc, Run: snapshotSaveCommandFunc,
} }
} }
@ -78,7 +78,7 @@ func NewSnapshotSaveCommand() *cobra.Command {
func newSnapshotStatusCommand() *cobra.Command { func newSnapshotStatusCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "status <filename>", Use: "status <filename>",
Short: "status gets backend snapshot status of a given file.", Short: "Gets backend snapshot status of a given file",
Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint. Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint.
The items in the lists are hash, revision, total keys, total size. The items in the lists are hash, revision, total keys, total size.
`, `,
@ -89,15 +89,15 @@ The items in the lists are hash, revision, total keys, total size.
func NewSnapshotRestoreCommand() *cobra.Command { func NewSnapshotRestoreCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "restore <filename>", Use: "restore <filename>",
Short: "restore an etcd member snapshot to an etcd directory", Short: "Restores an etcd member snapshot to an etcd directory",
Run: snapshotRestoreCommandFunc, Run: snapshotRestoreCommandFunc,
} }
cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.") cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.") cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap")
cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.") cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap")
cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.") cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.") cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member")
cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory).") cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)")
return cmd return cmd
} }

View File

@ -34,10 +34,10 @@ var (
func NewTxnCommand() *cobra.Command { func NewTxnCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "txn [options]", Use: "txn [options]",
Short: "Txn processes all the requests in one transaction.", Short: "Txn processes all the requests in one transaction",
Run: txnCommandFunc, Run: txnCommandFunc,
} }
cmd.Flags().BoolVarP(&txnInteractive, "interactive", "i", false, "input transaction in interactive mode") cmd.Flags().BoolVarP(&txnInteractive, "interactive", "i", false, "Input transaction in interactive mode")
return cmd return cmd
} }

View File

@ -31,7 +31,7 @@ var (
func NewUserCommand() *cobra.Command { func NewUserCommand() *cobra.Command {
ac := &cobra.Command{ ac := &cobra.Command{
Use: "user <subcommand>", Use: "user <subcommand>",
Short: "user related command", Short: "User related commands",
} }
ac.AddCommand(newUserAddCommand()) ac.AddCommand(newUserAddCommand())
@ -52,11 +52,11 @@ var (
func newUserAddCommand() *cobra.Command { func newUserAddCommand() *cobra.Command {
cmd := cobra.Command{ cmd := cobra.Command{
Use: "add <user name>", Use: "add <user name>",
Short: "add a new user", Short: "Adds a new user",
Run: userAddCommandFunc, Run: userAddCommandFunc,
} }
cmd.Flags().BoolVar(&passwordInteractive, "interactive", true, "read password from stdin instead of interactive terminal") cmd.Flags().BoolVar(&passwordInteractive, "interactive", true, "Read password from stdin instead of interactive terminal")
return &cmd return &cmd
} }
@ -64,7 +64,7 @@ func newUserAddCommand() *cobra.Command {
func newUserDeleteCommand() *cobra.Command { func newUserDeleteCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "delete <user name>", Use: "delete <user name>",
Short: "delete a user", Short: "Deletes a user",
Run: userDeleteCommandFunc, Run: userDeleteCommandFunc,
} }
} }
@ -72,11 +72,11 @@ func newUserDeleteCommand() *cobra.Command {
func newUserGetCommand() *cobra.Command { func newUserGetCommand() *cobra.Command {
cmd := cobra.Command{ cmd := cobra.Command{
Use: "get <user name>", Use: "get <user name>",
Short: "get detailed information of a user", Short: "Gets detailed information of a user",
Run: userGetCommandFunc, Run: userGetCommandFunc,
} }
cmd.Flags().BoolVar(&userShowDetail, "detail", false, "show permissions of roles granted to the user") cmd.Flags().BoolVar(&userShowDetail, "detail", false, "Show permissions of roles granted to the user")
return &cmd return &cmd
} }
@ -84,7 +84,7 @@ func newUserGetCommand() *cobra.Command {
func newUserListCommand() *cobra.Command { func newUserListCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "list", Use: "list",
Short: "list up all users", Short: "Lists all users",
Run: userListCommandFunc, Run: userListCommandFunc,
} }
} }
@ -92,11 +92,11 @@ func newUserListCommand() *cobra.Command {
func newUserChangePasswordCommand() *cobra.Command { func newUserChangePasswordCommand() *cobra.Command {
cmd := cobra.Command{ cmd := cobra.Command{
Use: "passwd <user name>", Use: "passwd <user name>",
Short: "change password of user", Short: "Changes password of user",
Run: userChangePasswordCommandFunc, Run: userChangePasswordCommandFunc,
} }
cmd.Flags().BoolVar(&passwordInteractive, "interactive", true, "if true, read password from stdin instead of interactive terminal") cmd.Flags().BoolVar(&passwordInteractive, "interactive", true, "If true, read password from stdin instead of interactive terminal")
return &cmd return &cmd
} }
@ -104,7 +104,7 @@ func newUserChangePasswordCommand() *cobra.Command {
func newUserGrantRoleCommand() *cobra.Command { func newUserGrantRoleCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "grant-role <user name> <role name>", Use: "grant-role <user name> <role name>",
Short: "grant a role to a user", Short: "Grants a role to a user",
Run: userGrantRoleCommandFunc, Run: userGrantRoleCommandFunc,
} }
} }
@ -112,7 +112,7 @@ func newUserGrantRoleCommand() *cobra.Command {
func newUserRevokeRoleCommand() *cobra.Command { func newUserRevokeRoleCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "revoke-role <user name> <role name>", Use: "revoke-role <user name> <role name>",
Short: "revoke a role from a user", Short: "Revokes a role from a user",
Run: userRevokeRoleCommandFunc, Run: userRevokeRoleCommandFunc,
} }
} }

View File

@ -26,7 +26,7 @@ import (
func NewVersionCommand() *cobra.Command { func NewVersionCommand() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "version", Use: "version",
Short: "Print the version of etcdctl.", Short: "Prints the version of etcdctl",
Run: versionCommandFunc, Run: versionCommandFunc,
} }
} }

View File

@ -35,13 +35,13 @@ var (
func NewWatchCommand() *cobra.Command { func NewWatchCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "watch [options] [key or prefix] [range_end]", Use: "watch [options] [key or prefix] [range_end]",
Short: "Watch watches events stream on keys or prefixes.", Short: "Watches events stream on keys or prefixes",
Run: watchCommandFunc, Run: watchCommandFunc,
} }
cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "interactive mode") cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "watch on a prefix if prefix is set") cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
cmd.Flags().Int64Var(&watchRev, "rev", 0, "revision to start watching") cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
return cmd return cmd
} }

View File

@ -43,7 +43,7 @@ var (
) )
func init() { func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"}, "gRPC endpoints") rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (simple, json, etc..)") rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (simple, json, etc..)")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings") rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")

View File

@ -194,9 +194,16 @@ func startEtcdOrProxyV2() {
// startEtcd launches the etcd server and HTTP handlers for client/server communication. // startEtcd launches the etcd server and HTTP handlers for client/server communication.
func startEtcd(cfg *config) (<-chan struct{}, error) { func startEtcd(cfg *config) (<-chan struct{}, error) {
urlsmap, token, err := getPeerURLsMapAndToken(cfg, "etcd") var (
if err != nil { urlsmap types.URLsMap
return nil, fmt.Errorf("error setting up initial cluster: %v", err) token string
err error
)
if !isMemberInitialized(cfg) {
urlsmap, token, err = getPeerURLsMapAndToken(cfg, "etcd")
if err != nil {
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
}
} }
if cfg.PeerAutoTLS && cfg.peerTLSInfo.Empty() { if cfg.PeerAutoTLS && cfg.peerTLSInfo.Empty() {

30
etcdmain/util.go Normal file
View File

@ -0,0 +1,30 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
import (
"path"
"github.com/coreos/etcd/wal"
)
func isMemberInitialized(cfg *config) bool {
waldir := cfg.WalDir
if waldir == "" {
waldir = path.Join(cfg.Dir, "member", "wal")
}
return wal.Exist(waldir)
}

View File

@ -19,6 +19,7 @@ import (
"time" "time"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
) )
@ -71,7 +72,7 @@ func runCapabilityLoop(s *etcdserver.EtcdServer) {
enableMapMu.Lock() enableMapMu.Lock()
enabledMap = capabilityMaps[pv.String()] enabledMap = capabilityMaps[pv.String()]
enableMapMu.Unlock() enableMapMu.Unlock()
plog.Infof("enabled capabilities for version %s", pv) plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
} }
} }

View File

@ -26,15 +26,15 @@ func TestConvert(t *testing.T) {
e2 := ErrGRPCEmptyKey e2 := ErrGRPCEmptyKey
e3 := ErrEmptyKey e3 := ErrEmptyKey
if e1 != e2 { if e1.Error() != e2.Error() {
t.Fatalf("expected 'true', got %T != %T", e1, e2) t.Fatalf("expected %q == %q", e1.Error(), e2.Error())
} }
if grpc.Code(e1) != e3.(EtcdError).Code() { if grpc.Code(e1) != e3.(EtcdError).Code() {
t.Fatalf("expected them to be equal, got %v / %v", grpc.Code(e1), e3.(EtcdError).Code()) t.Fatalf("expected them to be equal, got %v / %v", grpc.Code(e1), e3.(EtcdError).Code())
} }
if e1 == e3 { if e1.Error() == e3.Error() {
t.Fatalf("expected 'false', got %T == %T", e1, e3) t.Fatalf("expected %q != %q", e1.Error(), e3.Error())
} }
if grpc.Code(e2) != e3.(EtcdError).Code() { if grpc.Code(e2) != e3.(EtcdError).Code() {
t.Fatalf("expected them to be equal, got %v / %v", grpc.Code(e2), e3.(EtcdError).Code()) t.Fatalf("expected them to be equal, got %v / %v", grpc.Code(e2), e3.(EtcdError).Code())

View File

@ -86,11 +86,11 @@ type serverWatchStream struct {
watchStream mvcc.WatchStream watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse ctrlStream chan *pb.WatchResponse
// mu protects progress, prevKV
mu sync.Mutex
// progress tracks the watchID that stream might need to send // progress tracks the watchID that stream might need to send
// progress to. // progress to.
progress map[mvcc.WatchID]bool progress map[mvcc.WatchID]bool
// mu protects progress
mu sync.Mutex
// closec indicates the stream is closed. // closec indicates the stream is closed.
closec chan struct{} closec chan struct{}
@ -171,7 +171,9 @@ func (sws *serverWatchStream) recvLoop() error {
} }
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
if id != -1 && creq.ProgressNotify { if id != -1 && creq.ProgressNotify {
sws.mu.Lock()
sws.progress[id] = true sws.progress[id] = true
sws.mu.Unlock()
} }
wr := &pb.WatchResponse{ wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev), Header: sws.newResponseHeader(wsrev),
@ -199,9 +201,11 @@ func (sws *serverWatchStream) recvLoop() error {
sws.mu.Unlock() sws.mu.Unlock()
} }
} }
// TODO: do we need to return error back to client?
default: default:
panic("not implemented") // we probably should not shutdown the entire stream when
// receive an valid command.
// so just do nothing instead.
continue
} }
} }
} }
@ -296,12 +300,14 @@ func (sws *serverWatchStream) sendLoop() {
delete(pending, wid) delete(pending, wid)
} }
case <-progressTicker.C: case <-progressTicker.C:
sws.mu.Lock()
for id, ok := range sws.progress { for id, ok := range sws.progress {
if ok { if ok {
sws.watchStream.RequestProgress(id) sws.watchStream.RequestProgress(id)
} }
sws.progress[id] = true sws.progress[id] = true
} }
sws.mu.Unlock()
case <-sws.closec: case <-sws.closec:
return return
} }

View File

@ -357,7 +357,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
cl.SetStore(st) cl.SetStore(st)
cl.SetBackend(be) cl.SetBackend(be)
cl.Recover() cl.Recover()
if cl.Version() != nil && cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
os.RemoveAll(bepath) os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
} }
@ -1170,8 +1170,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
} }
plog.Panicf("unexpected create snapshot error %v", err) plog.Panicf("unexpected create snapshot error %v", err)
} }
// commit v3 storage because WAL file before snapshot index // commit kv to write metadata (for example: consistent index) to disk.
// could be removed after SaveSnap.
s.KV().Commit() s.KV().Commit()
// SaveSnap saves the snapshot and releases the locked wal files // SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index. // to the snapshot index.

View File

@ -845,7 +845,7 @@ func TestSnapshot(t *testing.T) {
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}}) s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorder() st := mockstore.NewRecorder()
p := mockstorage.NewStorageRecorder("") p := mockstorage.NewStorageRecorderStream("")
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{}, Cfg: &ServerConfig{},
r: raftNode{ r: raftNode{
@ -869,7 +869,7 @@ func TestSnapshot(t *testing.T) {
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) { if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
t.Errorf("action = %s, want SaveNoCopy", gaction[1]) t.Errorf("action = %s, want SaveNoCopy", gaction[1])
} }
gaction = p.Action() gaction, _ = p.Wait(1)
if len(gaction) != 1 { if len(gaction) != 1 {
t.Fatalf("len(action) = %d, want 1", len(gaction)) t.Fatalf("len(action) = %d, want 1", len(gaction))
} }

View File

@ -39,6 +39,8 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
plog.Panicf("store save should never fail: %v", err) plog.Panicf("store save should never fail: %v", err)
} }
// commit kv to write metadata(for example: consistent index).
s.KV().Commit()
dbsnap := s.be.Snapshot() dbsnap := s.be.Snapshot()
// get a snapshot of v3 KV as readCloser // get a snapshot of v3 KV as readCloser
rc := newSnapshotReaderCloser(dbsnap) rc := newSnapshotReaderCloser(dbsnap)

View File

@ -8,6 +8,7 @@ all: cfssl ca req
cfssl: cfssl:
go get -u -tags nopkcs11 github.com/cloudflare/cfssl/cmd/cfssl go get -u -tags nopkcs11 github.com/cloudflare/cfssl/cmd/cfssl
go get -u github.com/cloudflare/cfssl/cmd/cfssljson go get -u github.com/cloudflare/cfssl/cmd/cfssljson
go get -u github.com/mattn/goreman
ca: ca:
mkdir -p certs mkdir -p certs

View File

@ -54,8 +54,8 @@ const (
requestTimeout = 20 * time.Second requestTimeout = 20 * time.Second
basePort = 21000 basePort = 21000
urlScheme = "unix" UrlScheme = "unix"
urlSchemeTLS = "unixs" UrlSchemeTLS = "unixs"
) )
var ( var (
@ -96,9 +96,9 @@ func init() {
func schemeFromTLSInfo(tls *transport.TLSInfo) string { func schemeFromTLSInfo(tls *transport.TLSInfo) string {
if tls == nil { if tls == nil {
return urlScheme return UrlScheme
} }
return urlSchemeTLS return UrlSchemeTLS
} }
func (c *cluster) fillClusterForMembers() error { func (c *cluster) fillClusterForMembers() error {
@ -257,7 +257,7 @@ func (c *cluster) addMember(t *testing.T) {
} }
func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error { func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
ma := client.NewMembersAPI(cc) ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := ma.Add(ctx, peerURL); err != nil { if _, err := ma.Add(ctx, peerURL); err != nil {
@ -277,7 +277,7 @@ func (c *cluster) AddMember(t *testing.T) {
func (c *cluster) RemoveMember(t *testing.T, id uint64) { func (c *cluster) RemoveMember(t *testing.T, id uint64) {
// send remove request to the cluster // send remove request to the cluster
cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
ma := client.NewMembersAPI(cc) ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if err := ma.Remove(ctx, types.ID(id).String()); err != nil { if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
@ -312,7 +312,7 @@ func (c *cluster) Terminate(t *testing.T) {
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
for _, u := range c.URLs() { for _, u := range c.URLs() {
cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
ma := client.NewMembersAPI(cc) ma := client.NewMembersAPI(cc)
for { for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
@ -391,10 +391,10 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
func newLocalListener(t *testing.T) net.Listener { func newLocalListener(t *testing.T) net.Listener {
c := atomic.AddInt64(&localListenCount, 1) c := atomic.AddInt64(&localListenCount, 1)
addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid()) addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid())
return newListenerWithAddr(t, addr) return NewListenerWithAddr(t, addr)
} }
func newListenerWithAddr(t *testing.T, addr string) net.Listener { func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
l, err := transport.NewUnixListener(addr) l, err := transport.NewUnixListener(addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -614,7 +614,7 @@ func (m *member) Launch() error {
} }
func (m *member) WaitOK(t *testing.T) { func (m *member) WaitOK(t *testing.T) {
cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
for { for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
@ -678,12 +678,12 @@ func (m *member) Restart(t *testing.T) error {
plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr) plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr)
newPeerListeners := make([]net.Listener, 0) newPeerListeners := make([]net.Listener, 0)
for _, ln := range m.PeerListeners { for _, ln := range m.PeerListeners {
newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String())) newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
} }
m.PeerListeners = newPeerListeners m.PeerListeners = newPeerListeners
newClientListeners := make([]net.Listener, 0) newClientListeners := make([]net.Listener, 0)
for _, ln := range m.ClientListeners { for _, ln := range m.ClientListeners {
newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String())) newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
} }
m.ClientListeners = newClientListeners m.ClientListeners = newClientListeners
@ -708,7 +708,7 @@ func (m *member) Terminate(t *testing.T) {
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr) plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
} }
func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
cfgtls := transport.TLSInfo{} cfgtls := transport.TLSInfo{}
if tls != nil { if tls != nil {
cfgtls = *tls cfgtls = *tls

View File

@ -67,7 +67,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
dc.Launch(t) dc.Launch(t)
defer dc.Terminate(t) defer dc.Terminate(t)
// init discovery token space // init discovery token space
dcc := mustNewHTTPClient(t, dc.URLs(), nil) dcc := MustNewHTTPClient(t, dc.URLs(), nil)
dkapi := client.NewKeysAPI(dcc) dkapi := client.NewKeysAPI(dcc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil { if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
@ -90,7 +90,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
dc.Launch(t) dc.Launch(t)
defer dc.Terminate(t) defer dc.Terminate(t)
// init discovery token space // init discovery token space
dcc := mustNewHTTPClient(t, dc.URLs(), nil) dcc := MustNewHTTPClient(t, dc.URLs(), nil)
dkapi := client.NewKeysAPI(dcc) dkapi := client.NewKeysAPI(dcc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil { if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
@ -157,7 +157,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
func TestForceNewCluster(t *testing.T) { func TestForceNewCluster(t *testing.T) {
c := NewCluster(t, 3) c := NewCluster(t, 3)
c.Launch(t) c.Launch(t)
cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := kapi.Create(ctx, "/foo", "bar") resp, err := kapi.Create(ctx, "/foo", "bar")
@ -184,7 +184,7 @@ func TestForceNewCluster(t *testing.T) {
c.waitLeader(t, c.Members[:1]) c.waitLeader(t, c.Members[:1])
// use new http client to init new connection // use new http client to init new connection
cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi = client.NewKeysAPI(cc) kapi = client.NewKeysAPI(cc)
// ensure force restart keep the old data, and new cluster can make progress // ensure force restart keep the old data, and new cluster can make progress
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
@ -273,7 +273,7 @@ func TestIssue2904(t *testing.T) {
c.Members[1].Stop(t) c.Members[1].Stop(t)
// send remove member-1 request to the cluster. // send remove member-1 request to the cluster.
cc := mustNewHTTPClient(t, c.URLs(), nil) cc := MustNewHTTPClient(t, c.URLs(), nil)
ma := client.NewMembersAPI(cc) ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
// the proposal is not committed because member 1 is stopped, but the // the proposal is not committed because member 1 is stopped, but the
@ -337,7 +337,7 @@ func TestIssue3699(t *testing.T) {
c.waitLeader(t, c.Members) c.waitLeader(t, c.Members)
// try to participate in cluster // try to participate in cluster
cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil { if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
@ -350,7 +350,7 @@ func TestIssue3699(t *testing.T) {
// a random key first, and check the new key could be got from all client urls // a random key first, and check the new key could be got from all client urls
// of the cluster. // of the cluster.
func clusterMustProgress(t *testing.T, membs []*member) { func clusterMustProgress(t *testing.T, membs []*member) {
cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil) cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", rand.Int()) key := fmt.Sprintf("foo%d", rand.Int())
@ -362,7 +362,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
for i, m := range membs { for i, m := range membs {
u := m.URL() u := m.URL()
mcc := mustNewHTTPClient(t, []string{u}, nil) mcc := MustNewHTTPClient(t, []string{u}, nil)
mkapi := client.NewKeysAPI(mcc) mkapi := client.NewKeysAPI(mcc)
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil { if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {

View File

@ -93,7 +93,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
resps := make([]*client.Response, 120) resps := make([]*client.Response, 120)
var err error var err error
for i := 0; i < 120; i++ { for i := 0; i < 120; i++ {
cc := mustNewHTTPClient(t, []string{m.URL()}, nil) cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", i) key := fmt.Sprintf("foo%d", i)
@ -108,7 +108,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
m.WaitOK(t) m.WaitOK(t)
for i := 0; i < 120; i++ { for i := 0; i < 120; i++ {
cc := mustNewHTTPClient(t, []string{m.URL()}, nil) cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", i) key := fmt.Sprintf("foo%d", i)

View File

@ -191,7 +191,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
} }
_, err := kvc.Txn(context.Background(), txn) _, err := kvc.Txn(context.Background(), txn)
if err != rpctypes.ErrGRPCTooManyOps { if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps) t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
} }
} }
@ -257,7 +257,7 @@ func TestV3TxnDuplicateKeys(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
txn := &pb.TxnRequest{Success: tt.txnSuccess} txn := &pb.TxnRequest{Success: tt.txnSuccess}
_, err := kvc.Txn(context.Background(), txn) _, err := kvc.Txn(context.Background(), txn)
if err != tt.werr { if !eqErrGRPC(err, tt.werr) {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
} }
} }
@ -468,8 +468,8 @@ func TestV3DeleteRange(t *testing.T) {
} }
} }
// TestV3TxnInvaildRange tests txn // TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
func TestV3TxnInvaildRange(t *testing.T) { func TestV3TxnInvalidRange(t *testing.T) {
defer testutil.AfterTest(t) defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3}) clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t) defer clus.Terminate(t)
@ -500,14 +500,14 @@ func TestV3TxnInvaildRange(t *testing.T) {
Request: &pb.RequestOp_RequestRange{ Request: &pb.RequestOp_RequestRange{
RequestRange: rreq}}) RequestRange: rreq}})
if _, err := kvc.Txn(context.TODO(), txn); err != rpctypes.ErrGRPCFutureRev { if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev) t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
} }
// compacted rev // compacted rev
tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange) tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
tv.RequestRange.Revision = 1 tv.RequestRange.Revision = 1
if _, err := kvc.Txn(context.TODO(), txn); err != rpctypes.ErrGRPCCompacted { if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted) t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
} }
} }
@ -525,7 +525,7 @@ func TestV3TooLargeRequest(t *testing.T) {
preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV} preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
_, err := kvc.Put(context.Background(), preq) _, err := kvc.Put(context.Background(), preq)
if err != rpctypes.ErrGRPCRequestTooLarge { if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge) t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
} }
} }
@ -572,14 +572,14 @@ func TestV3StorageQuotaAPI(t *testing.T) {
// test small put that fits in quota // test small put that fits in quota
smallbuf := make([]byte, 512) smallbuf := make([]byte, 512)
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil { if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}, grpc.FailFast(false)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// test big put // test big put
bigbuf := make([]byte, 64*1024) bigbuf := make([]byte, 64*1024)
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
if err == nil || err != rpctypes.ErrGRPCNoSpace { if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace) t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
} }
@ -595,7 +595,7 @@ func TestV3StorageQuotaAPI(t *testing.T) {
txnreq := &pb.TxnRequest{} txnreq := &pb.TxnRequest{}
txnreq.Success = append(txnreq.Success, puttxn) txnreq.Success = append(txnreq.Success, puttxn)
_, txnerr := kvc.Txn(context.TODO(), txnreq) _, txnerr := kvc.Txn(context.TODO(), txnreq)
if txnerr == nil || err != rpctypes.ErrGRPCNoSpace { if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace) t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
} }
} }
@ -619,7 +619,7 @@ func TestV3StorageQuotaApply(t *testing.T) {
// test small put still works // test small put still works
smallbuf := make([]byte, 1024) smallbuf := make([]byte, 1024)
_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}, grpc.FailFast(false))
if serr != nil { if serr != nil {
t.Fatal(serr) t.Fatal(serr)
} }
@ -694,7 +694,7 @@ func TestV3AlarmDeactivate(t *testing.T) {
key := []byte("abc") key := []byte("abc")
smallbuf := make([]byte, 512) smallbuf := make([]byte, 512)
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
if err == nil && err != rpctypes.ErrGRPCNoSpace { if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace) t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
} }
@ -1048,3 +1048,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) {
t.Errorf("err = %v, want nil", err) t.Errorf("err = %v, want nil", err)
} }
} }
func eqErrGRPC(err1 error, err2 error) bool {
return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
}

View File

@ -106,7 +106,7 @@ func TestV3LeaseGrantByID(t *testing.T) {
lresp, err = toGRPC(clus.RandClient()).Lease.LeaseGrant( lresp, err = toGRPC(clus.RandClient()).Lease.LeaseGrant(
context.TODO(), context.TODO(),
&pb.LeaseGrantRequest{ID: 1, TTL: 1}) &pb.LeaseGrantRequest{ID: 1, TTL: 1})
if err != rpctypes.ErrGRPCLeaseExist { if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
t.Error(err) t.Error(err)
} }
@ -242,7 +242,7 @@ func TestV3PutOnNonExistLease(t *testing.T) {
badLeaseID := int64(0x12345678) badLeaseID := int64(0x12345678)
putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: badLeaseID} putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: badLeaseID}
_, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr) _, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
if err != rpctypes.ErrGRPCLeaseNotFound { if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseNotFound) {
t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted) t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
} }
} }
@ -424,7 +424,7 @@ func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool {
return false return false
} }
if err == rpctypes.ErrGRPCLeaseExist { if eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
return true return true
} }
t.Fatalf("unexpecter error %v", err) t.Fatalf("unexpecter error %v", err)

View File

@ -55,7 +55,7 @@ const (
type Backend interface { type Backend interface {
BatchTx() BatchTx BatchTx() BatchTx
Snapshot() Snapshot Snapshot() Snapshot
Hash() (uint32, error) Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size returns the current size of the backend. // Size returns the current size of the backend.
Size() int64 Size() int64
Defrag() error Defrag() error
@ -144,7 +144,12 @@ func (b *backend) Snapshot() Snapshot {
return &snapshot{tx} return &snapshot{tx}
} }
func (b *backend) Hash() (uint32, error) { type IgnoreKey struct {
Bucket string
Key string
}
func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
b.mu.RLock() b.mu.RLock()
@ -158,8 +163,11 @@ func (b *backend) Hash() (uint32, error) {
} }
h.Write(next) h.Write(next)
b.ForEach(func(k, v []byte) error { b.ForEach(func(k, v []byte) error {
h.Write(k) bk := IgnoreKey{Bucket: string(next), Key: string(k)}
h.Write(v) if _, ok := ignores[bk]; !ok {
h.Write(k)
h.Write(v)
}
return nil return nil
}) })
} }

View File

@ -141,7 +141,7 @@ func TestBackendDefrag(t *testing.T) {
size := b.Size() size := b.Size()
// shrink and check hash // shrink and check hash
oh, err := b.Hash() oh, err := b.Hash(nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -151,7 +151,7 @@ func TestBackendDefrag(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
nh, err := b.Hash() nh, err := b.Hash(nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -320,7 +320,14 @@ func (s *store) Hash() (uint32, int64, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
h, err := s.b.Hash() // ignore hash consistent index field for now.
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
ignores := make(map[backend.IgnoreKey]struct{})
bk := backend.IgnoreKey{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}
ignores[bk] = struct{}{}
h, err := s.b.Hash(ignores)
rev := s.currentRev.main rev := s.currentRev.main
return h, rev, err return h, rev, err
} }

View File

@ -593,13 +593,13 @@ type fakeBackend struct {
tx *fakeBatchTx tx *fakeBatchTx
} }
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) Hash() (uint32, error) { return 0, nil } func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil } func (b *fakeBackend) Close() error { return nil }
type indexGetResp struct { type indexGetResp struct {
rev revision rev revision

View File

@ -75,7 +75,6 @@ type Progress struct {
func (pr *Progress) resetState(state ProgressStateType) { func (pr *Progress) resetState(state ProgressStateType) {
pr.Paused = false pr.Paused = false
pr.RecentActive = false
pr.PendingSnapshot = 0 pr.PendingSnapshot = 0
pr.State = state pr.State = state
pr.ins.reset() pr.ins.reset()

14
test
View File

@ -57,13 +57,25 @@ function unit_tests {
} }
function integration_tests { function integration_tests {
echo "Running integration tests..." if [ "$RELEASE_TEST" = "y" ]; then
UPGRADE_VER=$(git tag -l | tail -1)
if [ -n "$MANUAL_VER" ]; then
# in case, we need to test against different version
UPGRADE_VER=$MANUAL_VER
fi
echo "Running release upgrade tests with" etcd $UPGRADE_VER
curl -L https://github.com/coreos/etcd/releases/download/$UPGRADE_VER/etcd-$UPGRADE_VER-linux-amd64.tar.gz -o /tmp/etcd-$UPGRADE_VER-linux-amd64.tar.gz
tar xzvf /tmp/etcd-$UPGRADE_VER-linux-amd64.tar.gz -C /tmp/ --strip-components=1
mv /tmp/etcd ./bin/etcd-last-release
fi;
go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e & go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e &
e2epid="$!" e2epid="$!"
go test -timeout 15m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration & go test -timeout 15m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration &
intpid="$!" intpid="$!"
wait $e2epid wait $e2epid
wait $intpid wait $intpid
go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration
go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST} go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}

View File

@ -143,7 +143,7 @@ func compactKV(clients []*v3.Client) {
revToCompact := max(0, curRev-compactIndexDelta) revToCompact := max(0, curRev-compactIndexDelta)
for _, c := range clients { for _, c := range clients {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := c.KV.Compact(ctx, revToCompact) _, err := c.KV.Compact(ctx, revToCompact)
cancel() cancel()
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -29,7 +29,7 @@ import (
var ( var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with. // MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0" MinClusterVersion = "2.3.0"
Version = "3.0.0-beta.0+git" Version = "3.0.3"
// Git SHA Value will be set during build // Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)" GitSHA = "Not provided (use ./build instead of go build)"

View File

@ -129,15 +129,22 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return nil, err return nil, err
} }
if err := os.RemoveAll(dirpath); err != nil { // rename of directory with locked files doesn't work on windows; close
return nil, err // the WAL to release the locks so the directory can be renamed
} w.Close()
if err := os.Rename(tmpdirpath, dirpath); err != nil { if err := os.Rename(tmpdirpath, dirpath); err != nil {
return nil, err return nil, err
} }
// reopen and relock
w.fp = newFilePipeline(w.dir, segmentSizeBytes) newWAL, oerr := Open(dirpath, walpb.Snapshot{})
return w, nil if oerr != nil {
return nil, oerr
}
if _, _, _, err := newWAL.ReadAll(); err != nil {
newWAL.Close()
return nil, err
}
return newWAL, nil
} }
// Open opens the WAL at the given snap. // Open opens the WAL at the given snap.