Compare commits

...

57 Commits

Author SHA1 Message Date
20490caaf0 version: bump up to 3.1.5
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-27 10:20:28 -07:00
e156746959 raft: use rs.req.Entries[0].Data as the key for deletion in advance()
advance() should use rs.req.Entries[0].Data as the context instead of
req.Context for deletion. Since req.Context is never set, there won't be
any context being deleted from pendingReadIndex; results mem leak.

FIXES #7571
2017-03-24 15:51:39 -07:00
d84bf983cc Dockerfile-release: add nsswitch.conf into image
The file '/etc/nsswitch.conf' is created in order to
take in account '/etc/hosts' entries while resolving
domain names.
2017-03-23 15:20:49 -07:00
b44c6bff9d clientv3: use waitgroup to wait for substream goroutine teardown
When a grpc watch stream is torn down, it will join on its logical substream
goroutines by waiting for each to close a channel. This doesn't guarantee
the substream is fully exited, though, but only about to exit and can be
waiting to resume even after Watch.Close finishes. Instead, use a
waitgroup.Done at the very end of the substream defer.

Fixes #7573
2017-03-23 12:26:32 -07:00
8c3c1b4a9c *: use filepath.Join for files 2017-03-23 09:53:56 -07:00
b478387a59 wal: use path/filepath instead of path
Use the path/filepath package instead of the path package. The
path package assumes slash-separated paths, which doesn't work
on Windows. But path/filepath manipulates filename paths in a way
that's compatible across OSes.
2017-03-23 09:50:41 -07:00
dfc1f21f9d version: bump to 3.1.4+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-23 09:49:51 -07:00
41e52ebc22 version: bump to 3.1.4
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-22 09:46:23 -07:00
7bb538d4d4 backend: add FillPercent option 2017-03-21 12:12:32 -07:00
1622782e49 integration: ensure 'StopNotify' on publish error
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:12:13 -07:00
99b47e0c1e etcdmain: handle StopNotify when ErrStopped aborted publish
Fix https://github.com/coreos/etcd/issues/7512.

If a server starts and aborts due to config error,
it is possible to get stuck in ReadyNotify waits.
This adds select case to get notified on stop channel.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:10:36 -07:00
350d0cd211 ctlv3: have "protobuf" in output help string instead of "proto"
Fixes #7538
2017-03-20 12:40:25 -07:00
72f37ff79a embed: Clear default initial cluster
NewConfig() should sets initial cluster from name but we should clear it
in the event that another discovery option has been specified.

Fixes #7516
2017-03-18 07:56:18 -07:00
3221454cab etcdserver: remove possibly compacted entry look-up
Fix https://github.com/coreos/etcd/issues/7470.

This patch removes unnecessary term look-up in
'createMergedSnapshotMessage', which can trigger panic
if raft entry at etcdProgress.appliedi got compacted
by subsequent 'MsgSnap' messages--if a follower is
being (in this case, network latency spikes) slow, it
could receive subsequent 'MsgSnap' requests from leader.

etcd server-side 'applyAll' routine and raft's Ready
processing routine becomes asynchronous after raft
entries are persisted. And given that raft Ready routine
takes less time to finish, it is possible that second
'MsgSnap' is being handled, while the slow 'applyAll'
is still processing the first(old) 'MsgSnap'. Then raft
Ready routine can compact the log entries at future
index to 'applyAll'. That is how 'createMergedSnapshotMessage'
tried to look up raft term with outdated etcdProgress.appliedi.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:18 -07:00
4a1bffdbc6 clientv3: close open watch channel if substream is closing on reconnect
If substream is closing but outc is still open while reconnecting, then outc
would only be closed once the watch client would connect or once the watch
client is closed. This was leading to deadlocks in the proxy tests. Instead,
close immediately if the context is canceled.

Fixes #7503
2017-03-18 07:56:18 -07:00
9d9be2bc86 ctlv3: ensure synced member list before printing env vars on member add
In cases of multiple endpoints, it's possible member add would get a its
member list from a member that has not yet recognized the membership
update. Instead, confirm that the member list response is from the
member that acked the member add or from a member that has synced
with the cluster following the member add.

Fixes #7498
2017-03-18 07:56:18 -07:00
e5462f74f1 auth: get rid of deadlocking channel passing scheme in simpleTokenTTL
Cherry-picked from 1b1fabef8f.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:05 -07:00
c68c1d9344 discovery: fix print format
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:57 -07:00
6ed56cd723 auth: nil check AuthInfo when checking admin permissions
If the context does not include auth information, get authinfo will
return a nil auth info and a nil error. This is then passed to
IsAdminPermitted, which would dereference the nil auth info.
2017-03-17 14:21:39 -07:00
a3c6f6bf81 version: bump up to 3.1.3+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:15 -07:00
21fdcc6443 version: bump up to 3.1.3
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-10 09:05:16 -08:00
8d122e7011 etcdmain: SdNotify when gateway, grpc-proxy are ready
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-09 11:35:20 -08:00
ade1d97893 lease: guard 'Lease.itemSet' from concurrent writes
Fix https://github.com/coreos/etcd/issues/7448.

Affected if etcd builds with Go 1.8+.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-08 14:50:06 -08:00
1300189581 gateway: fix the dns discovery method
strip the scheme from the endpoints to have a clean hostname for TCP proxy

Fixes #7452
2017-03-08 14:49:50 -08:00
1971517806 etcdctl: correctly batch revisions in make-mirror
Fixes #7410
2017-03-06 14:55:47 -08:00
d614bb0799 etcdmain: log machine default host after update check
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-06 14:55:31 -08:00
059dc91d4c embed: use machine default host only for default value, 0.0.0.0
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-06 14:55:24 -08:00
5fdbaee761 version: bump up to 3.1.2+git 2017-02-24 10:34:53 -08:00
714e7ec8db version: bump up to 3.1.2 2017-02-22 10:45:48 -08:00
2cdaf6d661 netutil: use ipv4 host by default
Was non-deterministic.
2017-02-22 10:45:38 -08:00
77a51e0dbf pkg/netutil: name GetDefaultInterfaces consistent 2017-02-22 10:45:29 -08:00
d96d3aa0ed netutil: add dualstack to linux_route
in v3.1.0 netutil couldn't get default interface for ipv6only hosts

Fixes #7219
2017-02-22 10:45:19 -08:00
66e7532f57 pkg/netutil: use native byte ordering for route information
Fixes #7199
2017-02-22 10:45:07 -08:00
3eff360e79 pkg/cpuutil: add cpuutil
A package for unsafe cpu-ish things.
2017-02-22 10:44:59 -08:00
1487071966 integration: add 'TestV3HashRestart' 2017-02-22 10:39:49 -08:00
5d62bba9c7 auth: keep old revision in 'NewAuthStore'
When there's no changes yet (right after auth
store initialization), we should commit old revision.

Fix https://github.com/coreos/etcd/issues/7359.
2017-02-22 10:39:28 -08:00
114e293119 integration: test keepalives for short TTLs 2017-02-22 10:38:38 -08:00
1439955536 clientv3: do not set next keepalive time <= now+TTL 2017-02-22 10:38:28 -08:00
2c8ecc7e13 tcpproxy: don't use range variable in reactivate goroutine
Ends up trying to reactivate only the last endpoint.
2017-02-22 10:38:19 -08:00
7b4d622a7e raft: fix read index request for #7331 2017-02-22 10:38:09 -08:00
db8abbd975 version: bump to 3.1.1+git 2017-02-21 17:00:02 -08:00
ac1c7eba21 version: bump up to 3.1.1 2017-02-16 13:53:25 -08:00
9cc6d4852a travis: update for Go 1.7.5 tests 2017-02-16 13:53:05 -08:00
ff7fa9843d clientv3: fix lease keepalive duration 2017-02-16 13:52:27 -08:00
f66138d403 clientv3: fix lease keepalive duration 2017-02-16 12:41:09 -08:00
8c87916f68 auth: add 'setupAuthStore' to tests 2017-02-14 14:39:51 -08:00
9e81b002c4 auth: correct initialization in NewAuthStore()
Because of my own silly mistake, current NewAuthStore() doesn't
initialize authStore in a correct manner. For example, after recovery
from snapshot, it cannot revive the flag of enabled/disabled. This
commit fixes the problem.

Fix https://github.com/coreos/etcd/issues/7165
2017-02-14 13:49:19 -08:00
4962c5cff7 auth: add a test case for recoverying from snapshot
Conflicts:
	auth/store_test.go
2017-02-14 13:48:17 -08:00
e5bf25a3b6 e2e: add cases for defrag and snapshot with authentication 2017-02-14 13:43:11 -08:00
98c60e8faa auth, etcdserver: let maintenance services require root role
This commit lets maintenance services require root privilege. It also
moves AuthInfoFromCtx() from etcdserver to auth pkg for cleaning purpose.
2017-02-14 13:42:31 -08:00
3ac3fa6f3d travis: disable email notifications
Was spamming security@coreos.com
2017-02-14 12:55:02 -08:00
eaa8b9e155 clientv3: test closing client cancels blocking dials 2017-02-14 11:31:51 -08:00
ea2aae464d clientv3: use DialContext
Fixes #7216
2017-02-14 11:31:43 -08:00
776739ebc2 roadmap: update roadmap 2017-01-20 14:21:08 -08:00
a7a8a47ba0 README: remove ACI, update Go version 2017-01-20 14:21:00 -08:00
379f7ae10e op-guide: change grpc-proxy from 'pre' to alpha' 2017-01-20 13:25:21 -08:00
ead2d95914 version: bump to v3.1.0+git 2017-01-20 13:25:04 -08:00
68 changed files with 1271 additions and 419 deletions

View File

@ -4,8 +4,11 @@ go_import_path: github.com/coreos/etcd
sudo: false
go:
- 1.7.4
- tip
- 1.7.5
notifications:
on_success: never
on_failure: never
env:
matrix:

View File

@ -5,6 +5,12 @@ ADD etcdctl /usr/local/bin/
RUN mkdir -p /var/etcd/
RUN mkdir -p /var/lib/etcd/
# Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf,
# but Golang relies on /etc/nsswitch.conf to check the order of DNS resolving
# (see https://github.com/golang/go/commit/9dee7771f561cf6aee081c0af6658cc81fac3918)
# To fix this we just create /etc/nsswitch.conf and add the following line:
RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
EXPOSE 2379 2380
# Define default command.

View File

@ -1,6 +1,6 @@
# gRPC proxy
*This is a pre-alpha feature, we are looking for early feedback.*
*This is an alpha feature, we are looking for early feedback.*
The gRPC proxy is a stateless etcd reverse proxy operating at the gRPC layer (L7). The proxy is designed to reduce the total processing load on the core etcd cluster. For horizontal scalability, it coalesces watch and lease API requests. To protect the cluster against abusive clients, it caches key range requests.

View File

@ -37,13 +37,14 @@ See [etcdctl][etcdctl] for a simple command line client.
### Getting etcd
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, [rkt][rkt], and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
For those wanting to try the very latest version, you can [build the latest version of etcd][dl-build] from the `master` branch.
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.6+ is required).
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.7+ is required).
All development occurs on `master`, including new features and bug fixes.
Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
[rkt]: https://github.com/coreos/rkt/releases/
[github-release]: https://github.com/coreos/etcd/releases/
[branch-management]: ./Documentation/branch_management.md
[dl-build]: ./Documentation/dl_build.md#build-the-latest-version

View File

@ -6,25 +6,17 @@ This document defines a high level roadmap for etcd development.
The dates below should not be considered authoritative, but rather indicative of the projected timeline of the project. The [milestones defined in GitHub](https://github.com/coreos/etcd/milestones) represent the most up-to-date and issue-for-issue plans.
etcd 3.0 is our current stable branch. The roadmap below outlines new features that will be added to etcd, and while subject to change, define what future stable will look like.
etcd 3.1 is our current stable branch. The roadmap below outlines new features that will be added to etcd, and while subject to change, define what future stable will look like.
### etcd 3.1 (2016-Oct)
- Stable L4 gateway
- Experimental support for scalable proxy
- Automatic leadership transfer for the rolling upgrade
- V3 API improvements
- Get previous key-value pair
- Get only keys (ignore values)
- Get only key count
### etcd 3.2 (2017-Apr)
### etcd 3.2 (2017-May)
- Stable scalable proxy
- Proxy-as-client interface passthrough
- Lock service
- Namespacing proxy
- JWT token based authentication
- TLS Command Name and JWT token based authentication
- Read-modify-write V3 Put
- Improved watch performance
- Support non-blocking concurrent read
### etcd 3.3 (?)
- TBD

View File

@ -21,33 +21,33 @@ import (
"crypto/rand"
"math/big"
"strings"
"sync"
"time"
)
const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
defaultSimpleTokenLength = 16
)
// var for testing purposes
var (
simpleTokenTTL = 5 * time.Minute
simpleTokenTTLResolution = 1 * time.Second
)
type simpleTokenTTLKeeper struct {
tokens map[string]time.Time
addSimpleTokenCh chan string
resetSimpleTokenCh chan string
deleteSimpleTokenCh chan string
stopCh chan chan struct{}
deleteTokenFunc func(string)
tokensMu sync.Mutex
tokens map[string]time.Time
stopCh chan chan struct{}
deleteTokenFunc func(string)
}
func NewSimpleTokenTTLKeeper(deletefunc func(string)) *simpleTokenTTLKeeper {
stk := &simpleTokenTTLKeeper{
tokens: make(map[string]time.Time),
addSimpleTokenCh: make(chan string, 1),
resetSimpleTokenCh: make(chan string, 1),
deleteSimpleTokenCh: make(chan string, 1),
stopCh: make(chan chan struct{}),
deleteTokenFunc: deletefunc,
tokens: make(map[string]time.Time),
stopCh: make(chan chan struct{}),
deleteTokenFunc: deletefunc,
}
go stk.run()
return stk
@ -61,37 +61,34 @@ func (tm *simpleTokenTTLKeeper) stop() {
}
func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
tm.addSimpleTokenCh <- token
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
tm.resetSimpleTokenCh <- token
if _, ok := tm.tokens[token]; ok {
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
}
func (tm *simpleTokenTTLKeeper) deleteSimpleToken(token string) {
tm.deleteSimpleTokenCh <- token
delete(tm.tokens, token)
}
func (tm *simpleTokenTTLKeeper) run() {
tokenTicker := time.NewTicker(simpleTokenTTLResolution)
defer tokenTicker.Stop()
for {
select {
case t := <-tm.addSimpleTokenCh:
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
case t := <-tm.resetSimpleTokenCh:
if _, ok := tm.tokens[t]; ok {
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
}
case t := <-tm.deleteSimpleTokenCh:
delete(tm.tokens, t)
case <-tokenTicker.C:
nowtime := time.Now()
tm.tokensMu.Lock()
for t, tokenendtime := range tm.tokens {
if nowtime.After(tokenendtime) {
tm.deleteTokenFunc(t)
delete(tm.tokens, t)
}
}
tm.tokensMu.Unlock()
case waitCh := <-tm.stopCh:
tm.tokens = make(map[string]time.Time)
waitCh <- struct{}{}
@ -116,6 +113,7 @@ func (as *authStore) GenSimpleToken() (string, error) {
}
func (as *authStore) assignSimpleTokenToUser(username, token string) {
as.simpleTokenKeeper.tokensMu.Lock()
as.simpleTokensMu.Lock()
_, ok := as.simpleTokens[token]
@ -126,16 +124,21 @@ func (as *authStore) assignSimpleTokenToUser(username, token string) {
as.simpleTokens[token] = username
as.simpleTokenKeeper.addSimpleToken(token)
as.simpleTokensMu.Unlock()
as.simpleTokenKeeper.tokensMu.Unlock()
}
func (as *authStore) invalidateUser(username string) {
if as.simpleTokenKeeper == nil {
return
}
as.simpleTokenKeeper.tokensMu.Lock()
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
for token, name := range as.simpleTokens {
if strings.Compare(name, username) == 0 {
delete(as.simpleTokens, token)
as.simpleTokenKeeper.deleteSimpleToken(token)
}
}
as.simpleTokensMu.Unlock()
as.simpleTokenKeeper.tokensMu.Unlock()
}

View File

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
@ -29,6 +30,7 @@ import (
"github.com/coreos/pkg/capnslog"
"golang.org/x/crypto/bcrypt"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
var (
@ -57,6 +59,7 @@ var (
ErrPermissionNotGranted = errors.New("auth: permission is not granted to the role")
ErrAuthNotEnabled = errors.New("auth: authentication is not enabled")
ErrAuthOldRevision = errors.New("auth: revision in header is old")
ErrInvalidAuthToken = errors.New("auth: invalid auth token")
// BcryptCost is the algorithm cost / strength for hashing auth passwords
BcryptCost = bcrypt.DefaultCost
@ -153,6 +156,9 @@ type AuthStore interface {
// Close does cleanup of AuthStore
Close() error
// AuthInfoFromCtx gets AuthInfo from gRPC's context
AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error)
}
type authStore struct {
@ -162,11 +168,24 @@ type authStore struct {
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
simpleTokensMu sync.RWMutex
simpleTokens map[string]string // token -> username
simpleTokenKeeper *simpleTokenTTLKeeper
revision uint64
// tokenSimple in v3.2+
indexWaiter func(uint64) <-chan struct{}
simpleTokenKeeper *simpleTokenTTLKeeper
simpleTokensMu sync.Mutex
simpleTokens map[string]string // token -> username
}
func newDeleterFunc(as *authStore) func(string) {
return func(t string) {
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
if username, ok := as.simpleTokens[t]; ok {
plog.Infof("deleting token %s for user %s", t, username)
delete(as.simpleTokens, t)
}
}
}
func (as *authStore) AuthEnable() error {
@ -197,15 +216,7 @@ func (as *authStore) AuthEnable() error {
as.enabled = true
tokenDeleteFunc := func(t string) {
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
if username, ok := as.simpleTokens[t]; ok {
plog.Infof("deleting token %s for user %s", t, username)
delete(as.simpleTokens, t)
}
}
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(tokenDeleteFunc)
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
as.rangePermCache = make(map[string]*unifiedRangePermissions)
@ -635,13 +646,16 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
}
func (as *authStore) AuthInfoFromToken(token string) (*AuthInfo, bool) {
as.simpleTokensMu.RLock()
defer as.simpleTokensMu.RUnlock()
t, ok := as.simpleTokens[token]
// same as '(t *tokenSimple) info' in v3.2+
as.simpleTokenKeeper.tokensMu.Lock()
as.simpleTokensMu.Lock()
username, ok := as.simpleTokens[token]
if ok {
as.simpleTokenKeeper.resetSimpleToken(token)
}
return &AuthInfo{Username: t, Revision: as.revision}, ok
as.simpleTokensMu.Unlock()
as.simpleTokenKeeper.tokensMu.Unlock()
return &AuthInfo{Username: username, Revision: as.revision}, ok
}
type permSlice []*authpb.Permission
@ -753,6 +767,9 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
if !as.isAuthEnabled() {
return nil
}
if authInfo == nil {
return ErrUserEmpty
}
tx := as.be.BatchTx()
tx.Lock()
@ -871,7 +888,7 @@ func (as *authStore) isAuthEnabled() bool {
return as.enabled
}
func NewAuthStore(be backend.Backend) *authStore {
func NewAuthStore(be backend.Backend, indexWaiter func(uint64) <-chan struct{}) *authStore {
tx := be.BatchTx()
tx.Lock()
@ -879,13 +896,30 @@ func NewAuthStore(be backend.Backend) *authStore {
tx.UnsafeCreateBucket(authUsersBucketName)
tx.UnsafeCreateBucket(authRolesBucketName)
as := &authStore{
be: be,
simpleTokens: make(map[string]string),
revision: 0,
enabled := false
_, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
}
}
as.commitRevision(tx)
as := &authStore{
be: be,
simpleTokens: make(map[string]string),
revision: getRevision(tx),
indexWaiter: indexWaiter,
enabled: enabled,
rangePermCache: make(map[string]*unifiedRangePermissions),
}
if enabled {
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
}
if as.revision == 0 {
as.commitRevision(tx)
}
tx.Unlock()
be.ForceCommit()
@ -912,7 +946,8 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
func getRevision(tx backend.BatchTx) uint64 {
_, vs := tx.UnsafeRange(authBucketName, []byte(revisionKey), nil, 0)
if len(vs) != 1 {
plog.Panicf("failed to get the key of auth store revision")
// this can happen in the initialization phase
return 0
}
return binary.BigEndian.Uint64(vs[0])
@ -921,3 +956,46 @@ func getRevision(tx backend.BatchTx) uint64 {
func (as *authStore) Revision() uint64 {
return as.revision
}
func (as *authStore) isValidSimpleToken(token string, ctx context.Context) bool {
splitted := strings.Split(token, ".")
if len(splitted) != 2 {
return false
}
index, err := strconv.Atoi(splitted[1])
if err != nil {
return false
}
select {
case <-as.indexWaiter(uint64(index)):
return true
case <-ctx.Done():
}
return false
}
func (as *authStore) AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error) {
md, ok := metadata.FromContext(ctx)
if !ok {
return nil, nil
}
ts, tok := md["token"]
if !tok {
return nil, nil
}
token := ts[0]
if !as.isValidSimpleToken(token, ctx) {
return nil, ErrInvalidAuthToken
}
authInfo, uok := as.AuthInfoFromToken(token)
if !uok {
plog.Warningf("invalid auth token: %s", token)
return nil, ErrInvalidAuthToken
}
return authInfo, nil
}

View File

@ -26,31 +26,38 @@ import (
func init() { BcryptCost = bcrypt.MinCost }
func TestUserAdd(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer func() {
b.Close()
os.Remove(tPath)
func dummyIndexWaiter(index uint64) <-chan struct{} {
ch := make(chan struct{})
go func() {
ch <- struct{}{}
}()
return ch
}
as := NewAuthStore(b)
ua := &pb.AuthUserAddRequest{Name: "foo"}
_, err := as.UserAdd(ua) // add a non-existing user
// TestNewAuthStoreRevision ensures newly auth store
// keeps the old revision when there are no changes.
func TestNewAuthStoreRevision(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer os.Remove(tPath)
as := NewAuthStore(b, dummyIndexWaiter)
err := enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
}
_, err = as.UserAdd(ua) // add an existing user
if err == nil {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
if err != ErrUserAlreadyExist {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
old := as.Revision()
b.Close()
as.Close()
ua = &pb.AuthUserAddRequest{Name: ""}
_, err = as.UserAdd(ua) // add a user with empty name
if err != ErrUserEmpty {
t.Fatal(err)
// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
as = NewAuthStore(b2, dummyIndexWaiter)
new := as.Revision()
b2.Close()
as.Close()
if old != new {
t.Fatalf("expected revision %d, got %d", old, new)
}
}
@ -80,7 +87,7 @@ func TestCheckPassword(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -125,7 +132,7 @@ func TestUserDelete(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -162,7 +169,7 @@ func TestUserChangePassword(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -208,7 +215,7 @@ func TestRoleAdd(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -229,7 +236,7 @@ func TestUserGrant(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -261,4 +268,93 @@ func TestUserGrant(t *testing.T) {
if err != ErrUserNotFound {
t.Fatalf("expected %v, got %v", ErrUserNotFound, err)
}
// non-admin user
err = as.IsAdminPermitted(&AuthInfo{Username: "foo", Revision: 1})
if err != ErrPermissionDenied {
t.Errorf("expected %v, got %v", ErrPermissionDenied, err)
}
// disabled auth should return nil
as.AuthDisable()
err = as.IsAdminPermitted(&AuthInfo{Username: "root", Revision: 1})
if err != nil {
t.Errorf("expected nil, got %v", err)
}
}
func TestRecoverFromSnapshot(t *testing.T) {
as, _ := setupAuthStore(t)
ua := &pb.AuthUserAddRequest{Name: "foo"}
_, err := as.UserAdd(ua) // add an existing user
if err == nil {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
if err != ErrUserAlreadyExist {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
ua = &pb.AuthUserAddRequest{Name: ""}
_, err = as.UserAdd(ua) // add a user with empty name
if err != ErrUserEmpty {
t.Fatal(err)
}
as.Close()
as2 := NewAuthStore(as.be, dummyIndexWaiter)
defer func(a *authStore) {
a.Close()
}(as2)
if !as2.isAuthEnabled() {
t.Fatal("recovering authStore from existing backend failed")
}
ul, err := as.UserList(&pb.AuthUserListRequest{})
if err != nil {
t.Fatal(err)
}
if !contains(ul.Users, "root") {
t.Errorf("expected %v in %v", "root", ul.Users)
}
}
func contains(array []string, str string) bool {
for _, s := range array {
if s == str {
return true
}
}
return false
}
func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testing.T)) {
b, tPath := backend.NewDefaultTmpBackend()
as := NewAuthStore(b, dummyIndexWaiter)
err := enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
}
// adds a new role
_, err = as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test"})
if err != nil {
t.Fatal(err)
}
ua := &pb.AuthUserAddRequest{Name: "foo", Password: "bar"}
_, err = as.UserAdd(ua) // add a non-existing user
if err != nil {
t.Fatal(err)
}
tearDown := func(t *testing.T) {
b.Close()
os.Remove(tPath)
as.Close()
}
return as, tearDown
}

View File

@ -221,7 +221,8 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
return nil, c.ctx.Err()
default:
}
return net.DialTimeout(proto, host, t)
dialer := &net.Dialer{Timeout: t}
return dialer.DialContext(c.ctx, proto, host)
}
opts = append(opts, grpc.WithDialer(f))

View File

@ -16,6 +16,7 @@ package clientv3
import (
"fmt"
"net"
"testing"
"time"
@ -25,6 +26,47 @@ import (
"google.golang.org/grpc"
)
func TestDialCancel(t *testing.T) {
defer testutil.AfterTest(t)
// accept first connection so client is created with dial timeout
ln, err := net.Listen("unix", "dialcancel:12345")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
ep := "unix://dialcancel:12345"
cfg := Config{
Endpoints: []string{ep},
DialTimeout: 30 * time.Second}
c, err := New(cfg)
if err != nil {
t.Fatal(err)
}
// connect to ipv4 blackhole so dial blocks
c.SetEndpoints("http://254.0.0.1:12345")
// issue Get to force redial attempts
go c.Get(context.TODO(), "abc")
// wait a little bit so client close is after dial starts
time.Sleep(100 * time.Millisecond)
donec := make(chan struct{})
go func() {
defer close(donec)
c.Close()
}()
select {
case <-time.After(5 * time.Second):
t.Fatalf("failed to close")
case <-donec:
}
}
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)

View File

@ -156,6 +156,30 @@ func TestLeaseKeepAlive(t *testing.T) {
}
}
func TestLeaseKeepAliveOneSecond(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
resp, err := cli.Grant(context.Background(), 1)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
for i := 0; i < 3; i++ {
if _, ok := <-rc; !ok {
t.Errorf("chan is closed, want not closed")
}
}
}
// TODO: add a client that can connect to all the members of cluster via unix sock.
// TODO: test handle more complicated failures.
func TestLeaseKeepAliveHandleFailure(t *testing.T) {

View File

@ -416,7 +416,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
}
// send update to all channels
nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
for _, ch := range ka.chs {
select {

View File

@ -132,6 +132,8 @@ type watchGrpcStream struct {
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
@ -406,7 +408,7 @@ func (w *watchGrpcStream) run() {
for range closing {
w.closeSubstream(<-w.closingc)
}
w.wg.Wait()
w.owner.closeStream(w)
}()
@ -431,6 +433,7 @@ func (w *watchGrpcStream) run() {
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
@ -576,6 +579,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
if !resuming {
w.closingc <- ws
}
w.wg.Done()
}()
emptyWr := &WatchResponse{}
@ -674,6 +678,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
continue
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
}
@ -694,6 +699,10 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
go func(ws *watcherStream) {
defer wg.Done()
if ws.closing {
if ws.initReq.ctx.Err() != nil && ws.outc != nil {
close(ws.outc)
ws.outc = nil
}
return
}
select {

View File

@ -74,7 +74,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
shortHost := strings.TrimSuffix(srv.Target, ".")
urlHost := net.JoinHostPort(shortHost, port)
stringParts = append(stringParts, fmt.Sprintf("%s=%s://%s", n, scheme, urlHost))
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, scheme, urlHost)
plog.Noticef("got bootstrap from DNS for %s at %s://%s", service, scheme, urlHost)
if ok && url.Scheme != scheme {
plog.Errorf("bootstrap at %s from DNS for %s has scheme mismatch with expected peer %s", scheme+"://"+urlHost, service, url.String())
}

View File

@ -16,15 +16,20 @@ package e2e
import "testing"
func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func TestCtlV3DefragWithAuth(t *testing.T) { testCtl(t, defragTestWithAuth) }
func defragTest(cx ctlCtx) {
func maintenanceInitKeys(cx ctlCtx) {
var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
for i := range kvs {
if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
cx.t.Fatal(err)
}
}
}
func defragTest(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := ctlV3Compact(cx, 4, cx.compactPhysical); err != nil {
cx.t.Fatal(err)
@ -35,6 +40,29 @@ func defragTest(cx ctlCtx) {
}
}
func defragTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
// ordinal user cannot defrag
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Defrag(cx); err == nil {
cx.t.Fatal("ordinal user should not be able to issue a defrag request")
}
// root can defrag
cx.user, cx.pass = "root", "root"
if err := ctlV3Defrag(cx); err != nil {
cx.t.Fatal(err)
}
}
func ctlV3Defrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "defrag")
lines := make([]string, cx.epc.cfg.clusterSize)

View File

@ -32,12 +32,7 @@ import (
func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }
func snapshotTest(cx ctlCtx) {
var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
for i := range kvs {
if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
cx.t.Fatal(err)
}
}
maintenanceInitKeys(cx)
leaseID, err := ctlV3LeaseGrant(cx, 100)
if err != nil {
@ -250,3 +245,42 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
}
func TestCtlV3SnapshotWithAuth(t *testing.T) { testCtl(t, snapshotTestWithAuth) }
func snapshotTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
fpath := "test.snapshot"
defer os.RemoveAll(fpath)
// ordinal user cannot save a snapshot
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3SnapshotSave(cx, fpath); err == nil {
cx.t.Fatal("ordinal user should not be able to save a snapshot")
}
// root can save a snapshot
cx.user, cx.pass = "root", "root"
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
}
st, err := getSnapshotStatus(cx, fpath)
if err != nil {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
if st.Revision != 4 {
cx.t.Fatalf("expected 4, got %d", st.Revision)
}
if st.TotalKey < 3 {
cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
}
}

View File

@ -55,20 +55,12 @@ var (
DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
DefaultAdvertiseClientURLs = "http://localhost:2379"
defaultHostname string = "localhost"
defaultHostname string
defaultHostStatus error
)
func init() {
ip, err := netutil.GetDefaultHost()
if err != nil {
defaultHostStatus = err
return
}
// found default host, advertise on it
DefaultInitialAdvertisePeerURLs = "http://" + ip + ":2380"
DefaultAdvertiseClientURLs = "http://" + ip + ":2379"
defaultHostname = ip
defaultHostname, defaultHostStatus = netutil.GetDefaultHost()
}
// Config holds the arguments for configuring an etcd server.
@ -237,6 +229,9 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.ACUrls = []url.URL(u)
}
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
cfg.InitialCluster = ""
}
if cfg.ClusterState == "" {
cfg.ClusterState = ClusterStateFlagNew
}
@ -346,34 +341,52 @@ func (cfg Config) InitialClusterFromName(name string) (ret string) {
func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
// IsDefaultHost returns the default hostname, if used, and the error, if any,
// from getting the machine's default host.
func (cfg Config) IsDefaultHost() (string, error) {
if len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs {
return defaultHostname, defaultHostStatus
}
if len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs {
return defaultHostname, defaultHostStatus
}
return "", defaultHostStatus
func (cfg Config) defaultPeerHost() bool {
return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs
}
// UpdateDefaultClusterFromName updates cluster advertise URLs with default host.
func (cfg Config) defaultClientHost() bool {
return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
}
// UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host,
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
// e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380
// then the advertise peer host would be updated with machine's default host,
// while keeping the listen URL's port.
// User can work around this by explicitly setting URL with 127.0.0.1.
// It returns the default hostname, if used, and the error, if any, from getting the machine's default host.
// TODO: check whether fields are set instead of whether fields have default value
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) {
defaultHost, defaultHostErr := cfg.IsDefaultHost()
defaultHostOverride := defaultHost == "" || defaultHostErr == nil
if (defaultHostOverride || cfg.Name != DefaultName) && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
ip, _, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
// if client-listen-url is 0.0.0.0, just use detected default host
// otherwise, rewrite advertise-client-url with localhost
if ip != "0.0.0.0" {
_, acPort, _ := net.SplitHostPort(cfg.ACUrls[0].Host)
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("localhost:%s", acPort)}
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (string, error) {
if defaultHostname == "" || defaultHostStatus != nil {
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
return "", defaultHostStatus
}
used := false
pip, pport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
if cfg.defaultPeerHost() && pip == "0.0.0.0" {
cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
used = true
}
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
cip, cport, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
if cfg.defaultClientHost() && cip == "0.0.0.0" {
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
used = true
}
dhost := defaultHostname
if !used {
dhost = ""
}
return dhost, defaultHostStatus
}
// checkBindURLs returns an error if any URL uses a domain name.

View File

@ -15,11 +15,15 @@
package embed
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"testing"
"github.com/coreos/etcd/pkg/transport"
"github.com/ghodss/yaml"
)
@ -61,6 +65,70 @@ func TestConfigFileOtherFields(t *testing.T) {
}
}
// TestUpdateDefaultClusterFromName ensures that etcd can start with 'etcd --name=abc'.
func TestUpdateDefaultClusterFromName(t *testing.T) {
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origpeer := cfg.APUrls[0].String()
origadvc := cfg.ACUrls[0].String()
cfg.Name = "abc"
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
// in case of 'etcd --name=abc'
exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport)
cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
if exp != cfg.InitialCluster {
t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster)
}
// advertise peer URL should not be affected
if origpeer != cfg.APUrls[0].String() {
t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String())
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
}
}
// TestUpdateDefaultClusterFromNameOverwrite ensures that machine's default host is only used
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) {
if defaultHostname == "" {
t.Skip("machine's default host not found")
}
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origadvc := cfg.ACUrls[0].String()
cfg.Name = "abc"
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)}
dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
if dhost != defaultHostname {
t.Fatalf("expected default host %q, got %q", defaultHostname, dhost)
}
aphost, apport, _ := net.SplitHostPort(cfg.APUrls[0].Host)
if apport != lpport {
t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport)
}
if aphost != defaultHostname {
t.Fatalf("advertise peer url expected machine default host %q, got %q", defaultHostname, aphost)
}
expected := fmt.Sprintf("%s=%s://%s:%s", cfg.Name, oldscheme, defaultHostname, lpport)
if expected != cfg.InitialCluster {
t.Fatalf("initial-cluster expected %q, got %q", expected, cfg.InitialCluster)
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
}
}
func (s *securityConfig) equals(t *transport.TLSInfo) bool {
return s.CAFile == t.CAFile &&
s.CertFile == t.CertFile &&

View File

@ -19,7 +19,7 @@ import (
"fmt"
"net"
"net/http"
"path"
"path/filepath"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
@ -166,7 +166,7 @@ func startPeerListeners(cfg *Config) (plns []net.Listener, err error) {
for i, u := range cfg.LPUrls {
phosts[i] = u.Host
}
cfg.PeerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}
@ -221,7 +221,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
for i, u := range cfg.LCUrls {
chosts[i] = u.Host
}
cfg.ClientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
if err != nil {
plog.Fatalf("could not get certs (%v)", err)
}

View File

@ -15,7 +15,7 @@
package embed
import (
"path"
"path/filepath"
"github.com/coreos/etcd/wal"
)
@ -23,7 +23,7 @@ import (
func isMemberInitialized(cfg *Config) bool {
waldir := cfg.WalDir
if waldir == "" {
waldir = path.Join(cfg.Dir, "member", "wal")
waldir = filepath.Join(cfg.Dir, "member", "wal")
}
return wal.Exist(waldir)

View File

@ -17,7 +17,7 @@ package command
import (
"fmt"
"log"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -50,19 +50,19 @@ func handleBackup(c *cli.Context) error {
var srcWAL string
var destWAL string
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
if c.String("wal-dir") != "" {
srcWAL = c.String("wal-dir")
} else {
srcWAL = path.Join(c.String("data-dir"), "member", "wal")
srcWAL = filepath.Join(c.String("data-dir"), "member", "wal")
}
if c.String("backup-wal-dir") != "" {
destWAL = c.String("backup-wal-dir")
} else {
destWAL = path.Join(c.String("backup-dir"), "member", "wal")
destWAL = filepath.Join(c.String("backup-dir"), "member", "wal")
}
if err := fileutil.CreateDirAll(destSnap); err != nil {

View File

@ -125,18 +125,19 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
return rpctypes.ErrCompacted
}
var rev int64
var lastRev int64
ops := []clientv3.Op{}
for _, ev := range wr.Events {
nrev := ev.Kv.ModRevision
if rev != 0 && nrev > rev {
nextRev := ev.Kv.ModRevision
if lastRev != 0 && nextRev > lastRev {
_, err := dc.Txn(ctx).Then(ops...).Commit()
if err != nil {
return err
}
ops = []clientv3.Op{}
}
lastRev = nextRev
switch ev.Type {
case mvccpb.PUT:
ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))

View File

@ -107,7 +107,8 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
urls := strings.Split(memberPeerURLs, ",")
ctx, cancel := commandCtx(cmd)
resp, err := mustClientFromCmd(cmd).MemberAdd(ctx, urls)
cli := mustClientFromCmd(cmd)
resp, err := cli.MemberAdd(ctx, urls)
cancel()
if err != nil {
ExitWithError(ExitError, err)
@ -118,12 +119,24 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
if _, ok := (display).(*simplePrinter); ok {
ctx, cancel = commandCtx(cmd)
listResp, err := mustClientFromCmd(cmd).MemberList(ctx)
cancel()
if err != nil {
ExitWithError(ExitError, err)
listResp, err := cli.MemberList(ctx)
// get latest member list; if there's failover new member might have outdated list
for {
if err != nil {
ExitWithError(ExitError, err)
}
if listResp.Header.MemberId == resp.Header.MemberId {
break
}
// quorum get to sync cluster list
gresp, gerr := cli.Get(ctx, "_")
if gerr != nil {
ExitWithError(ExitError, err)
}
resp.Header.MemberId = gresp.Header.MemberId
listResp, err = cli.MemberList(ctx)
}
cancel()
conf := []string{}
for _, memb := range listResp.Members {

View File

@ -21,7 +21,7 @@ import (
"io"
"os"
"os/exec"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/client"
@ -103,7 +103,7 @@ func prepareBackend() backend.Backend {
var be backend.Backend
bch := make(chan struct{})
dbpath := path.Join(migrateDatadir, "member", "snap", "db")
dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
go func() {
defer close(bch)
be = backend.New(dbpath, time.Second, 10000)
@ -130,9 +130,9 @@ func rebuildStoreV2() (store.Store, uint64) {
waldir := migrateWALdir
if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal")
waldir = filepath.Join(migrateDatadir, "member", "wal")
}
snapdir := path.Join(migrateDatadir, "member", "snap")
snapdir := filepath.Join(migrateDatadir, "member", "snap")
ss := snap.New(snapdir)
snapshot, err := ss.Load()

View File

@ -23,7 +23,7 @@ import (
"io"
"math"
"os"
"path"
"path/filepath"
"reflect"
"strings"
@ -186,8 +186,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
basedir = restoreName + ".etcd"
}
waldir := path.Join(basedir, "member", "wal")
snapdir := path.Join(basedir, "member", "snap")
waldir := filepath.Join(basedir, "member", "wal")
snapdir := filepath.Join(basedir, "member", "snap")
if _, err := os.Stat(basedir); err == nil {
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
@ -325,7 +325,7 @@ func makeDB(snapdir, dbfile string, commit int) {
ExitWithError(ExitIO, err)
}
dbpath := path.Join(snapdir, "db")
dbpath := filepath.Join(snapdir, "db")
db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
if dberr != nil {
ExitWithError(ExitIO, dberr)

View File

@ -45,7 +45,7 @@ var (
func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, proto, simple, table)")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")

View File

@ -22,7 +22,7 @@ import (
"net"
"net/http"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"strings"
@ -39,8 +39,6 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/proxy/httpproxy"
"github.com/coreos/etcd/version"
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
"github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
@ -85,7 +83,13 @@ func startEtcdOrProxyV2() {
GoMaxProcs := runtime.GOMAXPROCS(0)
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
(&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
defaultHost, dhErr := (&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
if defaultHost != "" {
plog.Infof("advertising using detected default host %q", defaultHost)
}
if dhErr != nil {
plog.Noticef("failed to detect default host (%v)", dhErr)
}
if cfg.Dir == "" {
cfg.Dir = fmt.Sprintf("%v.etcd", cfg.Name)
@ -157,20 +161,12 @@ func startEtcdOrProxyV2() {
osutil.HandleInterrupts()
if systemdutil.IsRunningSystemd() {
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
sent, err := daemon.SdNotify(false, "READY=1")
if err != nil {
plog.Errorf("failed to notify systemd for readiness: %v", err)
}
if !sent {
plog.Errorf("forgot to set Type=notify in systemd service file?")
}
}
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
notifySystemd()
select {
case lerr := <-errc:
@ -184,15 +180,6 @@ func startEtcdOrProxyV2() {
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
defaultHost, dhErr := cfg.IsDefaultHost()
if defaultHost != "" {
if dhErr == nil {
plog.Infof("advertising using detected default host %q", defaultHost)
} else {
plog.Noticef("failed to detect default host, advertise falling back to %q (%v)", defaultHost, dhErr)
}
}
if cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
@ -202,7 +189,10 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Server.Stop)
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
}
@ -221,14 +211,14 @@ func startProxy(cfg *config) error {
return err
}
cfg.Dir = path.Join(cfg.Dir, "proxy")
cfg.Dir = filepath.Join(cfg.Dir, "proxy")
err = os.MkdirAll(cfg.Dir, fileutil.PrivateDirMode)
if err != nil {
return err
}
var peerURLs []string
clusterfile := path.Join(cfg.Dir, "cluster")
clusterfile := filepath.Join(cfg.Dir, "cluster")
b, err := ioutil.ReadFile(clusterfile)
switch {

View File

@ -17,12 +17,14 @@ package etcdmain
import (
"fmt"
"net"
"net/url"
"os"
"time"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/tcpproxy"
"github.com/spf13/cobra"
)
@ -77,6 +79,20 @@ func newGatewayStartCommand() *cobra.Command {
return &cmd
}
func stripSchema(eps []string) []string {
var endpoints []string
for _, ep := range eps {
if u, err := url.Parse(ep); err == nil && u.Host != "" {
ep = u.Host
}
endpoints = append(endpoints, ep)
}
return endpoints
}
func startGateway(cmd *cobra.Command, args []string) {
endpoints := gatewayEndpoints
if gatewayDNSCluster != "" {
@ -101,6 +117,9 @@ func startGateway(cmd *cobra.Command, args []string) {
}
}
// Strip the schema from the endpoints because we start just a TCP proxy
endpoints = stripSchema(endpoints)
if len(endpoints) == 0 {
plog.Fatalf("no endpoints found")
}
@ -117,5 +136,8 @@ func startGateway(cmd *cobra.Command, args []string) {
MonitorInterval: getewayRetryDelay,
}
// At this point, etcd gateway listener is initialized
notifySystemd()
tp.Run()
}

View File

@ -144,6 +144,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
go func() { errc <- m.Serve() }()
// grpc-proxy is initialized, ready to serve
notifySystemd()
fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
}

View File

@ -17,6 +17,9 @@ package etcdmain
import (
"fmt"
"os"
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
)
func Main() {
@ -35,3 +38,16 @@ func Main() {
startEtcdOrProxyV2()
}
func notifySystemd() {
if !systemdutil.IsRunningSystemd() {
return
}
sent, err := daemon.SdNotify(false, "READY=1")
if err != nil {
plog.Errorf("failed to notify systemd for readiness: %v", err)
}
if !sent {
plog.Errorf("forgot to set Type=notify in systemd service file?")
}
}

View File

@ -18,6 +18,7 @@ import (
"crypto/sha256"
"io"
"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
@ -45,6 +46,10 @@ type RaftStatusGetter interface {
Leader() types.ID
}
type AuthGetter interface {
AuthStore() auth.AuthStore
}
type maintenanceServer struct {
rg RaftStatusGetter
kg KVGetter
@ -54,7 +59,8 @@ type maintenanceServer struct {
}
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
return &maintenanceServer{rg: s, kg: s, bg: s, a: s, hdr: newHeader(s)}
srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, hdr: newHeader(s)}
return &authMaintenanceServer{srv, s}
}
func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
@ -139,3 +145,49 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
ms.hdr.fill(resp.Header)
return resp, nil
}
type authMaintenanceServer struct {
*maintenanceServer
ag AuthGetter
}
func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
authInfo, err := ams.ag.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return err
}
return ams.ag.AuthStore().IsAdminPermitted(authInfo)
}
func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Defragment(ctx, sr)
}
func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
if err := ams.isAuthenticated(srv.Context()); err != nil {
return err
}
return ams.maintenanceServer.Snapshot(sr, srv)
}
func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Hash(ctx, r)
}
func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Status(ctx, ar)
}

View File

@ -93,7 +93,7 @@ func togRPCError(err error) error {
return rpctypes.ErrGRPCPermissionNotGranted
case auth.ErrAuthNotEnabled:
return rpctypes.ErrGRPCAuthNotEnabled
case etcdserver.ErrInvalidAuthToken:
case auth.ErrInvalidAuthToken:
return rpctypes.ErrGRPCInvalidAuthToken
default:
return grpc.Errorf(codes.Unknown, err.Error())

View File

@ -16,7 +16,7 @@ package etcdserver
import (
"fmt"
"path"
"path/filepath"
"sort"
"strings"
"time"
@ -118,16 +118,16 @@ func (c *ServerConfig) advertiseMatchesCluster() error {
return nil
}
func (c *ServerConfig) MemberDir() string { return path.Join(c.DataDir, "member") }
func (c *ServerConfig) MemberDir() string { return filepath.Join(c.DataDir, "member") }
func (c *ServerConfig) WALDir() string {
if c.DedicatedWALDir != "" {
return c.DedicatedWALDir
}
return path.Join(c.MemberDir(), "wal")
return filepath.Join(c.MemberDir(), "wal")
}
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }

View File

@ -31,7 +31,6 @@ var (
ErrNoLeader = errors.New("etcdserver: no leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrInvalidAuthToken = errors.New("etcdserver: invalid auth token")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
)

View File

@ -23,6 +23,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
@ -263,7 +264,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
ss := snap.New(cfg.SnapDir())
bepath := path.Join(cfg.SnapDir(), databaseFilename)
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
beExist := fileutil.Exist(bepath)
var be backend.Backend
@ -459,7 +460,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
srv.authStore = auth.NewAuthStore(srv.be)
srv.authStore = auth.NewAuthStore(srv.be,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
})
if h := cfg.AutoCompactionRetention; h != 0 {
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
srv.compactor.Run()
@ -591,6 +595,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
}
@ -663,6 +668,7 @@ func (s *EtcdServer) run() {
ep := etcdProgress{
confState: snap.Metadata.ConfState,
snapi: snap.Metadata.Index,
appliedt: snap.Metadata.Term,
appliedi: snap.Metadata.Index,
}
@ -762,7 +768,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
@ -786,7 +792,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Panicf("get database snapshot file path error: %v", err)
}
fn := path.Join(s.Cfg.SnapDir(), databaseFilename)
fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
@ -864,6 +870,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}
plog.Info("finished adding peers from new cluster configuration into network...")
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
@ -885,7 +892,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
return
}
var shouldstop bool
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
@ -1019,7 +1026,7 @@ func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) err
// in the state machine layer
// However, both of membership change and role management requires the root privilege.
// So careful operation by admins can prevent the problem.
authInfo, err := s.authInfoFromCtx(ctx)
authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return err
}
@ -1239,9 +1246,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
var applied uint64
var shouldstop bool
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
for i := range es {
e := es[i]
switch e.Type {
@ -1251,16 +1256,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState)
shouldstop = shouldstop || removedSelf
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, err)
default:
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
atomic.StoreUint64(&s.r.index, e.Index)
atomic.StoreUint64(&s.r.term, e.Term)
applied = e.Index
appliedt = e.Term
appliedi = e.Index
}
return applied, shouldstop
return appliedt, appliedi, shouldStop
}
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer

View File

@ -613,7 +613,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}
_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}

View File

@ -16,7 +16,6 @@ package etcdserver
import (
"io"
"log"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
@ -26,12 +25,7 @@ import (
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
// as ReadCloser.
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
snapt, err := s.r.raftStorage.Term(snapi)
if err != nil {
log.Panicf("get term should never fail: %v", err)
}
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
// get a snapshot of v2 store as []byte
clone := s.store.Clone()
d, err := clone.SaveNoCopy()

View File

@ -17,8 +17,6 @@ package etcdserver
import (
"bytes"
"encoding/binary"
"strconv"
"strings"
"time"
"github.com/coreos/etcd/auth"
@ -31,7 +29,6 @@ import (
"github.com/coreos/go-semver/semver"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
const (
@ -617,52 +614,10 @@ func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest
return result.resp.(*pb.AuthRoleDeleteResponse), nil
}
func (s *EtcdServer) isValidSimpleToken(token string) bool {
splitted := strings.Split(token, ".")
if len(splitted) != 2 {
return false
}
index, err := strconv.Atoi(splitted[1])
if err != nil {
return false
}
select {
case <-s.applyWait.Wait(uint64(index)):
return true
case <-s.stop:
return true
}
}
func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
md, ok := metadata.FromContext(ctx)
if !ok {
return nil, nil
}
ts, tok := md["token"]
if !tok {
return nil, nil
}
token := ts[0]
if !s.isValidSimpleToken(token) {
return nil, ErrInvalidAuthToken
}
authInfo, uok := s.AuthStore().AuthInfoFromToken(token)
if !uok {
plog.Warningf("invalid auth token: %s", token)
return nil, ErrInvalidAuthToken
}
return authInfo, nil
}
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
for {
ai, err := s.authInfoFromCtx(ctx)
ai, err := s.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return err
}
@ -697,7 +652,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
ID: s.reqIDGen.Next(),
}
authInfo, err := s.authInfoFromCtx(ctx)
authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return nil, err
}

View File

@ -449,6 +449,8 @@ type member struct {
grpcServer *grpc.Server
grpcAddr string
grpcBridge *bridge
keepDataDirTerminate bool
}
func (m *member) GRPCAddr() string { return m.grpcAddr }
@ -746,8 +748,10 @@ func (m *member) Restart(t *testing.T) error {
func (m *member) Terminate(t *testing.T) {
plog.Printf("terminating %s (%s)", m.Name, m.grpcAddr)
m.Close()
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
if !m.keepDataDirTerminate {
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
}
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
}

View File

@ -27,6 +27,7 @@ import (
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
)
@ -441,6 +442,51 @@ func TestRejectUnhealthyRemove(t *testing.T) {
}
}
// TestRestartRemoved ensures that restarting removed member must exit
// if 'initial-cluster-state' is set 'new' and old data directory still exists
// (see https://github.com/coreos/etcd/issues/7512 for more).
func TestRestartRemoved(t *testing.T) {
defer testutil.AfterTest(t)
capnslog.SetGlobalLogLevel(capnslog.INFO)
// 1. start single-member cluster
c := NewCluster(t, 1)
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
c.Launch(t)
defer c.Terminate(t)
// 2. add a new member
c.AddMember(t)
c.WaitLeader(t)
oldm := c.Members[0]
oldm.keepDataDirTerminate = true
// 3. remove first member, shut down without deleting data
if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
t.Fatalf("expected to remove member, got error %v", err)
}
c.WaitLeader(t)
// 4. restart first member with 'initial-cluster-state=new'
// wrong config, expects exit within ReqTimeout
oldm.ServerConfig.NewCluster = false
if err := oldm.Restart(t); err != nil {
t.Fatalf("unexpected ForceRestart error: %v", err)
}
defer func() {
oldm.Close()
os.RemoveAll(oldm.ServerConfig.DataDir)
}()
select {
case <-oldm.s.StopNotify():
case <-time.After(time.Minute):
t.Fatalf("removed member didn't exit within %v", time.Minute)
}
}
// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.

View File

@ -18,7 +18,7 @@ import (
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"testing"
@ -58,7 +58,7 @@ func TestEmbedEtcd(t *testing.T) {
setupEmbedCfg(&tests[5].cfg, []url.URL{urls[4]}, []url.URL{urls[5], urls[6]})
setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]})
dir := path.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
dir := filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
os.RemoveAll(dir)
defer os.RemoveAll(dir)

View File

@ -581,6 +581,37 @@ func TestV3Hash(t *testing.T) {
}
}
// TestV3HashRestart ensures that hash stays the same after restart.
func TestV3HashRestart(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}
hash1 := resp.Hash
clus.Members[0].Stop(t)
clus.Members[0].Restart(t)
clus.waitLeader(t, clus.Members)
kvc := toGRPC(clus.Client(0)).KV
waitForRestart(t, kvc)
cli = clus.RandClient()
resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}
hash2 := resp.Hash
if hash1 != hash2 {
t.Fatalf("hash expected %d, got %d", hash1, hash2)
}
}
// TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
func TestV3StorageQuotaAPI(t *testing.T) {
defer testutil.AfterTest(t)

View File

@ -252,10 +252,7 @@ func (le *lessor) Revoke(id LeaseID) error {
// sort keys so deletes are in same order among all members,
// otherwise the backened hashes will be different
keys := make([]string, 0, len(l.itemSet))
for item := range l.itemSet {
keys = append(keys, item.Key)
}
keys := l.Keys()
sort.StringSlice(keys).Sort()
for _, key := range keys {
_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
@ -367,10 +364,12 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
return ErrLeaseNotFound
}
l.mu.Lock()
for _, it := range items {
l.itemSet[it] = struct{}{}
le.itemMap[it] = id
}
l.mu.Unlock()
return nil
}
@ -392,10 +391,12 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
return ErrLeaseNotFound
}
l.mu.Lock()
for _, it := range items {
delete(l.itemSet, it)
delete(le.itemMap, it)
}
l.mu.Unlock()
return nil
}
@ -506,6 +507,8 @@ type Lease struct {
// expiry is time when lease should expire; must be 64-bit aligned.
expiry monotime.Time
// mu protects concurrent accesses to itemSet
mu sync.RWMutex
itemSet map[LeaseItem]struct{}
revokec chan struct{}
}
@ -544,10 +547,12 @@ func (l *Lease) forever() { atomic.StoreUint64((*uint64)(&l.expiry), uint64(fore
// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()
keys := make([]string, 0, len(l.itemSet))
for k := range l.itemSet {
keys = append(keys, k.Key)
}
l.mu.RUnlock()
return keys
}

View File

@ -15,11 +15,13 @@
package lease
import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"sort"
"sync"
"testing"
"time"
@ -76,6 +78,53 @@ func TestLessorGrant(t *testing.T) {
be.BatchTx().Unlock()
}
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
// from concurrent map writes on 'itemSet'.
func TestLeaseConcurrentKeys(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
fd := &fakeDeleter{}
le := newLessor(be, minLeaseTTL)
le.SetRangeDeleter(fd)
// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l, err := le.Grant(1, 100)
if err != nil {
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
}
itemn := 10
items := make([]LeaseItem, itemn)
for i := 0; i < itemn; i++ {
items[i] = LeaseItem{Key: fmt.Sprintf("foo%d", i)}
}
if err = le.Attach(l.ID, items); err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}
donec := make(chan struct{})
go func() {
le.Detach(l.ID, items)
close(donec)
}()
var wg sync.WaitGroup
wg.Add(itemn)
for i := 0; i < itemn; i++ {
go func() {
defer wg.Done()
l.Keys()
}()
}
<-donec
wg.Wait()
}
// TestLessorRevoke ensures Lessor can revoke a lease.
// The items in the revoked lease should be removed from
// the backend.
@ -351,5 +400,5 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
t.Fatalf("failed to create tmpdir (%v)", err)
}
return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000)
return tmpPath, backend.New(filepath.Join(tmpPath, "be"), time.Second, 10000)
}

View File

@ -20,7 +20,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
@ -303,6 +303,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for seq write in for each
if berr != nil {
return berr
}
@ -319,6 +320,8 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for seq write in for each
count = 0
}
return tmpb.Put(k, v)
@ -334,7 +337,7 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
if err != nil {
plog.Fatal(err)
}
tmpPath := path.Join(dir, "database")
tmpPath := filepath.Join(dir, "database")
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
}

16
pkg/cpuutil/doc.go Normal file
View File

@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cpuutil provides facilities for detecting cpu-specific features.
package cpuutil

36
pkg/cpuutil/endian.go Normal file
View File

@ -0,0 +1,36 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cpuutil
import (
"encoding/binary"
"unsafe"
)
const intWidth int = int(unsafe.Sizeof(0))
var byteOrder binary.ByteOrder
// ByteOrder returns the byte order for the CPU's native endianness.
func ByteOrder() binary.ByteOrder { return byteOrder }
func init() {
var i int = 0x1
if v := (*[intWidth]byte)(unsafe.Pointer(&i)); v[0] == 0 {
byteOrder = binary.BigEndian
} else {
byteOrder = binary.LittleEndian
}
}

View File

@ -19,7 +19,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"github.com/coreos/pkg/capnslog"
@ -39,7 +39,7 @@ var (
// IsDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
func IsDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
f := filepath.Join(dir, ".touch")
if err := ioutil.WriteFile(f, []byte(""), PrivateFileMode); err != nil {
return err
}

View File

@ -16,7 +16,7 @@ package fileutil
import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@ -45,7 +45,7 @@ func purgeFile(dirname string, suffix string, max uint, interval time.Duration,
sort.Strings(newfnames)
fnames = newfnames
for len(newfnames) > int(max) {
f := path.Join(dirname, newfnames[0])
f := filepath.Join(dirname, newfnames[0])
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
if err != nil {
break

View File

@ -18,7 +18,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
"time"
@ -33,7 +33,7 @@ func TestPurgeFile(t *testing.T) {
// minimal file set
for i := 0; i < 3; i++ {
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
if ferr != nil {
t.Fatal(err)
}
@ -53,7 +53,7 @@ func TestPurgeFile(t *testing.T) {
// rest of the files
for i := 4; i < 10; i++ {
go func(n int) {
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", n)))
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", n)))
if ferr != nil {
t.Fatal(err)
}
@ -99,7 +99,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
for i := 0; i < 10; i++ {
var f *os.File
f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
if err != nil {
t.Fatal(err)
}
@ -107,7 +107,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
}
// create a purge barrier at 5
p := path.Join(dir, fmt.Sprintf("%d.test", 5))
p := filepath.Join(dir, fmt.Sprintf("%d.test", 5))
l, err := LockFile(p, os.O_WRONLY, PrivateFileMode)
if err != nil {
t.Fatal(err)

View File

@ -43,7 +43,7 @@ func RecoverPort(port int) error {
// SetLatency adds latency in millisecond scale with random variations.
func SetLatency(ms, rv int) error {
ifce, err := GetDefaultInterface()
ifces, err := GetDefaultInterfaces()
if err != nil {
return err
}
@ -51,14 +51,16 @@ func SetLatency(ms, rv int) error {
if rv > ms {
rv = 1
}
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
// the rule has already been added. Overwrite it.
cmdStr = fmt.Sprintf("sudo tc qdisc change dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
for ifce := range ifces {
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
// the rule has already been added. Overwrite it.
cmdStr = fmt.Sprintf("sudo tc qdisc change dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}
}
return nil
@ -66,10 +68,15 @@ func SetLatency(ms, rv int) error {
// RemoveLatency resets latency configurations.
func RemoveLatency() error {
ifce, err := GetDefaultInterface()
ifces, err := GetDefaultInterfaces()
if err != nil {
return err
}
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
return err
for ifce := range ifces {
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
if err != nil {
return err
}
}
return nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !linux !386,!amd64
// +build !linux
package netutil
@ -27,7 +27,7 @@ func GetDefaultHost() (string, error) {
return "", fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
}
// GetDefaultInterface fetches the device name of default routable interface.
func GetDefaultInterface() (string, error) {
return "", fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
// GetDefaultInterfaces fetches the device name of default routable interface.
func GetDefaultInterfaces() (map[string]uint8, error) {
return nil, fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
}

View File

@ -13,9 +13,6 @@
// limitations under the License.
// +build linux
// +build 386 amd64
// TODO support native endian but without using "unsafe"
package netutil
@ -24,27 +21,57 @@ import (
"encoding/binary"
"fmt"
"net"
"sort"
"syscall"
"github.com/coreos/etcd/pkg/cpuutil"
)
var errNoDefaultRoute = fmt.Errorf("could not find default route")
var errNoDefaultHost = fmt.Errorf("could not find default host")
var errNoDefaultInterface = fmt.Errorf("could not find default interface")
// GetDefaultHost obtains the first IP address of machine from the routing table and returns the IP address as string.
// An IPv4 address is preferred to an IPv6 address for backward compatibility.
func GetDefaultHost() (string, error) {
rmsg, rerr := getDefaultRoute()
rmsgs, rerr := getDefaultRoutes()
if rerr != nil {
return "", rerr
}
host, oif, err := parsePREFSRC(rmsg)
if err != nil {
return "", err
// prioritize IPv4
if rmsg, ok := rmsgs[syscall.AF_INET]; ok {
if host, err := chooseHost(syscall.AF_INET, rmsg); host != "" || err != nil {
return host, err
}
delete(rmsgs, syscall.AF_INET)
}
if host != "" {
return host, nil
// sort so choice is deterministic
var families []int
for family := range rmsgs {
families = append(families, int(family))
}
sort.Ints(families)
for _, f := range families {
family := uint8(f)
if host, err := chooseHost(family, rmsgs[family]); host != "" || err != nil {
return host, err
}
}
return "", errNoDefaultHost
}
func chooseHost(family uint8, rmsg *syscall.NetlinkMessage) (string, error) {
host, oif, err := parsePREFSRC(rmsg)
if host != "" || err != nil {
return host, err
}
// prefsrc not detected, fall back to getting address from iface
ifmsg, ierr := getIface(oif)
ifmsg, ierr := getIfaceAddr(oif, family)
if ierr != nil {
return "", ierr
}
@ -55,15 +82,16 @@ func GetDefaultHost() (string, error) {
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.RTA_SRC {
// search for RTA_DST because ipv6 doesn't have RTA_SRC
if attr.Attr.Type == syscall.RTA_DST {
return net.IP(attr.Value).String(), nil
}
}
return "", errNoDefaultRoute
return "", nil
}
func getDefaultRoute() (*syscall.NetlinkMessage, error) {
func getDefaultRoutes() (map[uint8]*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETROUTE, syscall.AF_UNSPEC)
if err != nil {
return nil, err
@ -74,26 +102,33 @@ func getDefaultRoute() (*syscall.NetlinkMessage, error) {
return nil, msgErr
}
routes := make(map[uint8]*syscall.NetlinkMessage)
rtmsg := syscall.RtMsg{}
for _, m := range msgs {
if m.Header.Type != syscall.RTM_NEWROUTE {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofRtMsg])
if rerr := binary.Read(buf, binary.LittleEndian, &rtmsg); rerr != nil {
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &rtmsg); rerr != nil {
continue
}
if rtmsg.Dst_len == 0 {
if rtmsg.Dst_len == 0 && rtmsg.Table == syscall.RT_TABLE_MAIN {
// zero-length Dst_len implies default route
return &m, nil
msg := m
routes[rtmsg.Family] = &msg
}
}
if len(routes) > 0 {
return routes, nil
}
return nil, errNoDefaultRoute
}
func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETADDR, syscall.AF_UNSPEC)
// Used to get an address of interface.
func getIfaceAddr(idx uint32, family uint8) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETADDR, int(family))
if err != nil {
return nil, err
}
@ -109,7 +144,7 @@ func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofIfAddrmsg])
if rerr := binary.Read(buf, binary.LittleEndian, &ifaddrmsg); rerr != nil {
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &ifaddrmsg); rerr != nil {
continue
}
if ifaddrmsg.Index == idx {
@ -117,38 +152,75 @@ func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
}
}
return nil, errNoDefaultRoute
return nil, fmt.Errorf("could not find address for interface index %v", idx)
}
var errNoDefaultInterface = fmt.Errorf("could not find default interface")
func GetDefaultInterface() (string, error) {
rmsg, rerr := getDefaultRoute()
if rerr != nil {
return "", rerr
}
_, oif, err := parsePREFSRC(rmsg)
// Used to get a name of interface.
func getIfaceLink(idx uint32) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETLINK, syscall.AF_UNSPEC)
if err != nil {
return "", err
return nil, err
}
ifmsg, ierr := getIface(oif)
if ierr != nil {
return "", ierr
msgs, msgErr := syscall.ParseNetlinkMessage(dat)
if msgErr != nil {
return nil, msgErr
}
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
if aerr != nil {
return "", aerr
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.IFLA_IFNAME {
return string(attr.Value[:len(attr.Value)-1]), nil
ifinfomsg := syscall.IfInfomsg{}
for _, m := range msgs {
if m.Header.Type != syscall.RTM_NEWLINK {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofIfInfomsg])
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &ifinfomsg); rerr != nil {
continue
}
if ifinfomsg.Index == int32(idx) {
return &m, nil
}
}
return "", errNoDefaultInterface
return nil, fmt.Errorf("could not find link for interface index %v", idx)
}
// GetDefaultInterfaces gets names of interfaces and returns a map[interface]families.
func GetDefaultInterfaces() (map[string]uint8, error) {
interfaces := make(map[string]uint8)
rmsgs, rerr := getDefaultRoutes()
if rerr != nil {
return interfaces, rerr
}
for family, rmsg := range rmsgs {
_, oif, err := parsePREFSRC(rmsg)
if err != nil {
return interfaces, err
}
ifmsg, ierr := getIfaceLink(oif)
if ierr != nil {
return interfaces, ierr
}
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
if aerr != nil {
return interfaces, aerr
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.IFLA_IFNAME {
// key is an interface name
// possible values: 2 - AF_INET, 10 - AF_INET6, 12 - dualstack
interfaces[string(attr.Value[:len(attr.Value)-1])] += family
}
}
}
if len(interfaces) > 0 {
return interfaces, nil
}
return interfaces, errNoDefaultInterface
}
// parsePREFSRC returns preferred source address and output interface index (RTA_OIF).
@ -164,7 +236,7 @@ func parsePREFSRC(m *syscall.NetlinkMessage) (host string, oif uint32, err error
host = net.IP(attr.Value).String()
}
if attr.Attr.Type == syscall.RTA_OIF {
oif = binary.LittleEndian.Uint32(attr.Value)
oif = cpuutil.ByteOrder().Uint32(attr.Value)
}
if host != "" && oif != uint32(0) {
break

View File

@ -19,9 +19,17 @@ package netutil
import "testing"
func TestGetDefaultInterface(t *testing.T) {
ifc, err := GetDefaultInterface()
ifc, err := GetDefaultInterfaces()
if err != nil {
t.Fatal(err)
}
t.Logf("default network interface: %q\n", ifc)
t.Logf("default network interfaces: %+v\n", ifc)
}
func TestGetDefaultHost(t *testing.T) {
ip, err := GetDefaultHost()
if err != nil {
t.Fatal(err)
}
t.Logf("default ip: %v", ip)
}

View File

@ -27,7 +27,7 @@ import (
"math/big"
"net"
"os"
"path"
"path/filepath"
"strings"
"time"
@ -91,8 +91,8 @@ func SelfCert(dirpath string, hosts []string) (info TLSInfo, err error) {
return
}
certPath := path.Join(dirpath, "cert.pem")
keyPath := path.Join(dirpath, "key.pem")
certPath := filepath.Join(dirpath, "cert.pem")
keyPath := filepath.Join(dirpath, "key.pem")
_, errcert := os.Stat(certPath)
_, errkey := os.Stat(keyPath)
if errcert == nil && errkey == nil {

View File

@ -147,16 +147,17 @@ func (tp *TCPProxy) runMonitor() {
select {
case <-time.After(tp.MonitorInterval):
tp.mu.Lock()
for _, r := range tp.remotes {
if !r.isActive() {
go func() {
if err := r.tryReactivate(); err != nil {
plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
} else {
plog.Printf("activated %s", r.addr)
}
}()
for _, rem := range tp.remotes {
if rem.isActive() {
continue
}
go func(r *remote) {
if err := r.tryReactivate(); err != nil {
plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
} else {
plog.Printf("activated %s", r.addr)
}
}(rem)
}
tp.mu.Unlock()
case <-tp.donec:

View File

@ -823,6 +823,11 @@ func stepLeader(r *raft, m pb.Message) {
return
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return
}
// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.

View File

@ -1246,6 +1246,55 @@ func TestHandleHeartbeatResp(t *testing.T) {
}
}
// TestRaftFreesReadOnlyMem ensures raft will free read request from
// readOnly readIndexQueue and pendingReadIndex map.
// related issue: https://github.com/coreos/etcd/issues/7571
func TestRaftFreesReadOnlyMem(t *testing.T) {
sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
sm.becomeCandidate()
sm.becomeLeader()
sm.raftLog.commitTo(sm.raftLog.lastIndex())
ctx := []byte("ctx")
// leader starts linearizable read request.
// more info: raft dissertation 6.4, step 2.
sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
msgs := sm.readMessages()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
}
if msgs[0].Type != pb.MsgHeartbeat {
t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
}
if !bytes.Equal(msgs[0].Context, ctx) {
t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
}
if len(sm.readOnly.readIndexQueue) != 1 {
t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
}
if len(sm.readOnly.pendingReadIndex) != 1 {
t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
}
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
}
// heartbeat responses from majority of followers (1 in this case)
// acknowledge the authority of the leader.
// more info: raft dissertation 6.4, step 3.
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
if len(sm.readOnly.readIndexQueue) != 0 {
t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
}
if len(sm.readOnly.pendingReadIndex) != 0 {
t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
}
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
}
}
// TestMsgAppRespWaitReset verifies the resume behavior of a leader
// MsgAppResp.
func TestMsgAppRespWaitReset(t *testing.T) {
@ -1856,6 +1905,77 @@ func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) {
}
}
// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message
// when it commits at least one log entry at it term.
func TestReadOnlyForNewLeader(t *testing.T) {
cfg := newTestConfig(1, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 1, Term: 1},
})
cfg.Applied = 1
a := newRaft(cfg)
cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 2, Term: 1},
})
cfg.Applied = 2
b := newRaft(cfg)
cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 2, Term: 1},
})
cfg.Applied = 2
c := newRaft(cfg)
nt := newNetwork(a, b, c)
// Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader.
nt.ignore(pb.MsgApp)
// Force peer a to become leader.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if a.state != StateLeader {
t.Fatalf("state = %s, want %s", a.state, StateLeader)
}
// Ensure peer a drops read only request.
var windex uint64 = 4
wctx := []byte("ctx")
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(a.readStates) != 0 {
t.Fatalf("len(readStates) = %d, want zero", len(a.readStates))
}
nt.recover()
// Force peer a to commit a log entry at its term
for i := 0; i < a.heartbeatTimeout; i++ {
a.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
if a.raftLog.committed != 4 {
t.Fatalf("committed = %d, want 4", a.raftLog.committed)
}
lastLogTerm := a.raftLog.zeroTermOnErrCompacted(a.raftLog.term(a.raftLog.committed))
if lastLogTerm != a.Term {
t.Fatalf("last log term = %d, want %d", lastLogTerm, a.Term)
}
// Ensure peer a accepts read only request after it commits a entry at its term.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(a.readStates) != 1 {
t.Fatalf("len(readStates) = %d, want 1", len(a.readStates))
}
rs := a.readStates[0]
if rs.Index != windex {
t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
}
if !bytes.Equal(rs.RequestCtx, wctx) {
t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
}
}
func TestLeaderAppResp(t *testing.T) {
// initial progress: match = 0; next = 3
tests := []struct {

View File

@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Context))
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}

View File

@ -19,7 +19,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
@ -41,7 +41,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
os.Remove(f.Name())
return n, err
}
fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
fn := filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
if fileutil.Exist(fn) {
os.Remove(f.Name())
return n, nil
@ -67,7 +67,7 @@ func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
wfn := fmt.Sprintf("%016x.snap.db", id)
for _, fn := range fns {
if fn == wfn {
return path.Join(s.dir, fn), nil
return filepath.Join(s.dir, fn), nil
}
}
return "", fmt.Errorf("snap: snapshot file doesn't exist")

View File

@ -21,7 +21,7 @@ import (
"hash/crc32"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@ -84,13 +84,13 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
if err == nil {
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
} else {
err1 := os.Remove(path.Join(s.dir, fname))
err1 := os.Remove(filepath.Join(s.dir, fname))
if err1 != nil {
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname))
}
}
return err
@ -114,7 +114,7 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
}
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
fpath := path.Join(dir, name)
fpath := filepath.Join(dir, name)
snap, err := Read(fpath)
if err != nil {
renameBroken(fpath)

View File

@ -19,7 +19,7 @@ import (
"hash/crc32"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@ -38,7 +38,7 @@ var testSnap = &raftpb.Snapshot{
}
func TestSaveAndLoad(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@ -60,7 +60,7 @@ func TestSaveAndLoad(t *testing.T) {
}
func TestBadCRC(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@ -76,14 +76,14 @@ func TestBadCRC(t *testing.T) {
// fake a crc mismatch
crcTable = crc32.MakeTable(crc32.Koopman)
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
_, err = Read(filepath.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
if err == nil || err != ErrCRCMismatch {
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
}
}
func TestFailback(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@ -91,7 +91,7 @@ func TestFailback(t *testing.T) {
defer os.RemoveAll(dir)
large := fmt.Sprintf("%016x-%016x-%016x.snap", 0xFFFF, 0xFFFF, 0xFFFF)
err = ioutil.WriteFile(path.Join(dir, large), []byte("bad data"), 0666)
err = ioutil.WriteFile(filepath.Join(dir, large), []byte("bad data"), 0666)
if err != nil {
t.Fatal(err)
}
@ -109,7 +109,7 @@ func TestFailback(t *testing.T) {
if !reflect.DeepEqual(g, testSnap) {
t.Errorf("snap = %#v, want %#v", g, testSnap)
}
if f, err := os.Open(path.Join(dir, large) + ".broken"); err != nil {
if f, err := os.Open(filepath.Join(dir, large) + ".broken"); err != nil {
t.Fatal("broken snapshot does not exist")
} else {
f.Close()
@ -117,7 +117,7 @@ func TestFailback(t *testing.T) {
}
func TestSnapNames(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@ -125,7 +125,7 @@ func TestSnapNames(t *testing.T) {
defer os.RemoveAll(dir)
for i := 1; i <= 5; i++ {
var f *os.File
if f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
if f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
t.Fatal(err)
} else {
f.Close()
@ -146,7 +146,7 @@ func TestSnapNames(t *testing.T) {
}
func TestLoadNewestSnap(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@ -175,7 +175,7 @@ func TestLoadNewestSnap(t *testing.T) {
}
func TestNoSnapshot(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
@ -189,19 +189,19 @@ func TestNoSnapshot(t *testing.T) {
}
func TestEmptySnapshot(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte(""), 0x700)
if err != nil {
t.Fatal(err)
}
_, err = Read(path.Join(dir, "1.snap"))
_, err = Read(filepath.Join(dir, "1.snap"))
if err != ErrEmptySnapshot {
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
}
@ -210,14 +210,14 @@ func TestEmptySnapshot(t *testing.T) {
// TestAllSnapshotBroken ensures snapshotter returns
// ErrNoSnapshot if all the snapshots are broken.
func TestAllSnapshotBroken(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
dir := filepath.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte("bad"), 0x700)
if err != nil {
t.Fatal(err)
}

View File

@ -18,7 +18,7 @@ import (
"flag"
"fmt"
"log"
"path"
"path/filepath"
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -58,7 +58,7 @@ func main() {
ss := snap.New(snapDir(*from))
snapshot, err = ss.Load()
} else {
snapshot, err = snap.Read(path.Join(snapDir(*from), *snapfile))
snapshot, err = snap.Read(filepath.Join(snapDir(*from), *snapfile))
}
switch err {
@ -132,9 +132,9 @@ func main() {
}
}
func walDir(dataDir string) string { return path.Join(dataDir, "member", "wal") }
func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") }
func snapDir(dataDir string) string { return path.Join(dataDir, "member", "snap") }
func snapDir(dataDir string) string { return filepath.Join(dataDir, "member", "snap") }
func parseWALMetadata(b []byte) (id, cid types.ID) {
var metadata etcdserverpb.Metadata

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.1.0"
Version = "3.1.5"
APIVersion = "unknown"
// Git SHA Value will be set during build

View File

@ -17,7 +17,7 @@ package wal
import (
"fmt"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
@ -65,7 +65,7 @@ func (fp *filePipeline) Close() error {
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
// count % 2 so this file isn't the same as the one last published
fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return nil, err
}

View File

@ -17,7 +17,7 @@ package wal
import (
"io"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/wal/walpb"
@ -94,6 +94,6 @@ func openLast(dirpath string) (*fileutil.LockedFile, error) {
if err != nil {
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])
last := filepath.Join(dirpath, names[len(names)-1])
return fileutil.LockFile(last, os.O_RDWR, fileutil.PrivateFileMode)
}

View File

@ -21,7 +21,7 @@ import (
"hash/crc32"
"io"
"os"
"path"
"path/filepath"
"sync"
"time"
@ -97,7 +97,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := path.Clean(dirpath) + ".tmp"
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
if fileutil.Exist(tmpdirpath) {
if err := os.RemoveAll(tmpdirpath); err != nil {
return nil, err
@ -107,7 +107,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return nil, err
}
p := path.Join(tmpdirpath, walName(0, 0))
p := filepath.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
@ -143,7 +143,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// directory was renamed; sync parent dir to persist rename
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
if perr != nil {
return nil, perr
}
@ -196,7 +196,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := path.Join(dirpath, name)
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
@ -232,7 +232,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
@ -372,7 +372,7 @@ func (w *WAL) cut() error {
return err
}
fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
// create a temp wal file with name sequence + 1, or truncate the existing one
newTail, err := w.fp.Open()
@ -464,7 +464,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
found := false
for i, l := range w.locks {
_, lockIndex, err := parseWalName(path.Base(l.Name()))
_, lockIndex, err := parseWalName(filepath.Base(l.Name()))
if err != nil {
return err
}
@ -611,7 +611,7 @@ func (w *WAL) seq() uint64 {
if t == nil {
return 0
}
seq, _, err := parseWalName(path.Base(t.Name()))
seq, _, err := parseWalName(filepath.Base(t.Name()))
if err != nil {
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
}

View File

@ -19,7 +19,7 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@ -40,7 +40,7 @@ func TestNew(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
}
defer w.Close()
@ -51,7 +51,7 @@ func TestNew(t *testing.T) {
t.Fatal(err)
}
gd := make([]byte, off)
f, err := os.Open(path.Join(p, path.Base(w.tail().Name())))
f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
if err != nil {
t.Fatal(err)
}
@ -90,7 +90,7 @@ func TestNewForInitedDir(t *testing.T) {
}
defer os.RemoveAll(p)
os.Create(path.Join(p, walName(0, 0)))
os.Create(filepath.Join(p, walName(0, 0)))
if _, err = Create(p, nil); err == nil || err != os.ErrExist {
t.Errorf("err = %v, want %v", err, os.ErrExist)
}
@ -103,7 +103,7 @@ func TestOpenAtIndex(t *testing.T) {
}
defer os.RemoveAll(dir)
f, err := os.Create(path.Join(dir, walName(0, 0)))
f, err := os.Create(filepath.Join(dir, walName(0, 0)))
if err != nil {
t.Fatal(err)
}
@ -113,7 +113,7 @@ func TestOpenAtIndex(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
}
if w.seq() != 0 {
@ -122,7 +122,7 @@ func TestOpenAtIndex(t *testing.T) {
w.Close()
wname := walName(2, 10)
f, err = os.Create(path.Join(dir, wname))
f, err = os.Create(filepath.Join(dir, wname))
if err != nil {
t.Fatal(err)
}
@ -132,7 +132,7 @@ func TestOpenAtIndex(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %+v, want %+v", g, wname)
}
if w.seq() != 2 {
@ -172,7 +172,7 @@ func TestCut(t *testing.T) {
t.Fatal(err)
}
wname := walName(1, 1)
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
@ -188,14 +188,14 @@ func TestCut(t *testing.T) {
t.Fatal(err)
}
wname = walName(2, 2)
if g := path.Base(w.tail().Name()); g != wname {
if g := filepath.Base(w.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
// check the state in the last WAL
// We do check before closing the WAL to ensure that Cut syncs the data
// into the disk.
f, err := os.Open(path.Join(p, wname))
f, err := os.Open(filepath.Join(p, wname))
if err != nil {
t.Fatal(err)
}
@ -254,7 +254,7 @@ func TestSaveWithCut(t *testing.T) {
}
defer neww.Close()
wname := walName(1, index)
if g := path.Base(neww.tail().Name()); g != wname {
if g := filepath.Base(neww.tail().Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
}
@ -416,7 +416,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
md.Close()
if err := os.Remove(path.Join(p, walName(4, 4))); err != nil {
if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
t.Fatal(err)
}
@ -570,7 +570,7 @@ func TestReleaseLockTo(t *testing.T) {
}
for i, l := range w.locks {
var lockIndex uint64
_, lockIndex, err = parseWalName(path.Base(l.Name()))
_, lockIndex, err = parseWalName(filepath.Base(l.Name()))
if err != nil {
t.Fatal(err)
}
@ -588,7 +588,7 @@ func TestReleaseLockTo(t *testing.T) {
if len(w.locks) != 1 {
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
}
_, lockIndex, err := parseWalName(path.Base(w.locks[0].Name()))
_, lockIndex, err := parseWalName(filepath.Base(w.locks[0].Name()))
if err != nil {
t.Fatal(err)
}
@ -673,11 +673,11 @@ func TestRestartCreateWal(t *testing.T) {
defer os.RemoveAll(p)
// make temporary directory so it looks like initialization is interrupted
tmpdir := path.Clean(p) + ".tmp"
tmpdir := filepath.Clean(p) + ".tmp"
if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
t.Fatal(err)
}
if _, err = os.OpenFile(path.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
t.Fatal(err)
}
@ -729,7 +729,7 @@ func TestOpenOnTornWrite(t *testing.T) {
}
}
fn := path.Join(p, path.Base(w.tail().Name()))
fn := filepath.Join(p, filepath.Base(w.tail().Name()))
w.Close()
// clobber some entry with 0's to simulate a torn write