Compare commits

...

25 Commits

Author SHA1 Message Date
546c0f7ed6 version: bump to v3.0.10 2016-09-23 10:49:03 -07:00
adbad1c9b5 ctlv3: close snapshot file before rename (Windows) 2016-09-23 09:11:02 -07:00
273b986751 clientv3: process closed watcherStreams in watcherGrpcStream run loop
Was racing with Watch() when closing the grpc stream on no watchers.

Fixes #6476
2016-09-21 15:52:20 -07:00
5b205729b9 rafthttp: add v3.0.0 to supported streams 2016-09-16 21:54:55 +09:00
fe900b09dd version: bump to v3.0.9+git 2016-09-15 15:10:23 -07:00
494c012659 version: bump to v3.0.9 2016-09-15 12:56:33 -07:00
4abc381ebe clientv3: drain buffered WatchResponses before resuming
Otherwise, the watcherStream can receive WatchResponses in the
middle of a resume, corrupting the stream.

Fixes #6364
2016-09-15 12:38:15 -07:00
73c8fdac53 integration: fix compilation for backported Election test 2016-09-15 11:45:37 -07:00
ee2717493a ctlv3: fix line parsing for Windows 2016-09-15 11:25:53 -07:00
2435eb9ecd clientv3: balancer panics when call up after close
Fix the issue by adding a simple guard varable.
2016-09-15 18:46:26 +09:00
8fb533dabe embed: warn on domain name in listener 2016-09-15 18:46:19 +09:00
2f0f5ac504 Revert "Merge pull request #6365 from heyitsanthony/fix-dns-bind"
This reverts commit af5ab7b351, reversing
changes made to da6a0f0594.
2016-09-15 18:43:46 +09:00
9ab811d478 auth: fix range handling bugs.
Test 15, counting from zero, in TestGetMergedPerms
in etcd/auth/range_perm_cache_test.go, was trying
incorrectly assert that [a, b) merged with [b, "")
should be [a, b). Added a test specifically for
this. This patch fixes the incorrect larger test
and the bugs in the code that it was hiding.

Fixes #6359
2016-09-15 18:41:56 +09:00
e0a99fb4ba version: bump to v3.0.8+git 2016-09-09 15:56:31 -07:00
d40982fc91 version: bump to v3.0.8 2016-09-09 13:14:44 -07:00
fe3a1cc31b wal: fix error type 2016-09-09 09:11:25 +09:00
70713706a1 wal: fix err shadowing (go vet) 2016-09-09 09:07:48 +09:00
0054e7e89b etcdctl: restore should create a snapshot
Restore should create a snasphot. So the new db file
can be sent to newly joined member.
2016-09-09 09:03:51 +09:00
97f718b504 fileutil: windows OpenDir
Windows needs to open a directory with write access to fsync but the go
runtime won't open directories that way.
2016-09-09 09:01:56 +09:00
202da9270e wal: fsync directory after wal file rename
Fixes #6368
2016-09-09 09:01:49 +09:00
6e83ec0ed7 etcdmain: reject binding listeners to domain names
Fixes #6336
2016-09-07 08:08:35 +09:00
5c44cdfdaa etcdctl/ctlv3: don't crash when we should prompt for pw.
when 'etcdctl --user name get blah' is invoked to
 prompt for password, don't panic.

 addresses the segfault part of #6343
2016-09-04 09:02:50 +09:00
09a239f040 e2e: add quoted key/value to txn test 2016-09-04 09:02:47 +09:00
3faff8b2e2 etcdctl: fix quoted string handling in txn and watch
Fixes #6315
2016-09-04 09:02:28 +09:00
2345fda18e version: bump to v3.0.7+git 2016-08-31 16:41:06 -07:00
18 changed files with 363 additions and 115 deletions

View File

@ -22,7 +22,10 @@ import (
"github.com/coreos/etcd/mvcc/backend"
)
// isSubset returns true if a is a subset of b
// isSubset returns true if a is a subset of b.
// If a is a prefix of b, then a is a subset of b.
// Given intervals [a1,a2) and [b1,b2), is
// the a interval a subset of b?
func isSubset(a, b *rangePerm) bool {
switch {
case len(a.end) == 0 && len(b.end) == 0:
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
// b is a key, a is a range
return false
case len(a.end) == 0:
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
// a is a key, b is a range. need b1 <= a1 and a1 < b2
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
default:
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
// both are ranges. need b1 <= a1 and a2 <= b2
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
}
}
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
i := 0
for i < len(perms) {
begin, next := i, i
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
next++
}
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
if next != begin && len(perms[next].end) > 0 {
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
} else {
merged = append(merged, perms[begin])
if next != begin {
merged = append(merged, perms[next])
}
}
i = next + 1
}

View File

@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
[]*rangePerm{{[]byte("a"), []byte("b")}},
[]*rangePerm{{[]byte("a"), []byte("b")}},
},
{
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
},
{
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
[]*rangePerm{{[]byte("a"), []byte("c")}},
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
},
{
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
},
// duplicate ranges
{

View File

@ -45,6 +45,8 @@ type simpleBalancer struct {
// pinAddr is the currently pinned address; set to the empty string on
// intialization and shutdown.
pinAddr string
closed bool
}
func newSimpleBalancer(eps []string) *simpleBalancer {
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
// gRPC might call Up after it called Close. We add this check
// to "fix" it up at application layer. Or our simplerBalancer
// might panic since b.upc is closed.
if b.closed {
return func(err error) {}
}
if len(b.upEps) == 0 {
// notify waiting Get()s and pin first connected address
close(b.upc)
b.pinAddr = addr.Addr
}
b.upEps[addr.Addr] = struct{}{}
b.mu.Unlock()
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
delete(b.upEps, addr.Addr)
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
func (b *simpleBalancer) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
// In case gRPC calls close twice. TODO: remove the checking
// when we are sure that gRPC wont call close twice.
if b.closed {
return nil
}
b.closed = true
close(b.notifyCh)
// terminate all waiting Get()s
b.pinAddr = ""
if len(b.upEps) == 0 {
close(b.upc)
}
b.mu.Unlock()
return nil
}

View File

@ -127,6 +127,8 @@ type watchGrpcStream struct {
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// the error that closed the watch stream
closeErr error
@ -189,11 +191,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
streams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
}
go wgs.run()
return wgs
@ -242,7 +245,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
@ -352,15 +354,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
go w.serveStream(ws)
}
// closeStream closes the watcher resources and removes it
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
close(ws.outc)
delete(w.streams, ws.id)
empty := len(w.streams) == 0
if empty && w.stopc != nil {
w.stopc = nil
}
w.mu.Unlock()
return empty
}
// run is the root of the goroutines for managing a watcher client
@ -464,6 +470,10 @@ func (w *watchGrpcStream) run() {
cancelSet = make(map[int64]struct{})
case <-stopc:
return
case ws := <-w.closingc:
if w.closeStream(ws) {
return
}
}
// send failed; queue for retry
@ -522,6 +532,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
defer func() {
// signal that this watcherStream is finished
select {
case w.closingc <- ws:
case <-w.donec:
w.closeStream(ws)
}
}()
var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
@ -602,20 +621,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
}
}
w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}
func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
@ -669,6 +677,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RUnlock()
for _, ws := range streams {
// drain recvc so no old WatchResponses (e.g., Created messages)
// are processed while resuming
ws.drain()
// pause serveStream
ws.resumec <- -1
@ -701,6 +713,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
return nil
}
// drain removes all buffered WatchResponses from the stream's receive channel.
func (ws *watcherStream) drain() {
for {
select {
case <-ws.recvc:
default:
return
}
}
}
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{

View File

@ -39,15 +39,23 @@ func txnTestSuccess(cx ctlCtx) {
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
}
rqs := txnRequests{
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
ifSucess: []string{"get key1", "get key2"},
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
rqs := []txnRequests{
{
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
ifSucess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`},
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
},
{
compare: []string{`version("key \"with\" space") = "1"`},
ifSucess: []string{`get "key \"with\" space"`},
results: []string{"SUCCESS", `key "with" space`, "value \x23"},
},
}
if err := ctlV3Txn(cx, rqs); err != nil {
cx.t.Fatal(err)
for _, rq := range rqs {
if err := ctlV3Txn(cx, rq); err != nil {
cx.t.Fatal(err)
}
}
}

View File

@ -243,7 +243,7 @@ func authCfgFromCmd(cmd *cobra.Command) *authCfg {
var cfg authCfg
splitted := strings.SplitN(userFlag, ":", 2)
if len(splitted) == 0 {
if len(splitted) < 2 {
cfg.username = userFlag
cfg.password, err = speakeasy.Ask("Password: ")
if err != nil {

View File

@ -36,7 +36,10 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/spf13/cobra"
"golang.org/x/net/context"
)
@ -112,7 +115,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
partpath := path + ".part"
f, err := os.Create(partpath)
defer f.Close()
if err != nil {
exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
ExitWithError(ExitBadArgs, exiterr)
@ -131,6 +134,8 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
fileutil.Fsync(f)
f.Close()
if rerr := os.Rename(partpath, path); rerr != nil {
exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
ExitWithError(ExitIO, exiterr)
@ -186,8 +191,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
}
makeDB(snapdir, args[0])
makeWAL(waldir, cl)
makeDB(snapdir, args[0], len(cl.Members()))
makeWALAndSnap(waldir, snapdir, cl)
}
func initialClusterFromName(name string) string {
@ -199,11 +204,18 @@ func initialClusterFromName(name string) string {
}
// makeWAL creates a WAL for the initial cluster
func makeWAL(waldir string, cl *membership.RaftCluster) {
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
if err := fileutil.CreateDirAll(waldir); err != nil {
ExitWithError(ExitIO, err)
}
// add members again to persist them to the store we create.
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
cl.SetStore(st)
for _, m := range cl.Members() {
cl.AddMember(m)
}
m := cl.MemberByName(restoreName)
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
metadata, merr := md.Marshal()
@ -227,7 +239,9 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
}
ents := make([]raftpb.Entry, len(peers))
nodeIDs := make([]uint64, len(peers))
for i, p := range peers {
nodeIDs[i] = p.ID
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: p.ID,
@ -245,20 +259,48 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
ents[i] = e
}
w.Save(raftpb.HardState{
Term: 1,
commit, term := uint64(len(ents)), uint64(1)
if err := w.Save(raftpb.HardState{
Term: term,
Vote: peers[0].ID,
Commit: uint64(len(ents))}, ents)
Commit: commit}, ents); err != nil {
ExitWithError(ExitIO, err)
}
b, berr := st.Save()
if berr != nil {
ExitWithError(ExitError, berr)
}
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
},
},
}
snapshotter := snap.New(snapdir)
if err := snapshotter.SaveSnap(raftSnap); err != nil {
panic(err)
}
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
ExitWithError(ExitIO, err)
}
}
// initIndex implements ConsistentIndexGetter so the snapshot won't block
// the new raft instance by waiting for a future raft index.
type initIndex struct{}
type initIndex int
func (*initIndex) ConsistentIndex() uint64 { return 1 }
func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
// makeDB copies the database snapshot to the snapshot directory
func makeDB(snapdir, dbfile string) {
func makeDB(snapdir, dbfile string, commit int) {
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
if ferr != nil {
ExitWithError(ExitInvalidInput, ferr)
@ -329,7 +371,7 @@ func makeDB(snapdir, dbfile string) {
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
s := mvcc.NewStore(be, nil, &initIndex{})
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
id := s.TxnBegin()
btx := be.BatchTx()
del := func(k, v []byte) error {
@ -339,6 +381,7 @@ func makeDB(snapdir, dbfile string) {
// delete stored members from old cluster since using new members
btx.UnsafeForEach([]byte("members"), del)
// todo: add back new members when we start to deprecate old snap file.
btx.UnsafeForEach([]byte("members_removed"), del)
// trigger write-out of new consistent index
s.TxnEnd(id)

View File

@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
if len(line) == 1 {
// remove space from the line
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
// remove trialling \n
line = line[:len(line)-1]
cmp, err := parseCompare(line)
if err != nil {
ExitWithError(ExitInvalidInput, err)
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
if len(line) == 1 {
// remove space from the line
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
// remove trialling \n
line = line[:len(line)-1]
op, err := parseRequestUnion(line)
if err != nil {
ExitWithError(ExitInvalidInput, err)

View File

@ -46,8 +46,23 @@ func addHexPrefix(s string) string {
}
func argify(s string) []string {
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
return r.FindAllString(s, -1)
r := regexp.MustCompile(`"(?:[^"\\]|\\.)*"|'[^']*'|[^'"\s]\S*[^'"\s]?`)
args := r.FindAllString(s, -1)
for i := range args {
if len(args[i]) == 0 {
continue
}
if args[i][0] == '\'' {
// 'single-quoted string'
args[i] = args[i][1 : len(args)-1]
} else if args[i][0] == '"' {
// "double quoted string"
if _, err := fmt.Sscanf(args[i], "%q", &args[i]); err != nil {
ExitWithError(ExitInvalidInput, err)
}
}
}
return args
}
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {

View File

@ -52,30 +52,18 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
watchInteractiveFunc(cmd, args)
return
}
if len(args) < 1 || len(args) > 2 {
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires one or two arguments as key or prefix, with range end"))
}
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
key := args[0]
if len(args) == 2 {
if watchPrefix {
ExitWithError(ExitBadArgs, fmt.Errorf("`range_end` and `--prefix` cannot be set at the same time, choose one"))
}
opts = append(opts, clientv3.WithRange(args[1]))
}
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
c := mustClientFromCmd(cmd)
wc := c.Watch(context.TODO(), key, opts...)
printWatchCh(wc)
err := c.Close()
if err == nil {
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
wc, err := getWatchChan(c, args)
if err != nil {
ExitWithError(ExitBadArgs, err)
}
ExitWithError(ExitBadConnection, err)
printWatchCh(wc)
if err = c.Close(); err != nil {
ExitWithError(ExitBadConnection, err)
}
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
}
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
@ -107,32 +95,33 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
moreargs := flagset.Args()
if len(moreargs) < 1 || len(moreargs) > 2 {
fmt.Fprintf(os.Stderr, "Invalid command %s (Too few or many arguments)\n", l)
ch, err := getWatchChan(c, flagset.Args())
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
var key string
_, err = fmt.Sscanf(moreargs[0], "%q", &key)
if err != nil {
key = moreargs[0]
}
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
if len(moreargs) == 2 {
if watchPrefix {
fmt.Fprintf(os.Stderr, "`range_end` and `--prefix` cannot be set at the same time, choose one\n")
continue
}
opts = append(opts, clientv3.WithRange(moreargs[1]))
}
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
ch := c.Watch(context.TODO(), key, opts...)
go printWatchCh(ch)
}
}
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
if len(args) < 1 || len(args) > 2 {
return nil, fmt.Errorf("bad number of arguments")
}
key := args[0]
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
if len(args) == 2 {
if watchPrefix {
return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
}
opts = append(opts, clientv3.WithRange(args[1]))
}
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
return c.Watch(context.TODO(), key, opts...), nil
}
func printWatchCh(ch clientv3.WatchChan) {
for resp := range ch {
display.Watch(resp)

View File

@ -20,6 +20,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"runtime"
@ -410,6 +411,13 @@ func (cfg *config) configFromFile() error {
}
func (cfg *config) validateConfig(isSet func(field string) bool) error {
if err := checkBindURLs(cfg.lpurls); err != nil {
return err
}
if err := checkBindURLs(cfg.lcurls); err != nil {
return err
}
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
// TODO(yichengq): check this for joining through discovery service case
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
@ -456,3 +464,27 @@ func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == pr
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
// checkBindURLs returns an error if any URL uses a domain name.
// TODO: return error in 3.2.0
func checkBindURLs(urls []url.URL) error {
for _, url := range urls {
if url.Scheme == "unix" || url.Scheme == "unixs" {
continue
}
host, _, err := net.SplitHostPort(url.Host)
if err != nil {
return err
}
if host == "localhost" {
// special case for local address
// TODO: support /etc/hosts ?
continue
}
if net.ParseIP(host) == nil {
err := fmt.Errorf("expected IP in URL for binding (%s)", url.String())
plog.Warning(err)
}
}
return nil
}

View File

@ -188,11 +188,8 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
t.Fatal(err)
}
s, serr := concurrency.NewSession(cli)
if serr != nil {
t.Fatal(serr)
}
e := concurrency.NewElection(s, "test")
e := concurrency.NewElection(cli, "test")
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
err := e.Campaign(ctx, "abc")
cancel()

22
pkg/fileutil/dir_unix.go Normal file
View File

@ -0,0 +1,22 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows
package fileutil
import "os"
// OpenDir opens a directory for syncing.
func OpenDir(path string) (*os.File, error) { return os.Open(path) }

View File

@ -0,0 +1,46 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build windows
package fileutil
import (
"os"
"syscall"
)
// OpenDir opens a directory in windows with write access for syncing.
func OpenDir(path string) (*os.File, error) {
fd, err := openDir(path)
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), path), nil
}
func openDir(path string) (fd syscall.Handle, err error) {
if len(path) == 0 {
return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
}
pathp, err := syscall.UTF16PtrFromString(path)
if err != nil {
return syscall.InvalidHandle, err
}
access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE)
sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE)
createmode := uint32(syscall.OPEN_EXISTING)
fl := uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS)
return syscall.CreateFile(pathp, access, sharemode, nil, createmode, fl, 0)
}

View File

@ -49,6 +49,7 @@ var (
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
}
)

View File

@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.7"
Version = "3.0.10"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"

View File

@ -67,7 +67,11 @@ var (
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
dir string // the living directory of the underlay files
dir string // the living directory of the underlay files
// dirFile is a fd for the wal directory for syncing on Rename
dirFile *os.File
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL
@ -106,10 +110,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
if err != nil {
return nil, err
}
if _, err := f.Seek(0, os.SEEK_END); err != nil {
if _, err = f.Seek(0, os.SEEK_END); err != nil {
return nil, err
}
if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
if err = fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
return nil, err
}
@ -119,17 +123,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
encoder: newEncoder(f, 0),
}
w.locks = append(w.locks, f)
if err := w.saveCrc(0); err != nil {
if err = w.saveCrc(0); err != nil {
return nil, err
}
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
return nil, err
}
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
return nil, err
}
return w.renameWal(tmpdirpath)
if w, err = w.renameWal(tmpdirpath); err != nil {
return nil, err
}
// directory was renamed; sync parent dir to persist rename
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
if perr != nil {
return nil, perr
}
if perr = fileutil.Fsync(pdir); perr != nil {
return nil, perr
}
if perr = pdir.Close(); err != nil {
return nil, perr
}
return w, nil
}
// Open opens the WAL at the given snap.
@ -139,7 +159,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
// the given snap. The WAL cannot be appended to before reading out all of its
// previous records.
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, true)
w, err := openAtIndex(dirpath, snap, true)
if err != nil {
return nil, err
}
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
return nil, err
}
return w, nil
}
// OpenForRead only opens the wal files for read.
@ -371,6 +398,10 @@ func (w *WAL) cut() error {
if err = os.Rename(newTail.Name(), fpath); err != nil {
return err
}
if err = fileutil.Fsync(w.dirFile); err != nil {
return err
}
newTail.Close()
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
@ -473,7 +504,7 @@ func (w *WAL) Close() error {
plog.Errorf("failed to unlock during closing wal: %s", err)
}
}
return nil
return w.dirFile.Close()
}
func (w *WAL) saveEntry(e *raftpb.Entry) error {

View File

@ -16,7 +16,11 @@
package wal
import "os"
import (
"os"
"github.com/coreos/etcd/pkg/fileutil"
)
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
// On non-Windows platforms, hold the lock while renaming. Releasing
@ -34,5 +38,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
}
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
return w, nil
df, err := fileutil.OpenDir(w.dir)
w.dirFile = df
return w, err
}