Compare commits
42 Commits
Author | SHA1 | Date | |
---|---|---|---|
e5b7ee2d03 | |||
a4c5731c38 | |||
1f558ae678 | |||
df93627bbb | |||
a20295c65b | |||
9f7bb0df3a | |||
6a805e5222 | |||
38f79fa565 | |||
37a502cc88 | |||
9be7fc5320 | |||
288bccd288 | |||
8cb5b48f58 | |||
6538217528 | |||
e983d6b343 | |||
20490caaf0 | |||
e156746959 | |||
d84bf983cc | |||
b44c6bff9d | |||
8c3c1b4a9c | |||
b478387a59 | |||
dfc1f21f9d | |||
41e52ebc22 | |||
7bb538d4d4 | |||
1622782e49 | |||
99b47e0c1e | |||
350d0cd211 | |||
72f37ff79a | |||
3221454cab | |||
4a1bffdbc6 | |||
9d9be2bc86 | |||
e5462f74f1 | |||
c68c1d9344 | |||
6ed56cd723 | |||
a3c6f6bf81 | |||
21fdcc6443 | |||
8d122e7011 | |||
ade1d97893 | |||
1300189581 | |||
1971517806 | |||
d614bb0799 | |||
059dc91d4c | |||
5fdbaee761 |
12
.travis.yml
12
.travis.yml
@ -32,18 +32,6 @@ matrix:
|
||||
- go: tip
|
||||
env: TARGET=ppc64le
|
||||
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- libpcap-dev
|
||||
- libaspell-dev
|
||||
- libhunspell-dev
|
||||
|
||||
before_install:
|
||||
- go get -v github.com/chzchzchz/goword
|
||||
- go get -v honnef.co/go/simple/cmd/gosimple
|
||||
- go get -v honnef.co/go/unused/cmd/unused
|
||||
|
||||
# disable godep restore override
|
||||
install:
|
||||
- pushd cmd/etcd && go get -t -v ./... && popd
|
||||
|
@ -5,6 +5,12 @@ ADD etcdctl /usr/local/bin/
|
||||
RUN mkdir -p /var/etcd/
|
||||
RUN mkdir -p /var/lib/etcd/
|
||||
|
||||
# Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf,
|
||||
# but Golang relies on /etc/nsswitch.conf to check the order of DNS resolving
|
||||
# (see https://github.com/golang/go/commit/9dee7771f561cf6aee081c0af6658cc81fac3918)
|
||||
# To fix this we just create /etc/nsswitch.conf and add the following line:
|
||||
RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
|
||||
|
||||
EXPOSE 2379 2380
|
||||
|
||||
# Define default command.
|
||||
|
@ -21,85 +21,92 @@ import (
|
||||
"crypto/rand"
|
||||
"math/big"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
defaultSimpleTokenLength = 16
|
||||
)
|
||||
|
||||
// var for testing purposes
|
||||
var (
|
||||
simpleTokenTTL = 5 * time.Minute
|
||||
simpleTokenTTLResolution = 1 * time.Second
|
||||
)
|
||||
|
||||
type simpleTokenTTLKeeper struct {
|
||||
tokens map[string]time.Time
|
||||
addSimpleTokenCh chan string
|
||||
resetSimpleTokenCh chan string
|
||||
deleteSimpleTokenCh chan string
|
||||
stopCh chan chan struct{}
|
||||
deleteTokenFunc func(string)
|
||||
}
|
||||
|
||||
func NewSimpleTokenTTLKeeper(deletefunc func(string)) *simpleTokenTTLKeeper {
|
||||
stk := &simpleTokenTTLKeeper{
|
||||
tokens: make(map[string]time.Time),
|
||||
addSimpleTokenCh: make(chan string, 1),
|
||||
resetSimpleTokenCh: make(chan string, 1),
|
||||
deleteSimpleTokenCh: make(chan string, 1),
|
||||
stopCh: make(chan chan struct{}),
|
||||
deleteTokenFunc: deletefunc,
|
||||
}
|
||||
go stk.run()
|
||||
return stk
|
||||
tokens map[string]time.Time
|
||||
donec chan struct{}
|
||||
stopc chan struct{}
|
||||
deleteTokenFunc func(string)
|
||||
mu *sync.Mutex
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) stop() {
|
||||
waitCh := make(chan struct{})
|
||||
tm.stopCh <- waitCh
|
||||
<-waitCh
|
||||
close(tm.stopCh)
|
||||
select {
|
||||
case tm.stopc <- struct{}{}:
|
||||
case <-tm.donec:
|
||||
}
|
||||
<-tm.donec
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
|
||||
tm.addSimpleTokenCh <- token
|
||||
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
|
||||
tm.resetSimpleTokenCh <- token
|
||||
if _, ok := tm.tokens[token]; ok {
|
||||
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) deleteSimpleToken(token string) {
|
||||
tm.deleteSimpleTokenCh <- token
|
||||
delete(tm.tokens, token)
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) run() {
|
||||
tokenTicker := time.NewTicker(simpleTokenTTLResolution)
|
||||
defer tokenTicker.Stop()
|
||||
defer func() {
|
||||
tokenTicker.Stop()
|
||||
close(tm.donec)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case t := <-tm.addSimpleTokenCh:
|
||||
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
|
||||
case t := <-tm.resetSimpleTokenCh:
|
||||
if _, ok := tm.tokens[t]; ok {
|
||||
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
|
||||
}
|
||||
case t := <-tm.deleteSimpleTokenCh:
|
||||
delete(tm.tokens, t)
|
||||
case <-tokenTicker.C:
|
||||
nowtime := time.Now()
|
||||
tm.mu.Lock()
|
||||
for t, tokenendtime := range tm.tokens {
|
||||
if nowtime.After(tokenendtime) {
|
||||
tm.deleteTokenFunc(t)
|
||||
delete(tm.tokens, t)
|
||||
}
|
||||
}
|
||||
case waitCh := <-tm.stopCh:
|
||||
tm.tokens = make(map[string]time.Time)
|
||||
waitCh <- struct{}{}
|
||||
tm.mu.Unlock()
|
||||
case <-tm.stopc:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (as *authStore) enable() {
|
||||
delf := func(tk string) {
|
||||
if username, ok := as.simpleTokens[tk]; ok {
|
||||
plog.Infof("deleting token %s for user %s", tk, username)
|
||||
delete(as.simpleTokens, tk)
|
||||
}
|
||||
}
|
||||
as.simpleTokenKeeper = &simpleTokenTTLKeeper{
|
||||
tokens: make(map[string]time.Time),
|
||||
donec: make(chan struct{}),
|
||||
stopc: make(chan struct{}),
|
||||
deleteTokenFunc: delf,
|
||||
mu: &as.simpleTokensMu,
|
||||
}
|
||||
go as.simpleTokenKeeper.run()
|
||||
}
|
||||
|
||||
func (as *authStore) GenSimpleToken() (string, error) {
|
||||
ret := make([]byte, defaultSimpleTokenLength)
|
||||
|
||||
@ -117,7 +124,6 @@ func (as *authStore) GenSimpleToken() (string, error) {
|
||||
|
||||
func (as *authStore) assignSimpleTokenToUser(username, token string) {
|
||||
as.simpleTokensMu.Lock()
|
||||
|
||||
_, ok := as.simpleTokens[token]
|
||||
if ok {
|
||||
plog.Panicf("token %s is alredy used", token)
|
||||
@ -129,13 +135,15 @@ func (as *authStore) assignSimpleTokenToUser(username, token string) {
|
||||
}
|
||||
|
||||
func (as *authStore) invalidateUser(username string) {
|
||||
if as.simpleTokenKeeper == nil {
|
||||
return
|
||||
}
|
||||
as.simpleTokensMu.Lock()
|
||||
defer as.simpleTokensMu.Unlock()
|
||||
|
||||
for token, name := range as.simpleTokens {
|
||||
if strings.Compare(name, username) == 0 {
|
||||
delete(as.simpleTokens, token)
|
||||
as.simpleTokenKeeper.deleteSimpleToken(token)
|
||||
}
|
||||
}
|
||||
as.simpleTokensMu.Unlock()
|
||||
}
|
||||
|
@ -168,13 +168,13 @@ type authStore struct {
|
||||
|
||||
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
|
||||
|
||||
simpleTokensMu sync.RWMutex
|
||||
simpleTokens map[string]string // token -> username
|
||||
simpleTokenKeeper *simpleTokenTTLKeeper
|
||||
|
||||
revision uint64
|
||||
|
||||
indexWaiter func(uint64) <-chan struct{}
|
||||
// tokenSimple in v3.2+
|
||||
indexWaiter func(uint64) <-chan struct{}
|
||||
simpleTokenKeeper *simpleTokenTTLKeeper
|
||||
simpleTokensMu sync.Mutex
|
||||
simpleTokens map[string]string // token -> username
|
||||
}
|
||||
|
||||
func newDeleterFunc(as *authStore) func(string) {
|
||||
@ -215,8 +215,7 @@ func (as *authStore) AuthEnable() error {
|
||||
tx.UnsafePut(authBucketName, enableFlagKey, authEnabled)
|
||||
|
||||
as.enabled = true
|
||||
|
||||
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
|
||||
as.enable()
|
||||
|
||||
as.rangePermCache = make(map[string]*unifiedRangePermissions)
|
||||
|
||||
@ -244,11 +243,12 @@ func (as *authStore) AuthDisable() {
|
||||
as.enabled = false
|
||||
|
||||
as.simpleTokensMu.Lock()
|
||||
tk := as.simpleTokenKeeper
|
||||
as.simpleTokenKeeper = nil
|
||||
as.simpleTokens = make(map[string]string) // invalidate all tokens
|
||||
as.simpleTokensMu.Unlock()
|
||||
if as.simpleTokenKeeper != nil {
|
||||
as.simpleTokenKeeper.stop()
|
||||
as.simpleTokenKeeper = nil
|
||||
if tk != nil {
|
||||
tk.stop()
|
||||
}
|
||||
|
||||
plog.Noticef("Authentication disabled")
|
||||
@ -646,13 +646,14 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
|
||||
}
|
||||
|
||||
func (as *authStore) AuthInfoFromToken(token string) (*AuthInfo, bool) {
|
||||
as.simpleTokensMu.RLock()
|
||||
defer as.simpleTokensMu.RUnlock()
|
||||
t, ok := as.simpleTokens[token]
|
||||
if ok {
|
||||
// same as '(t *tokenSimple) info' in v3.2+
|
||||
as.simpleTokensMu.Lock()
|
||||
username, ok := as.simpleTokens[token]
|
||||
if ok && as.simpleTokenKeeper != nil {
|
||||
as.simpleTokenKeeper.resetSimpleToken(token)
|
||||
}
|
||||
return &AuthInfo{Username: t, Revision: as.revision}, ok
|
||||
as.simpleTokensMu.Unlock()
|
||||
return &AuthInfo{Username: username, Revision: as.revision}, ok
|
||||
}
|
||||
|
||||
type permSlice []*authpb.Permission
|
||||
@ -764,6 +765,9 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
|
||||
if !as.isAuthEnabled() {
|
||||
return nil
|
||||
}
|
||||
if authInfo == nil {
|
||||
return ErrUserEmpty
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
@ -908,7 +912,7 @@ func NewAuthStore(be backend.Backend, indexWaiter func(uint64) <-chan struct{})
|
||||
}
|
||||
|
||||
if enabled {
|
||||
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
|
||||
as.enable()
|
||||
}
|
||||
|
||||
if as.revision == 0 {
|
||||
|
@ -282,8 +282,16 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
|
||||
tokenMu: &sync.RWMutex{},
|
||||
}
|
||||
|
||||
err := c.getToken(context.TODO())
|
||||
if err != nil {
|
||||
ctx := c.ctx
|
||||
if c.cfg.DialTimeout > 0 {
|
||||
cctx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
|
||||
defer cancel()
|
||||
ctx = cctx
|
||||
}
|
||||
if err := c.getToken(ctx); err != nil {
|
||||
if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
|
||||
err = grpc.ErrClientConnTimeout
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -335,6 +343,8 @@ func newClient(cfg *Config) (*Client, error) {
|
||||
client.balancer = newSimpleBalancer(cfg.Endpoints)
|
||||
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
|
||||
if err != nil {
|
||||
client.cancel()
|
||||
client.balancer.Close()
|
||||
return nil, err
|
||||
}
|
||||
client.conn = conn
|
||||
@ -353,6 +363,7 @@ func newClient(cfg *Config) (*Client, error) {
|
||||
}
|
||||
if !hasConn {
|
||||
client.cancel()
|
||||
client.balancer.Close()
|
||||
conn.Close()
|
||||
return nil, grpc.ErrClientConnTimeout
|
||||
}
|
||||
|
@ -70,33 +70,45 @@ func TestDialCancel(t *testing.T) {
|
||||
func TestDialTimeout(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
donec := make(chan error)
|
||||
go func() {
|
||||
// without timeout, grpc keeps redialing if connection refused
|
||||
cfg := Config{
|
||||
Endpoints: []string{"localhost:12345"},
|
||||
DialTimeout: 2 * time.Second}
|
||||
c, err := New(cfg)
|
||||
if c != nil || err == nil {
|
||||
t.Errorf("new client should fail")
|
||||
}
|
||||
donec <- err
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
select {
|
||||
case err := <-donec:
|
||||
t.Errorf("dial didn't wait (%v)", err)
|
||||
default:
|
||||
testCfgs := []Config{
|
||||
{
|
||||
Endpoints: []string{"http://254.0.0.1:12345"},
|
||||
DialTimeout: 2 * time.Second,
|
||||
},
|
||||
{
|
||||
Endpoints: []string{"http://254.0.0.1:12345"},
|
||||
DialTimeout: time.Second,
|
||||
Username: "abc",
|
||||
Password: "def",
|
||||
},
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Errorf("failed to timeout dial on time")
|
||||
case err := <-donec:
|
||||
if err != grpc.ErrClientConnTimeout {
|
||||
t.Errorf("unexpected error %v, want %v", err, grpc.ErrClientConnTimeout)
|
||||
for i, cfg := range testCfgs {
|
||||
donec := make(chan error)
|
||||
go func() {
|
||||
// without timeout, dial continues forever on ipv4 blackhole
|
||||
c, err := New(cfg)
|
||||
if c != nil || err == nil {
|
||||
t.Errorf("#%d: new client should fail", i)
|
||||
}
|
||||
donec <- err
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
select {
|
||||
case err := <-donec:
|
||||
t.Errorf("#%d: dial didn't wait (%v)", i, err)
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Errorf("#%d: failed to timeout dial on time", i)
|
||||
case err := <-donec:
|
||||
if err != grpc.ErrClientConnTimeout {
|
||||
t.Errorf("#%d: unexpected error %v, want %v", i, err, grpc.ErrClientConnTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -132,6 +132,8 @@ type watchGrpcStream struct {
|
||||
errc chan error
|
||||
// closingc gets the watcherStream of closing watchers
|
||||
closingc chan *watcherStream
|
||||
// wg is Done when all substream goroutines have exited
|
||||
wg sync.WaitGroup
|
||||
|
||||
// resumec closes to signal that all substreams should begin resuming
|
||||
resumec chan struct{}
|
||||
@ -406,7 +408,7 @@ func (w *watchGrpcStream) run() {
|
||||
for range closing {
|
||||
w.closeSubstream(<-w.closingc)
|
||||
}
|
||||
|
||||
w.wg.Wait()
|
||||
w.owner.closeStream(w)
|
||||
}()
|
||||
|
||||
@ -431,6 +433,7 @@ func (w *watchGrpcStream) run() {
|
||||
}
|
||||
|
||||
ws.donec = make(chan struct{})
|
||||
w.wg.Add(1)
|
||||
go w.serveSubstream(ws, w.resumec)
|
||||
|
||||
// queue up for watcher creation/resume
|
||||
@ -576,6 +579,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
||||
if !resuming {
|
||||
w.closingc <- ws
|
||||
}
|
||||
w.wg.Done()
|
||||
}()
|
||||
|
||||
emptyWr := &WatchResponse{}
|
||||
@ -674,6 +678,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||
continue
|
||||
}
|
||||
ws.donec = make(chan struct{})
|
||||
w.wg.Add(1)
|
||||
go w.serveSubstream(ws, w.resumec)
|
||||
}
|
||||
|
||||
@ -694,6 +699,10 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
|
||||
go func(ws *watcherStream) {
|
||||
defer wg.Done()
|
||||
if ws.closing {
|
||||
if ws.initReq.ctx.Err() != nil && ws.outc != nil {
|
||||
close(ws.outc)
|
||||
ws.outc = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
select {
|
||||
|
@ -74,7 +74,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
shortHost := strings.TrimSuffix(srv.Target, ".")
|
||||
urlHost := net.JoinHostPort(shortHost, port)
|
||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s://%s", n, scheme, urlHost))
|
||||
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, scheme, urlHost)
|
||||
plog.Noticef("got bootstrap from DNS for %s at %s://%s", service, scheme, urlHost)
|
||||
if ok && url.Scheme != scheme {
|
||||
plog.Errorf("bootstrap at %s from DNS for %s has scheme mismatch with expected peer %s", scheme+"://"+urlHost, service, url.String())
|
||||
}
|
||||
|
@ -55,20 +55,12 @@ var (
|
||||
DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
|
||||
DefaultAdvertiseClientURLs = "http://localhost:2379"
|
||||
|
||||
defaultHostname string = "localhost"
|
||||
defaultHostname string
|
||||
defaultHostStatus error
|
||||
)
|
||||
|
||||
func init() {
|
||||
ip, err := netutil.GetDefaultHost()
|
||||
if err != nil {
|
||||
defaultHostStatus = err
|
||||
return
|
||||
}
|
||||
// found default host, advertise on it
|
||||
DefaultInitialAdvertisePeerURLs = "http://" + net.JoinHostPort(ip, "2380")
|
||||
DefaultAdvertiseClientURLs = "http://" + net.JoinHostPort(ip, "2379")
|
||||
defaultHostname = ip
|
||||
defaultHostname, defaultHostStatus = netutil.GetDefaultHost()
|
||||
}
|
||||
|
||||
// Config holds the arguments for configuring an etcd server.
|
||||
@ -237,6 +229,9 @@ func (cfg *configYAML) configFromFile(path string) error {
|
||||
cfg.ACUrls = []url.URL(u)
|
||||
}
|
||||
|
||||
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
|
||||
cfg.InitialCluster = ""
|
||||
}
|
||||
if cfg.ClusterState == "" {
|
||||
cfg.ClusterState = ClusterStateFlagNew
|
||||
}
|
||||
@ -346,34 +341,52 @@ func (cfg Config) InitialClusterFromName(name string) (ret string) {
|
||||
func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
|
||||
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||
|
||||
// IsDefaultHost returns the default hostname, if used, and the error, if any,
|
||||
// from getting the machine's default host.
|
||||
func (cfg Config) IsDefaultHost() (string, error) {
|
||||
if len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs {
|
||||
return defaultHostname, defaultHostStatus
|
||||
}
|
||||
if len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs {
|
||||
return defaultHostname, defaultHostStatus
|
||||
}
|
||||
return "", defaultHostStatus
|
||||
func (cfg Config) defaultPeerHost() bool {
|
||||
return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs
|
||||
}
|
||||
|
||||
// UpdateDefaultClusterFromName updates cluster advertise URLs with default host.
|
||||
func (cfg Config) defaultClientHost() bool {
|
||||
return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
|
||||
}
|
||||
|
||||
// UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host,
|
||||
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
|
||||
// e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380
|
||||
// then the advertise peer host would be updated with machine's default host,
|
||||
// while keeping the listen URL's port.
|
||||
// User can work around this by explicitly setting URL with 127.0.0.1.
|
||||
// It returns the default hostname, if used, and the error, if any, from getting the machine's default host.
|
||||
// TODO: check whether fields are set instead of whether fields have default value
|
||||
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) {
|
||||
defaultHost, defaultHostErr := cfg.IsDefaultHost()
|
||||
defaultHostOverride := defaultHost == "" || defaultHostErr == nil
|
||||
if (defaultHostOverride || cfg.Name != DefaultName) && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
ip, _, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
|
||||
// if client-listen-url is 0.0.0.0, just use detected default host
|
||||
// otherwise, rewrite advertise-client-url with localhost
|
||||
if ip != "0.0.0.0" {
|
||||
_, acPort, _ := net.SplitHostPort(cfg.ACUrls[0].Host)
|
||||
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("localhost:%s", acPort)}
|
||||
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (string, error) {
|
||||
if defaultHostname == "" || defaultHostStatus != nil {
|
||||
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
|
||||
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
}
|
||||
return "", defaultHostStatus
|
||||
}
|
||||
|
||||
used := false
|
||||
pip, pport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
|
||||
if cfg.defaultPeerHost() && pip == "0.0.0.0" {
|
||||
cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
|
||||
used = true
|
||||
}
|
||||
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
|
||||
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
}
|
||||
|
||||
cip, cport, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
|
||||
if cfg.defaultClientHost() && cip == "0.0.0.0" {
|
||||
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
|
||||
used = true
|
||||
}
|
||||
dhost := defaultHostname
|
||||
if !used {
|
||||
dhost = ""
|
||||
}
|
||||
return dhost, defaultHostStatus
|
||||
}
|
||||
|
||||
// checkBindURLs returns an error if any URL uses a domain name.
|
||||
|
@ -15,11 +15,15 @@
|
||||
package embed
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
|
||||
"github.com/ghodss/yaml"
|
||||
)
|
||||
|
||||
@ -61,6 +65,70 @@ func TestConfigFileOtherFields(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateDefaultClusterFromName ensures that etcd can start with 'etcd --name=abc'.
|
||||
func TestUpdateDefaultClusterFromName(t *testing.T) {
|
||||
cfg := NewConfig()
|
||||
defaultInitialCluster := cfg.InitialCluster
|
||||
oldscheme := cfg.APUrls[0].Scheme
|
||||
origpeer := cfg.APUrls[0].String()
|
||||
origadvc := cfg.ACUrls[0].String()
|
||||
|
||||
cfg.Name = "abc"
|
||||
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
|
||||
|
||||
// in case of 'etcd --name=abc'
|
||||
exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport)
|
||||
cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
if exp != cfg.InitialCluster {
|
||||
t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster)
|
||||
}
|
||||
// advertise peer URL should not be affected
|
||||
if origpeer != cfg.APUrls[0].String() {
|
||||
t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String())
|
||||
}
|
||||
// advertise client URL should not be affected
|
||||
if origadvc != cfg.ACUrls[0].String() {
|
||||
t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateDefaultClusterFromNameOverwrite ensures that machine's default host is only used
|
||||
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
|
||||
func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) {
|
||||
if defaultHostname == "" {
|
||||
t.Skip("machine's default host not found")
|
||||
}
|
||||
|
||||
cfg := NewConfig()
|
||||
defaultInitialCluster := cfg.InitialCluster
|
||||
oldscheme := cfg.APUrls[0].Scheme
|
||||
origadvc := cfg.ACUrls[0].String()
|
||||
|
||||
cfg.Name = "abc"
|
||||
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
|
||||
cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)}
|
||||
dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
if dhost != defaultHostname {
|
||||
t.Fatalf("expected default host %q, got %q", defaultHostname, dhost)
|
||||
}
|
||||
aphost, apport, _ := net.SplitHostPort(cfg.APUrls[0].Host)
|
||||
if apport != lpport {
|
||||
t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport)
|
||||
}
|
||||
if aphost != defaultHostname {
|
||||
t.Fatalf("advertise peer url expected machine default host %q, got %q", defaultHostname, aphost)
|
||||
}
|
||||
expected := fmt.Sprintf("%s=%s://%s:%s", cfg.Name, oldscheme, defaultHostname, lpport)
|
||||
if expected != cfg.InitialCluster {
|
||||
t.Fatalf("initial-cluster expected %q, got %q", expected, cfg.InitialCluster)
|
||||
}
|
||||
|
||||
// advertise client URL should not be affected
|
||||
if origadvc != cfg.ACUrls[0].String() {
|
||||
t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *securityConfig) equals(t *transport.TLSInfo) bool {
|
||||
return s.CAFile == t.CAFile &&
|
||||
s.CertFile == t.CertFile &&
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
@ -166,7 +166,7 @@ func startPeerListeners(cfg *Config) (plns []net.Listener, err error) {
|
||||
for i, u := range cfg.LPUrls {
|
||||
phosts[i] = u.Host
|
||||
}
|
||||
cfg.PeerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts)
|
||||
cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
|
||||
if err != nil {
|
||||
plog.Fatalf("could not get certs (%v)", err)
|
||||
}
|
||||
@ -221,7 +221,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
|
||||
for i, u := range cfg.LCUrls {
|
||||
chosts[i] = u.Host
|
||||
}
|
||||
cfg.ClientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts)
|
||||
cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
|
||||
if err != nil {
|
||||
plog.Fatalf("could not get certs (%v)", err)
|
||||
}
|
||||
|
@ -15,7 +15,7 @@
|
||||
package embed
|
||||
|
||||
import (
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/coreos/etcd/wal"
|
||||
)
|
||||
@ -23,7 +23,7 @@ import (
|
||||
func isMemberInitialized(cfg *Config) bool {
|
||||
waldir := cfg.WalDir
|
||||
if waldir == "" {
|
||||
waldir = path.Join(cfg.Dir, "member", "wal")
|
||||
waldir = filepath.Join(cfg.Dir, "member", "wal")
|
||||
}
|
||||
|
||||
return wal.Exist(waldir)
|
||||
|
@ -17,7 +17,7 @@ package command
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -50,19 +50,19 @@ func handleBackup(c *cli.Context) error {
|
||||
var srcWAL string
|
||||
var destWAL string
|
||||
|
||||
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
|
||||
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
|
||||
srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
|
||||
destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
|
||||
|
||||
if c.String("wal-dir") != "" {
|
||||
srcWAL = c.String("wal-dir")
|
||||
} else {
|
||||
srcWAL = path.Join(c.String("data-dir"), "member", "wal")
|
||||
srcWAL = filepath.Join(c.String("data-dir"), "member", "wal")
|
||||
}
|
||||
|
||||
if c.String("backup-wal-dir") != "" {
|
||||
destWAL = c.String("backup-wal-dir")
|
||||
} else {
|
||||
destWAL = path.Join(c.String("backup-dir"), "member", "wal")
|
||||
destWAL = filepath.Join(c.String("backup-dir"), "member", "wal")
|
||||
}
|
||||
|
||||
if err := fileutil.CreateDirAll(destSnap); err != nil {
|
||||
|
@ -125,18 +125,19 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
||||
return rpctypes.ErrCompacted
|
||||
}
|
||||
|
||||
var rev int64
|
||||
var lastRev int64
|
||||
ops := []clientv3.Op{}
|
||||
|
||||
for _, ev := range wr.Events {
|
||||
nrev := ev.Kv.ModRevision
|
||||
if rev != 0 && nrev > rev {
|
||||
nextRev := ev.Kv.ModRevision
|
||||
if lastRev != 0 && nextRev > lastRev {
|
||||
_, err := dc.Txn(ctx).Then(ops...).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ops = []clientv3.Op{}
|
||||
}
|
||||
lastRev = nextRev
|
||||
switch ev.Type {
|
||||
case mvccpb.PUT:
|
||||
ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))
|
||||
|
@ -107,7 +107,8 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
|
||||
|
||||
urls := strings.Split(memberPeerURLs, ",")
|
||||
ctx, cancel := commandCtx(cmd)
|
||||
resp, err := mustClientFromCmd(cmd).MemberAdd(ctx, urls)
|
||||
cli := mustClientFromCmd(cmd)
|
||||
resp, err := cli.MemberAdd(ctx, urls)
|
||||
cancel()
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
@ -118,12 +119,24 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
|
||||
|
||||
if _, ok := (display).(*simplePrinter); ok {
|
||||
ctx, cancel = commandCtx(cmd)
|
||||
listResp, err := mustClientFromCmd(cmd).MemberList(ctx)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
listResp, err := cli.MemberList(ctx)
|
||||
// get latest member list; if there's failover new member might have outdated list
|
||||
for {
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
if listResp.Header.MemberId == resp.Header.MemberId {
|
||||
break
|
||||
}
|
||||
// quorum get to sync cluster list
|
||||
gresp, gerr := cli.Get(ctx, "_")
|
||||
if gerr != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
resp.Header.MemberId = gresp.Header.MemberId
|
||||
listResp, err = cli.MemberList(ctx)
|
||||
}
|
||||
cancel()
|
||||
|
||||
conf := []string{}
|
||||
for _, memb := range listResp.Members {
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/client"
|
||||
@ -103,7 +103,7 @@ func prepareBackend() backend.Backend {
|
||||
var be backend.Backend
|
||||
|
||||
bch := make(chan struct{})
|
||||
dbpath := path.Join(migrateDatadir, "member", "snap", "db")
|
||||
dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
|
||||
go func() {
|
||||
defer close(bch)
|
||||
be = backend.New(dbpath, time.Second, 10000)
|
||||
@ -130,9 +130,9 @@ func rebuildStoreV2() (store.Store, uint64) {
|
||||
|
||||
waldir := migrateWALdir
|
||||
if len(waldir) == 0 {
|
||||
waldir = path.Join(migrateDatadir, "member", "wal")
|
||||
waldir = filepath.Join(migrateDatadir, "member", "wal")
|
||||
}
|
||||
snapdir := path.Join(migrateDatadir, "member", "snap")
|
||||
snapdir := filepath.Join(migrateDatadir, "member", "snap")
|
||||
|
||||
ss := snap.New(snapdir)
|
||||
snapshot, err := ss.Load()
|
||||
|
@ -30,7 +30,7 @@ func (p *fieldsPrinter) kv(pfx string, kv *spb.KeyValue) {
|
||||
fmt.Printf("\"%sModRevision\" : %d\n", pfx, kv.ModRevision)
|
||||
fmt.Printf("\"%sVersion\" : %d\n", pfx, kv.Version)
|
||||
fmt.Printf("\"%sValue\" : %q\n", pfx, string(kv.Value))
|
||||
fmt.Printf("\"%sLease\" : %d\n", pfx, string(kv.Lease))
|
||||
fmt.Printf("\"%sLease\" : %d\n", pfx, kv.Lease)
|
||||
}
|
||||
|
||||
func (p *fieldsPrinter) hdr(h *pb.ResponseHeader) {
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
@ -186,8 +186,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
|
||||
basedir = restoreName + ".etcd"
|
||||
}
|
||||
|
||||
waldir := path.Join(basedir, "member", "wal")
|
||||
snapdir := path.Join(basedir, "member", "snap")
|
||||
waldir := filepath.Join(basedir, "member", "wal")
|
||||
snapdir := filepath.Join(basedir, "member", "snap")
|
||||
|
||||
if _, err := os.Stat(basedir); err == nil {
|
||||
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
|
||||
@ -325,7 +325,7 @@ func makeDB(snapdir, dbfile string, commit int) {
|
||||
ExitWithError(ExitIO, err)
|
||||
}
|
||||
|
||||
dbpath := path.Join(snapdir, "db")
|
||||
dbpath := filepath.Join(snapdir, "db")
|
||||
db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
|
||||
if dberr != nil {
|
||||
ExitWithError(ExitIO, dberr)
|
||||
|
@ -45,7 +45,7 @@ var (
|
||||
func init() {
|
||||
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
|
||||
|
||||
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, proto, simple, table)")
|
||||
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
|
||||
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
|
||||
|
||||
rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
@ -39,8 +39,6 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/proxy/httpproxy"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/go-systemd/daemon"
|
||||
systemdutil "github.com/coreos/go-systemd/util"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -85,7 +83,13 @@ func startEtcdOrProxyV2() {
|
||||
GoMaxProcs := runtime.GOMAXPROCS(0)
|
||||
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
|
||||
|
||||
(&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
defaultHost, dhErr := (&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
if defaultHost != "" {
|
||||
plog.Infof("advertising using detected default host %q", defaultHost)
|
||||
}
|
||||
if dhErr != nil {
|
||||
plog.Noticef("failed to detect default host (%v)", dhErr)
|
||||
}
|
||||
|
||||
if cfg.Dir == "" {
|
||||
cfg.Dir = fmt.Sprintf("%v.etcd", cfg.Name)
|
||||
@ -157,20 +161,12 @@ func startEtcdOrProxyV2() {
|
||||
|
||||
osutil.HandleInterrupts()
|
||||
|
||||
if systemdutil.IsRunningSystemd() {
|
||||
// At this point, the initialization of etcd is done.
|
||||
// The listeners are listening on the TCP ports and ready
|
||||
// for accepting connections. The etcd instance should be
|
||||
// joined with the cluster and ready to serve incoming
|
||||
// connections.
|
||||
sent, err := daemon.SdNotify(false, "READY=1")
|
||||
if err != nil {
|
||||
plog.Errorf("failed to notify systemd for readiness: %v", err)
|
||||
}
|
||||
if !sent {
|
||||
plog.Errorf("forgot to set Type=notify in systemd service file?")
|
||||
}
|
||||
}
|
||||
// At this point, the initialization of etcd is done.
|
||||
// The listeners are listening on the TCP ports and ready
|
||||
// for accepting connections. The etcd instance should be
|
||||
// joined with the cluster and ready to serve incoming
|
||||
// connections.
|
||||
notifySystemd()
|
||||
|
||||
select {
|
||||
case lerr := <-errc:
|
||||
@ -184,15 +180,6 @@ func startEtcdOrProxyV2() {
|
||||
|
||||
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
|
||||
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
|
||||
defaultHost, dhErr := cfg.IsDefaultHost()
|
||||
if defaultHost != "" {
|
||||
if dhErr == nil {
|
||||
plog.Infof("advertising using detected default host %q", defaultHost)
|
||||
} else {
|
||||
plog.Noticef("failed to detect default host, advertise falling back to %q (%v)", defaultHost, dhErr)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Metrics == "extensive" {
|
||||
grpc_prometheus.EnableHandlingTimeHistogram()
|
||||
}
|
||||
@ -202,7 +189,10 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
|
||||
return nil, nil, err
|
||||
}
|
||||
osutil.RegisterInterruptHandler(e.Server.Stop)
|
||||
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
|
||||
select {
|
||||
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
|
||||
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
|
||||
}
|
||||
return e.Server.StopNotify(), e.Err(), nil
|
||||
}
|
||||
|
||||
@ -221,14 +211,14 @@ func startProxy(cfg *config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg.Dir = path.Join(cfg.Dir, "proxy")
|
||||
cfg.Dir = filepath.Join(cfg.Dir, "proxy")
|
||||
err = os.MkdirAll(cfg.Dir, fileutil.PrivateDirMode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var peerURLs []string
|
||||
clusterfile := path.Join(cfg.Dir, "cluster")
|
||||
clusterfile := filepath.Join(cfg.Dir, "cluster")
|
||||
|
||||
b, err := ioutil.ReadFile(clusterfile)
|
||||
switch {
|
||||
|
@ -17,12 +17,14 @@ package etcdmain
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/proxy/tcpproxy"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@ -77,6 +79,20 @@ func newGatewayStartCommand() *cobra.Command {
|
||||
return &cmd
|
||||
}
|
||||
|
||||
func stripSchema(eps []string) []string {
|
||||
var endpoints []string
|
||||
|
||||
for _, ep := range eps {
|
||||
|
||||
if u, err := url.Parse(ep); err == nil && u.Host != "" {
|
||||
ep = u.Host
|
||||
}
|
||||
|
||||
endpoints = append(endpoints, ep)
|
||||
}
|
||||
|
||||
return endpoints
|
||||
}
|
||||
func startGateway(cmd *cobra.Command, args []string) {
|
||||
endpoints := gatewayEndpoints
|
||||
if gatewayDNSCluster != "" {
|
||||
@ -101,6 +117,9 @@ func startGateway(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
}
|
||||
|
||||
// Strip the schema from the endpoints because we start just a TCP proxy
|
||||
endpoints = stripSchema(endpoints)
|
||||
|
||||
if len(endpoints) == 0 {
|
||||
plog.Fatalf("no endpoints found")
|
||||
}
|
||||
@ -117,5 +136,8 @@ func startGateway(cmd *cobra.Command, args []string) {
|
||||
MonitorInterval: getewayRetryDelay,
|
||||
}
|
||||
|
||||
// At this point, etcd gateway listener is initialized
|
||||
notifySystemd()
|
||||
|
||||
tp.Run()
|
||||
}
|
||||
|
@ -144,6 +144,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
|
||||
go func() { errc <- m.Serve() }()
|
||||
|
||||
// grpc-proxy is initialized, ready to serve
|
||||
notifySystemd()
|
||||
|
||||
fmt.Fprintln(os.Stderr, <-errc)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
@ -17,6 +17,9 @@ package etcdmain
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/go-systemd/daemon"
|
||||
systemdutil "github.com/coreos/go-systemd/util"
|
||||
)
|
||||
|
||||
func Main() {
|
||||
@ -35,3 +38,16 @@ func Main() {
|
||||
|
||||
startEtcdOrProxyV2()
|
||||
}
|
||||
|
||||
func notifySystemd() {
|
||||
if !systemdutil.IsRunningSystemd() {
|
||||
return
|
||||
}
|
||||
sent, err := daemon.SdNotify(false, "READY=1")
|
||||
if err != nil {
|
||||
plog.Errorf("failed to notify systemd for readiness: %v", err)
|
||||
}
|
||||
if !sent {
|
||||
plog.Errorf("forgot to set Type=notify in systemd service file?")
|
||||
}
|
||||
}
|
||||
|
@ -185,9 +185,5 @@ func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (
|
||||
}
|
||||
|
||||
func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
|
||||
if err := ams.isAuthenticated(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ams.maintenanceServer.Status(ctx, ar)
|
||||
}
|
||||
|
@ -520,15 +520,14 @@ func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR
|
||||
if err == nil {
|
||||
resp.ID = int64(l.ID)
|
||||
resp.TTL = l.TTL()
|
||||
resp.Header = &pb.ResponseHeader{Revision: a.s.KV().Rev()}
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
|
||||
err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
|
||||
return &pb.LeaseRevokeResponse{Header: &pb.ResponseHeader{Revision: a.s.KV().Rev()}}, err
|
||||
return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
|
||||
@ -609,69 +608,125 @@ func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pb.AuthEnableResponse{}, nil
|
||||
return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
|
||||
}
|
||||
|
||||
func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
|
||||
a.s.AuthStore().AuthDisable()
|
||||
return &pb.AuthDisableResponse{}, nil
|
||||
return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
|
||||
ctx := context.WithValue(context.WithValue(context.TODO(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
|
||||
return a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
|
||||
ctx := context.WithValue(context.WithValue(context.Background(), "index", a.s.consistIndex.ConsistentIndex()), "simpleToken", r.SimpleToken)
|
||||
resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
|
||||
return a.s.AuthStore().UserAdd(r)
|
||||
resp, err := a.s.AuthStore().UserAdd(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
|
||||
return a.s.AuthStore().UserDelete(r)
|
||||
resp, err := a.s.AuthStore().UserDelete(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
|
||||
return a.s.AuthStore().UserChangePassword(r)
|
||||
resp, err := a.s.AuthStore().UserChangePassword(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
|
||||
return a.s.AuthStore().UserGrantRole(r)
|
||||
resp, err := a.s.AuthStore().UserGrantRole(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
|
||||
return a.s.AuthStore().UserGet(r)
|
||||
resp, err := a.s.AuthStore().UserGet(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
|
||||
return a.s.AuthStore().UserRevokeRole(r)
|
||||
resp, err := a.s.AuthStore().UserRevokeRole(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
|
||||
return a.s.AuthStore().RoleAdd(r)
|
||||
resp, err := a.s.AuthStore().RoleAdd(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
|
||||
return a.s.AuthStore().RoleGrantPermission(r)
|
||||
resp, err := a.s.AuthStore().RoleGrantPermission(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
|
||||
return a.s.AuthStore().RoleGet(r)
|
||||
resp, err := a.s.AuthStore().RoleGet(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
|
||||
return a.s.AuthStore().RoleRevokePermission(r)
|
||||
resp, err := a.s.AuthStore().RoleRevokePermission(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
|
||||
return a.s.AuthStore().RoleDelete(r)
|
||||
resp, err := a.s.AuthStore().RoleDelete(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
|
||||
return a.s.AuthStore().UserList(r)
|
||||
resp, err := a.s.AuthStore().UserList(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
|
||||
return a.s.AuthStore().RoleList(r)
|
||||
resp, err := a.s.AuthStore().RoleList(r)
|
||||
if resp != nil {
|
||||
resp.Header = newHeader(a.s)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
type quotaApplierV3 struct {
|
||||
@ -836,3 +891,12 @@ func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
|
||||
}
|
||||
rr.KVs = rr.KVs[:j]
|
||||
}
|
||||
|
||||
func newHeader(s *EtcdServer) *pb.ResponseHeader {
|
||||
return &pb.ResponseHeader{
|
||||
ClusterId: uint64(s.Cluster().ID()),
|
||||
MemberId: uint64(s.ID()),
|
||||
Revision: s.KV().Rev(),
|
||||
RaftTerm: s.Term(),
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
@ -118,16 +118,16 @@ func (c *ServerConfig) advertiseMatchesCluster() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ServerConfig) MemberDir() string { return path.Join(c.DataDir, "member") }
|
||||
func (c *ServerConfig) MemberDir() string { return filepath.Join(c.DataDir, "member") }
|
||||
|
||||
func (c *ServerConfig) WALDir() string {
|
||||
if c.DedicatedWALDir != "" {
|
||||
return c.DedicatedWALDir
|
||||
}
|
||||
return path.Join(c.MemberDir(), "wal")
|
||||
return filepath.Join(c.MemberDir(), "wal")
|
||||
}
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
|
||||
func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }
|
||||
|
||||
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
|
||||
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -263,7 +264,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
|
||||
bepath := path.Join(cfg.SnapDir(), databaseFilename)
|
||||
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
|
||||
beExist := fileutil.Exist(bepath)
|
||||
|
||||
var be backend.Backend
|
||||
@ -594,6 +595,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
|
||||
type etcdProgress struct {
|
||||
confState raftpb.ConfState
|
||||
snapi uint64
|
||||
appliedt uint64
|
||||
appliedi uint64
|
||||
}
|
||||
|
||||
@ -666,6 +668,7 @@ func (s *EtcdServer) run() {
|
||||
ep := etcdProgress{
|
||||
confState: snap.Metadata.ConfState,
|
||||
snapi: snap.Metadata.Index,
|
||||
appliedt: snap.Metadata.Term,
|
||||
appliedi: snap.Metadata.Index,
|
||||
}
|
||||
|
||||
@ -765,7 +768,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
|
||||
select {
|
||||
// snapshot requested via send()
|
||||
case m := <-s.r.msgSnapC:
|
||||
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
|
||||
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
|
||||
s.sendMergedSnap(merged)
|
||||
default:
|
||||
}
|
||||
@ -789,7 +792,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
plog.Panicf("get database snapshot file path error: %v", err)
|
||||
}
|
||||
|
||||
fn := path.Join(s.Cfg.SnapDir(), databaseFilename)
|
||||
fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename)
|
||||
if err := os.Rename(snapfn, fn); err != nil {
|
||||
plog.Panicf("rename snapshot file error: %v", err)
|
||||
}
|
||||
@ -867,6 +870,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
}
|
||||
plog.Info("finished adding peers from new cluster configuration into network...")
|
||||
|
||||
ep.appliedt = apply.snapshot.Metadata.Term
|
||||
ep.appliedi = apply.snapshot.Metadata.Index
|
||||
ep.snapi = ep.appliedi
|
||||
ep.confState = apply.snapshot.Metadata.ConfState
|
||||
@ -888,7 +892,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
|
||||
return
|
||||
}
|
||||
var shouldstop bool
|
||||
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
|
||||
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
|
||||
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
||||
}
|
||||
}
|
||||
@ -1242,9 +1246,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
||||
// apply takes entries received from Raft (after it has been committed) and
|
||||
// applies them to the current state of the EtcdServer.
|
||||
// The given entries should not be empty.
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
|
||||
var applied uint64
|
||||
var shouldstop bool
|
||||
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
|
||||
for i := range es {
|
||||
e := es[i]
|
||||
switch e.Type {
|
||||
@ -1254,16 +1256,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
|
||||
var cc raftpb.ConfChange
|
||||
pbutil.MustUnmarshal(&cc, e.Data)
|
||||
removedSelf, err := s.applyConfChange(cc, confState)
|
||||
shouldstop = shouldstop || removedSelf
|
||||
shouldStop = shouldStop || removedSelf
|
||||
s.w.Trigger(cc.ID, err)
|
||||
default:
|
||||
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
|
||||
}
|
||||
atomic.StoreUint64(&s.r.index, e.Index)
|
||||
atomic.StoreUint64(&s.r.term, e.Term)
|
||||
applied = e.Index
|
||||
appliedt = e.Term
|
||||
appliedi = e.Index
|
||||
}
|
||||
return applied, shouldstop
|
||||
return appliedt, appliedi, shouldStop
|
||||
}
|
||||
|
||||
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
||||
|
@ -613,7 +613,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
||||
ents = append(ents, ent)
|
||||
}
|
||||
|
||||
_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
|
||||
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
|
||||
if !shouldStop {
|
||||
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
@ -26,12 +25,7 @@ import (
|
||||
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
|
||||
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
|
||||
// as ReadCloser.
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||
snapt, err := s.r.raftStorage.Term(snapi)
|
||||
if err != nil {
|
||||
log.Panicf("get term should never fail: %v", err)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||
// get a snapshot of v2 store as []byte
|
||||
clone := s.store.Clone()
|
||||
d, err := clone.SaveNoCopy()
|
||||
|
@ -449,6 +449,8 @@ type member struct {
|
||||
grpcServer *grpc.Server
|
||||
grpcAddr string
|
||||
grpcBridge *bridge
|
||||
|
||||
keepDataDirTerminate bool
|
||||
}
|
||||
|
||||
func (m *member) GRPCAddr() string { return m.grpcAddr }
|
||||
@ -746,8 +748,10 @@ func (m *member) Restart(t *testing.T) error {
|
||||
func (m *member) Terminate(t *testing.T) {
|
||||
plog.Printf("terminating %s (%s)", m.Name, m.grpcAddr)
|
||||
m.Close()
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
if !m.keepDataDirTerminate {
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
@ -441,6 +442,51 @@ func TestRejectUnhealthyRemove(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRestartRemoved ensures that restarting removed member must exit
|
||||
// if 'initial-cluster-state' is set 'new' and old data directory still exists
|
||||
// (see https://github.com/coreos/etcd/issues/7512 for more).
|
||||
func TestRestartRemoved(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
capnslog.SetGlobalLogLevel(capnslog.INFO)
|
||||
|
||||
// 1. start single-member cluster
|
||||
c := NewCluster(t, 1)
|
||||
for _, m := range c.Members {
|
||||
m.ServerConfig.StrictReconfigCheck = true
|
||||
}
|
||||
c.Launch(t)
|
||||
defer c.Terminate(t)
|
||||
|
||||
// 2. add a new member
|
||||
c.AddMember(t)
|
||||
c.WaitLeader(t)
|
||||
|
||||
oldm := c.Members[0]
|
||||
oldm.keepDataDirTerminate = true
|
||||
|
||||
// 3. remove first member, shut down without deleting data
|
||||
if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
|
||||
t.Fatalf("expected to remove member, got error %v", err)
|
||||
}
|
||||
c.WaitLeader(t)
|
||||
|
||||
// 4. restart first member with 'initial-cluster-state=new'
|
||||
// wrong config, expects exit within ReqTimeout
|
||||
oldm.ServerConfig.NewCluster = false
|
||||
if err := oldm.Restart(t); err != nil {
|
||||
t.Fatalf("unexpected ForceRestart error: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
oldm.Close()
|
||||
os.RemoveAll(oldm.ServerConfig.DataDir)
|
||||
}()
|
||||
select {
|
||||
case <-oldm.s.StopNotify():
|
||||
case <-time.After(time.Minute):
|
||||
t.Fatalf("removed member didn't exit within %v", time.Minute)
|
||||
}
|
||||
}
|
||||
|
||||
// clusterMustProgress ensures that cluster can make progress. It creates
|
||||
// a random key first, and check the new key could be got from all client urls
|
||||
// of the cluster.
|
||||
|
@ -18,7 +18,7 @@ import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@ -58,7 +58,7 @@ func TestEmbedEtcd(t *testing.T) {
|
||||
setupEmbedCfg(&tests[5].cfg, []url.URL{urls[4]}, []url.URL{urls[5], urls[6]})
|
||||
setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]})
|
||||
|
||||
dir := path.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
|
||||
dir := filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
|
||||
os.RemoveAll(dir)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
@ -35,23 +36,85 @@ func TestV3AuthEmptyUserGet(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
api := toGRPC(clus.Client(0))
|
||||
auth := api.Auth
|
||||
|
||||
if _, err := auth.UserAdd(ctx, &pb.AuthUserAddRequest{Name: "root", Password: "123"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := auth.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: "root"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := auth.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: "root", Role: "root"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := auth.AuthEnable(ctx, &pb.AuthEnableRequest{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
authSetupRoot(t, api.Auth)
|
||||
|
||||
_, err := api.KV.Range(ctx, &pb.RangeRequest{Key: []byte("abc")})
|
||||
if !eqErrGRPC(err, rpctypes.ErrUserEmpty) {
|
||||
t.Fatalf("got %v, expected %v", err, rpctypes.ErrUserEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3AuthTokenWithDisable tests that auth won't crash if
|
||||
// given a valid token when authentication is disabled
|
||||
func TestV3AuthTokenWithDisable(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
authSetupRoot(t, toGRPC(clus.Client(0)).Auth)
|
||||
|
||||
c, cerr := clientv3.New(clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "root", Password: "123"})
|
||||
if cerr != nil {
|
||||
t.Fatal(cerr)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
rctx, cancel := context.WithCancel(context.TODO())
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
defer close(donec)
|
||||
for rctx.Err() == nil {
|
||||
c.Put(rctx, "abc", "def")
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if _, err := c.AuthDisable(context.TODO()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
cancel()
|
||||
<-donec
|
||||
}
|
||||
|
||||
func TestV3AuthRevision(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
api := toGRPC(clus.Client(0))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
presp, perr := api.KV.Put(ctx, &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
|
||||
cancel()
|
||||
if perr != nil {
|
||||
t.Fatal(perr)
|
||||
}
|
||||
rev := presp.Header.Revision
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
aresp, aerr := api.Auth.UserAdd(ctx, &pb.AuthUserAddRequest{Name: "root", Password: "123"})
|
||||
cancel()
|
||||
if aerr != nil {
|
||||
t.Fatal(aerr)
|
||||
}
|
||||
if aresp.Header.Revision != rev {
|
||||
t.Fatalf("revision expected %d, got %d", rev, aresp.Header.Revision)
|
||||
}
|
||||
}
|
||||
|
||||
func authSetupRoot(t *testing.T, auth pb.AuthClient) {
|
||||
if _, err := auth.UserAdd(context.TODO(), &pb.AuthUserAddRequest{Name: "root", Password: "123"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := auth.RoleAdd(context.TODO(), &pb.AuthRoleAddRequest{Name: "root"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := auth.UserGrantRole(context.TODO(), &pb.AuthUserGrantRoleRequest{User: "root", Role: "root"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := auth.AuthEnable(context.TODO(), &pb.AuthEnableRequest{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -252,10 +252,7 @@ func (le *lessor) Revoke(id LeaseID) error {
|
||||
|
||||
// sort keys so deletes are in same order among all members,
|
||||
// otherwise the backened hashes will be different
|
||||
keys := make([]string, 0, len(l.itemSet))
|
||||
for item := range l.itemSet {
|
||||
keys = append(keys, item.Key)
|
||||
}
|
||||
keys := l.Keys()
|
||||
sort.StringSlice(keys).Sort()
|
||||
for _, key := range keys {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
|
||||
@ -367,10 +364,12 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
|
||||
return ErrLeaseNotFound
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
for _, it := range items {
|
||||
l.itemSet[it] = struct{}{}
|
||||
le.itemMap[it] = id
|
||||
}
|
||||
l.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -392,10 +391,12 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
|
||||
return ErrLeaseNotFound
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
for _, it := range items {
|
||||
delete(l.itemSet, it)
|
||||
delete(le.itemMap, it)
|
||||
}
|
||||
l.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -506,6 +507,8 @@ type Lease struct {
|
||||
// expiry is time when lease should expire; must be 64-bit aligned.
|
||||
expiry monotime.Time
|
||||
|
||||
// mu protects concurrent accesses to itemSet
|
||||
mu sync.RWMutex
|
||||
itemSet map[LeaseItem]struct{}
|
||||
revokec chan struct{}
|
||||
}
|
||||
@ -544,10 +547,12 @@ func (l *Lease) forever() { atomic.StoreUint64((*uint64)(&l.expiry), uint64(fore
|
||||
|
||||
// Keys returns all the keys attached to the lease.
|
||||
func (l *Lease) Keys() []string {
|
||||
l.mu.RLock()
|
||||
keys := make([]string, 0, len(l.itemSet))
|
||||
for k := range l.itemSet {
|
||||
keys = append(keys, k.Key)
|
||||
}
|
||||
l.mu.RUnlock()
|
||||
return keys
|
||||
}
|
||||
|
||||
|
@ -15,11 +15,13 @@
|
||||
package lease
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -76,6 +78,53 @@ func TestLessorGrant(t *testing.T) {
|
||||
be.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
|
||||
// from concurrent map writes on 'itemSet'.
|
||||
func TestLeaseConcurrentKeys(t *testing.T) {
|
||||
dir, be := NewTestBackend(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer be.Close()
|
||||
|
||||
fd := &fakeDeleter{}
|
||||
|
||||
le := newLessor(be, minLeaseTTL)
|
||||
le.SetRangeDeleter(fd)
|
||||
|
||||
// grant a lease with long term (100 seconds) to
|
||||
// avoid early termination during the test.
|
||||
l, err := le.Grant(1, 100)
|
||||
if err != nil {
|
||||
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
|
||||
}
|
||||
|
||||
itemn := 10
|
||||
items := make([]LeaseItem, itemn)
|
||||
for i := 0; i < itemn; i++ {
|
||||
items[i] = LeaseItem{Key: fmt.Sprintf("foo%d", i)}
|
||||
}
|
||||
if err = le.Attach(l.ID, items); err != nil {
|
||||
t.Fatalf("failed to attach items to the lease: %v", err)
|
||||
}
|
||||
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
le.Detach(l.ID, items)
|
||||
close(donec)
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(itemn)
|
||||
for i := 0; i < itemn; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
l.Keys()
|
||||
}()
|
||||
}
|
||||
|
||||
<-donec
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestLessorRevoke ensures Lessor can revoke a lease.
|
||||
// The items in the revoked lease should be removed from
|
||||
// the backend.
|
||||
@ -351,5 +400,5 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
|
||||
t.Fatalf("failed to create tmpdir (%v)", err)
|
||||
}
|
||||
|
||||
return tmpPath, backend.New(path.Join(tmpPath, "be"), time.Second, 10000)
|
||||
return tmpPath, backend.New(filepath.Join(tmpPath, "be"), time.Second, 10000)
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -303,6 +303,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
|
||||
}
|
||||
|
||||
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
|
||||
tmpb.FillPercent = 0.9 // for seq write in for each
|
||||
if berr != nil {
|
||||
return berr
|
||||
}
|
||||
@ -319,6 +320,8 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
|
||||
return err
|
||||
}
|
||||
tmpb = tmptx.Bucket(next)
|
||||
tmpb.FillPercent = 0.9 // for seq write in for each
|
||||
|
||||
count = 0
|
||||
}
|
||||
return tmpb.Put(k, v)
|
||||
@ -334,7 +337,7 @@ func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, strin
|
||||
if err != nil {
|
||||
plog.Fatal(err)
|
||||
}
|
||||
tmpPath := path.Join(dir, "database")
|
||||
tmpPath := filepath.Join(dir, "database")
|
||||
return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
@ -39,7 +39,7 @@ var (
|
||||
// IsDirWriteable checks if dir is writable by writing and removing a file
|
||||
// to dir. It returns nil if dir is writable.
|
||||
func IsDirWriteable(dir string) error {
|
||||
f := path.Join(dir, ".touch")
|
||||
f := filepath.Join(dir, ".touch")
|
||||
if err := ioutil.WriteFile(f, []byte(""), PrivateFileMode); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ package fileutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
@ -45,7 +45,7 @@ func purgeFile(dirname string, suffix string, max uint, interval time.Duration,
|
||||
sort.Strings(newfnames)
|
||||
fnames = newfnames
|
||||
for len(newfnames) > int(max) {
|
||||
f := path.Join(dirname, newfnames[0])
|
||||
f := filepath.Join(dirname, newfnames[0])
|
||||
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
|
||||
if err != nil {
|
||||
break
|
||||
|
@ -18,7 +18,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@ -33,7 +33,7 @@ func TestPurgeFile(t *testing.T) {
|
||||
|
||||
// minimal file set
|
||||
for i := 0; i < 3; i++ {
|
||||
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
|
||||
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
|
||||
if ferr != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -53,7 +53,7 @@ func TestPurgeFile(t *testing.T) {
|
||||
// rest of the files
|
||||
for i := 4; i < 10; i++ {
|
||||
go func(n int) {
|
||||
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", n)))
|
||||
f, ferr := os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", n)))
|
||||
if ferr != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -99,7 +99,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
var f *os.File
|
||||
f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
|
||||
f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.test", i)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -107,7 +107,7 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
|
||||
}
|
||||
|
||||
// create a purge barrier at 5
|
||||
p := path.Join(dir, fmt.Sprintf("%d.test", 5))
|
||||
p := filepath.Join(dir, fmt.Sprintf("%d.test", 5))
|
||||
l, err := LockFile(p, os.O_WRONLY, PrivateFileMode)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -27,8 +27,7 @@ import (
|
||||
"math/big"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
@ -91,8 +90,8 @@ func SelfCert(dirpath string, hosts []string) (info TLSInfo, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
certPath := path.Join(dirpath, "cert.pem")
|
||||
keyPath := path.Join(dirpath, "key.pem")
|
||||
certPath := filepath.Join(dirpath, "cert.pem")
|
||||
keyPath := filepath.Join(dirpath, "key.pem")
|
||||
_, errcert := os.Stat(certPath)
|
||||
_, errkey := os.Stat(keyPath)
|
||||
if errcert == nil && errkey == nil {
|
||||
@ -120,10 +119,11 @@ func SelfCert(dirpath string, hosts []string) (info TLSInfo, err error) {
|
||||
}
|
||||
|
||||
for _, host := range hosts {
|
||||
if ip := net.ParseIP(host); ip != nil {
|
||||
h, _, _ := net.SplitHostPort(host)
|
||||
if ip := net.ParseIP(h); ip != nil {
|
||||
tmpl.IPAddresses = append(tmpl.IPAddresses, ip)
|
||||
} else {
|
||||
tmpl.DNSNames = append(tmpl.DNSNames, strings.Split(host, ":")[0])
|
||||
tmpl.DNSNames = append(tmpl.DNSNames, h)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1246,6 +1246,55 @@ func TestHandleHeartbeatResp(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRaftFreesReadOnlyMem ensures raft will free read request from
|
||||
// readOnly readIndexQueue and pendingReadIndex map.
|
||||
// related issue: https://github.com/coreos/etcd/issues/7571
|
||||
func TestRaftFreesReadOnlyMem(t *testing.T) {
|
||||
sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
sm.raftLog.commitTo(sm.raftLog.lastIndex())
|
||||
|
||||
ctx := []byte("ctx")
|
||||
|
||||
// leader starts linearizable read request.
|
||||
// more info: raft dissertation 6.4, step 2.
|
||||
sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
|
||||
msgs := sm.readMessages()
|
||||
if len(msgs) != 1 {
|
||||
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
|
||||
}
|
||||
if msgs[0].Type != pb.MsgHeartbeat {
|
||||
t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
|
||||
}
|
||||
if !bytes.Equal(msgs[0].Context, ctx) {
|
||||
t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
|
||||
}
|
||||
if len(sm.readOnly.readIndexQueue) != 1 {
|
||||
t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
|
||||
}
|
||||
if len(sm.readOnly.pendingReadIndex) != 1 {
|
||||
t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
|
||||
}
|
||||
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
|
||||
t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
|
||||
}
|
||||
|
||||
// heartbeat responses from majority of followers (1 in this case)
|
||||
// acknowledge the authority of the leader.
|
||||
// more info: raft dissertation 6.4, step 3.
|
||||
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
|
||||
if len(sm.readOnly.readIndexQueue) != 0 {
|
||||
t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
|
||||
}
|
||||
if len(sm.readOnly.pendingReadIndex) != 0 {
|
||||
t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
|
||||
}
|
||||
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
|
||||
t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMsgAppRespWaitReset verifies the resume behavior of a leader
|
||||
// MsgAppResp.
|
||||
func TestMsgAppRespWaitReset(t *testing.T) {
|
||||
|
@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
|
||||
if found {
|
||||
ro.readIndexQueue = ro.readIndexQueue[i:]
|
||||
for _, rs := range rss {
|
||||
delete(ro.pendingReadIndex, string(rs.req.Context))
|
||||
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
|
||||
}
|
||||
return rss
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
)
|
||||
@ -41,7 +41,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
|
||||
os.Remove(f.Name())
|
||||
return n, err
|
||||
}
|
||||
fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
|
||||
fn := filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
|
||||
if fileutil.Exist(fn) {
|
||||
os.Remove(f.Name())
|
||||
return n, nil
|
||||
@ -67,7 +67,7 @@ func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
|
||||
wfn := fmt.Sprintf("%016x.snap.db", id)
|
||||
for _, fn := range fns {
|
||||
if fn == wfn {
|
||||
return path.Join(s.dir, fn), nil
|
||||
return filepath.Join(s.dir, fn), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("snap: snapshot file doesn't exist")
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"hash/crc32"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
@ -84,13 +84,13 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
||||
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
}
|
||||
|
||||
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
|
||||
err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
|
||||
if err == nil {
|
||||
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
} else {
|
||||
err1 := os.Remove(path.Join(s.dir, fname))
|
||||
err1 := os.Remove(filepath.Join(s.dir, fname))
|
||||
if err1 != nil {
|
||||
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
|
||||
plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname))
|
||||
}
|
||||
}
|
||||
return err
|
||||
@ -114,7 +114,7 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
}
|
||||
|
||||
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
||||
fpath := path.Join(dir, name)
|
||||
fpath := filepath.Join(dir, name)
|
||||
snap, err := Read(fpath)
|
||||
if err != nil {
|
||||
renameBroken(fpath)
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"hash/crc32"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@ -38,7 +38,7 @@ var testSnap = &raftpb.Snapshot{
|
||||
}
|
||||
|
||||
func TestSaveAndLoad(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -60,7 +60,7 @@ func TestSaveAndLoad(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBadCRC(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -76,14 +76,14 @@ func TestBadCRC(t *testing.T) {
|
||||
// fake a crc mismatch
|
||||
crcTable = crc32.MakeTable(crc32.Koopman)
|
||||
|
||||
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
|
||||
_, err = Read(filepath.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
|
||||
if err == nil || err != ErrCRCMismatch {
|
||||
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFailback(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -91,7 +91,7 @@ func TestFailback(t *testing.T) {
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
large := fmt.Sprintf("%016x-%016x-%016x.snap", 0xFFFF, 0xFFFF, 0xFFFF)
|
||||
err = ioutil.WriteFile(path.Join(dir, large), []byte("bad data"), 0666)
|
||||
err = ioutil.WriteFile(filepath.Join(dir, large), []byte("bad data"), 0666)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -109,7 +109,7 @@ func TestFailback(t *testing.T) {
|
||||
if !reflect.DeepEqual(g, testSnap) {
|
||||
t.Errorf("snap = %#v, want %#v", g, testSnap)
|
||||
}
|
||||
if f, err := os.Open(path.Join(dir, large) + ".broken"); err != nil {
|
||||
if f, err := os.Open(filepath.Join(dir, large) + ".broken"); err != nil {
|
||||
t.Fatal("broken snapshot does not exist")
|
||||
} else {
|
||||
f.Close()
|
||||
@ -117,7 +117,7 @@ func TestFailback(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapNames(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -125,7 +125,7 @@ func TestSnapNames(t *testing.T) {
|
||||
defer os.RemoveAll(dir)
|
||||
for i := 1; i <= 5; i++ {
|
||||
var f *os.File
|
||||
if f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
|
||||
if f, err = os.Create(filepath.Join(dir, fmt.Sprintf("%d.snap", i))); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
f.Close()
|
||||
@ -146,7 +146,7 @@ func TestSnapNames(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLoadNewestSnap(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -175,7 +175,7 @@ func TestLoadNewestSnap(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNoSnapshot(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -189,19 +189,19 @@ func TestNoSnapshot(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEmptySnapshot(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
|
||||
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte(""), 0x700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = Read(path.Join(dir, "1.snap"))
|
||||
_, err = Read(filepath.Join(dir, "1.snap"))
|
||||
if err != ErrEmptySnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
|
||||
}
|
||||
@ -210,14 +210,14 @@ func TestEmptySnapshot(t *testing.T) {
|
||||
// TestAllSnapshotBroken ensures snapshotter returns
|
||||
// ErrNoSnapshot if all the snapshots are broken.
|
||||
func TestAllSnapshotBroken(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
|
||||
err = ioutil.WriteFile(filepath.Join(dir, "1.snap"), []byte("bad"), 0x700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
48
test
48
test
@ -9,7 +9,7 @@
|
||||
# PKG=./wal ./test
|
||||
# PKG=snap ./test
|
||||
#
|
||||
# Run code coverage
|
||||
# Run code coverage
|
||||
# COVERDIR=coverage PASSES=cov ./test
|
||||
set -e
|
||||
|
||||
@ -32,10 +32,6 @@ TEST_PKGS=`find . -name \*_test.go | while read a; do dirname $a; done | sort |
|
||||
FORMATTABLE=`find . -name \*.go | while read a; do echo $(dirname $a)/"*.go"; done | sort | uniq | egrep -v "$IGNORE_PKGS" | sed "s|\./||g"`
|
||||
TESTABLE_AND_FORMATTABLE=`echo "$TEST_PKGS" | egrep -v "$INTEGRATION_PKGS"`
|
||||
|
||||
# TODO: 'client' pkg fails with gosimple from generated files
|
||||
# TODO: 'rafttest' is failing with unused
|
||||
GOSIMPLE_UNUSED_PATHS=`find . -name \*.go | while read a; do dirname $a; done | sort | uniq | egrep -v "$IGNORE_PKGS" | grep -v 'client'`
|
||||
|
||||
if [ -z "$GOARCH" ]; then
|
||||
GOARCH=$(go env GOARCH);
|
||||
fi
|
||||
@ -194,48 +190,6 @@ function fmt_pass {
|
||||
fi
|
||||
done
|
||||
|
||||
if which goword >/dev/null; then
|
||||
echo "Checking goword..."
|
||||
# get all go files to process
|
||||
gofiles=`find $FMT -iname '*.go' 2>/dev/null`
|
||||
# ignore tests and protobuf files
|
||||
gofiles=`echo ${gofiles} | sort | uniq | sed "s/ /\n/g" | egrep -v "(\\_test.go|\\.pb\\.go)"`
|
||||
# only check for broken exported godocs
|
||||
gowordRes=`goword -use-spell=false ${gofiles} | grep godoc-export | sort`
|
||||
if [ ! -z "$gowordRes" ]; then
|
||||
echo -e "goword checking failed:\n${gowordRes}"
|
||||
exit 255
|
||||
fi
|
||||
else
|
||||
echo "Skipping goword..."
|
||||
fi
|
||||
|
||||
if which gosimple >/dev/null; then
|
||||
echo "Checking gosimple..."
|
||||
for path in $GOSIMPLE_UNUSED_PATHS; do
|
||||
simplResult=`gosimple ${path} 2>&1 || true`
|
||||
if [ -n "${simplResult}" ]; then
|
||||
echo -e "gosimple checking ${path} failed:\n${simplResult}"
|
||||
exit 255
|
||||
fi
|
||||
done
|
||||
else
|
||||
echo "Skipping gosimple..."
|
||||
fi
|
||||
|
||||
if which unused >/dev/null; then
|
||||
echo "Checking unused..."
|
||||
for path in $GOSIMPLE_UNUSED_PATHS; do
|
||||
unusedResult=`unused ${path} 2>&1 || true`
|
||||
if [ -n "${unusedResult}" ]; then
|
||||
echo -e "unused checking ${path} failed:\n${unusedResult}"
|
||||
exit 255
|
||||
fi
|
||||
done
|
||||
else
|
||||
echo "Skipping unused..."
|
||||
fi
|
||||
|
||||
echo "Checking for license header..."
|
||||
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './cmd/*' ! -path './gopath.proto/*'); do
|
||||
head -n3 "${file}" | grep -Eq "(Copyright|generated|GENERATED)" || echo -e " ${file}"
|
||||
|
@ -18,7 +18,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -58,7 +58,7 @@ func main() {
|
||||
ss := snap.New(snapDir(*from))
|
||||
snapshot, err = ss.Load()
|
||||
} else {
|
||||
snapshot, err = snap.Read(path.Join(snapDir(*from), *snapfile))
|
||||
snapshot, err = snap.Read(filepath.Join(snapDir(*from), *snapfile))
|
||||
}
|
||||
|
||||
switch err {
|
||||
@ -132,9 +132,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func walDir(dataDir string) string { return path.Join(dataDir, "member", "wal") }
|
||||
func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") }
|
||||
|
||||
func snapDir(dataDir string) string { return path.Join(dataDir, "member", "snap") }
|
||||
func snapDir(dataDir string) string { return filepath.Join(dataDir, "member", "snap") }
|
||||
|
||||
func parseWALMetadata(b []byte) (id, cid types.ID) {
|
||||
var metadata etcdserverpb.Metadata
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.1.2"
|
||||
Version = "3.1.6"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
@ -17,7 +17,7 @@ package wal
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
)
|
||||
@ -65,7 +65,7 @@ func (fp *filePipeline) Close() error {
|
||||
|
||||
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
|
||||
// count % 2 so this file isn't the same as the one last published
|
||||
fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
|
||||
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
|
||||
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ package wal
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
@ -94,6 +94,6 @@ func openLast(dirpath string) (*fileutil.LockedFile, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
last := path.Join(dirpath, names[len(names)-1])
|
||||
last := filepath.Join(dirpath, names[len(names)-1])
|
||||
return fileutil.LockFile(last, os.O_RDWR, fileutil.PrivateFileMode)
|
||||
}
|
||||
|
18
wal/wal.go
18
wal/wal.go
@ -21,7 +21,7 @@ import (
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -97,7 +97,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
}
|
||||
|
||||
// keep temporary wal directory so WAL initialization appears atomic
|
||||
tmpdirpath := path.Clean(dirpath) + ".tmp"
|
||||
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
|
||||
if fileutil.Exist(tmpdirpath) {
|
||||
if err := os.RemoveAll(tmpdirpath); err != nil {
|
||||
return nil, err
|
||||
@ -107,7 +107,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := path.Join(tmpdirpath, walName(0, 0))
|
||||
p := filepath.Join(tmpdirpath, walName(0, 0))
|
||||
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -143,7 +143,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
}
|
||||
|
||||
// directory was renamed; sync parent dir to persist rename
|
||||
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
|
||||
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
|
||||
if perr != nil {
|
||||
return nil, perr
|
||||
}
|
||||
@ -196,7 +196,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
|
||||
rs := make([]io.Reader, 0)
|
||||
ls := make([]*fileutil.LockedFile, 0)
|
||||
for _, name := range names[nameIndex:] {
|
||||
p := path.Join(dirpath, name)
|
||||
p := filepath.Join(dirpath, name)
|
||||
if write {
|
||||
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
|
||||
if err != nil {
|
||||
@ -232,7 +232,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
|
||||
// write reuses the file descriptors from read; don't close so
|
||||
// WAL can append without dropping the file lock
|
||||
w.readClose = nil
|
||||
if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
|
||||
if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
|
||||
closer()
|
||||
return nil, err
|
||||
}
|
||||
@ -372,7 +372,7 @@ func (w *WAL) cut() error {
|
||||
return err
|
||||
}
|
||||
|
||||
fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
|
||||
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
|
||||
|
||||
// create a temp wal file with name sequence + 1, or truncate the existing one
|
||||
newTail, err := w.fp.Open()
|
||||
@ -464,7 +464,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
|
||||
found := false
|
||||
|
||||
for i, l := range w.locks {
|
||||
_, lockIndex, err := parseWalName(path.Base(l.Name()))
|
||||
_, lockIndex, err := parseWalName(filepath.Base(l.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -611,7 +611,7 @@ func (w *WAL) seq() uint64 {
|
||||
if t == nil {
|
||||
return 0
|
||||
}
|
||||
seq, _, err := parseWalName(path.Base(t.Name()))
|
||||
seq, _, err := parseWalName(filepath.Base(t.Name()))
|
||||
if err != nil {
|
||||
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@ -40,7 +40,7 @@ func TestNew(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("err = %v, want nil", err)
|
||||
}
|
||||
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
|
||||
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
|
||||
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
|
||||
}
|
||||
defer w.Close()
|
||||
@ -51,7 +51,7 @@ func TestNew(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
gd := make([]byte, off)
|
||||
f, err := os.Open(path.Join(p, path.Base(w.tail().Name())))
|
||||
f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -90,7 +90,7 @@ func TestNewForInitedDir(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
os.Create(path.Join(p, walName(0, 0)))
|
||||
os.Create(filepath.Join(p, walName(0, 0)))
|
||||
if _, err = Create(p, nil); err == nil || err != os.ErrExist {
|
||||
t.Errorf("err = %v, want %v", err, os.ErrExist)
|
||||
}
|
||||
@ -103,7 +103,7 @@ func TestOpenAtIndex(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
f, err := os.Create(path.Join(dir, walName(0, 0)))
|
||||
f, err := os.Create(filepath.Join(dir, walName(0, 0)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -113,7 +113,7 @@ func TestOpenAtIndex(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("err = %v, want nil", err)
|
||||
}
|
||||
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
|
||||
if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
|
||||
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
|
||||
}
|
||||
if w.seq() != 0 {
|
||||
@ -122,7 +122,7 @@ func TestOpenAtIndex(t *testing.T) {
|
||||
w.Close()
|
||||
|
||||
wname := walName(2, 10)
|
||||
f, err = os.Create(path.Join(dir, wname))
|
||||
f, err = os.Create(filepath.Join(dir, wname))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -132,7 +132,7 @@ func TestOpenAtIndex(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("err = %v, want nil", err)
|
||||
}
|
||||
if g := path.Base(w.tail().Name()); g != wname {
|
||||
if g := filepath.Base(w.tail().Name()); g != wname {
|
||||
t.Errorf("name = %+v, want %+v", g, wname)
|
||||
}
|
||||
if w.seq() != 2 {
|
||||
@ -172,7 +172,7 @@ func TestCut(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wname := walName(1, 1)
|
||||
if g := path.Base(w.tail().Name()); g != wname {
|
||||
if g := filepath.Base(w.tail().Name()); g != wname {
|
||||
t.Errorf("name = %s, want %s", g, wname)
|
||||
}
|
||||
|
||||
@ -188,14 +188,14 @@ func TestCut(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wname = walName(2, 2)
|
||||
if g := path.Base(w.tail().Name()); g != wname {
|
||||
if g := filepath.Base(w.tail().Name()); g != wname {
|
||||
t.Errorf("name = %s, want %s", g, wname)
|
||||
}
|
||||
|
||||
// check the state in the last WAL
|
||||
// We do check before closing the WAL to ensure that Cut syncs the data
|
||||
// into the disk.
|
||||
f, err := os.Open(path.Join(p, wname))
|
||||
f, err := os.Open(filepath.Join(p, wname))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -254,7 +254,7 @@ func TestSaveWithCut(t *testing.T) {
|
||||
}
|
||||
defer neww.Close()
|
||||
wname := walName(1, index)
|
||||
if g := path.Base(neww.tail().Name()); g != wname {
|
||||
if g := filepath.Base(neww.tail().Name()); g != wname {
|
||||
t.Errorf("name = %s, want %s", g, wname)
|
||||
}
|
||||
|
||||
@ -416,7 +416,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
||||
}
|
||||
md.Close()
|
||||
|
||||
if err := os.Remove(path.Join(p, walName(4, 4))); err != nil {
|
||||
if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -570,7 +570,7 @@ func TestReleaseLockTo(t *testing.T) {
|
||||
}
|
||||
for i, l := range w.locks {
|
||||
var lockIndex uint64
|
||||
_, lockIndex, err = parseWalName(path.Base(l.Name()))
|
||||
_, lockIndex, err = parseWalName(filepath.Base(l.Name()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -588,7 +588,7 @@ func TestReleaseLockTo(t *testing.T) {
|
||||
if len(w.locks) != 1 {
|
||||
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
|
||||
}
|
||||
_, lockIndex, err := parseWalName(path.Base(w.locks[0].Name()))
|
||||
_, lockIndex, err := parseWalName(filepath.Base(w.locks[0].Name()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -673,11 +673,11 @@ func TestRestartCreateWal(t *testing.T) {
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
// make temporary directory so it looks like initialization is interrupted
|
||||
tmpdir := path.Clean(p) + ".tmp"
|
||||
tmpdir := filepath.Clean(p) + ".tmp"
|
||||
if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err = os.OpenFile(path.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
|
||||
if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -729,7 +729,7 @@ func TestOpenOnTornWrite(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
fn := path.Join(p, path.Base(w.tail().Name()))
|
||||
fn := filepath.Join(p, filepath.Base(w.tail().Name()))
|
||||
w.Close()
|
||||
|
||||
// clobber some entry with 0's to simulate a torn write
|
||||
|
Reference in New Issue
Block a user