Compare commits
25 Commits
Author | SHA1 | Date | |
---|---|---|---|
546c0f7ed6 | |||
adbad1c9b5 | |||
273b986751 | |||
5b205729b9 | |||
fe900b09dd | |||
494c012659 | |||
4abc381ebe | |||
73c8fdac53 | |||
ee2717493a | |||
2435eb9ecd | |||
8fb533dabe | |||
2f0f5ac504 | |||
9ab811d478 | |||
e0a99fb4ba | |||
d40982fc91 | |||
fe3a1cc31b | |||
70713706a1 | |||
0054e7e89b | |||
97f718b504 | |||
202da9270e | |||
6e83ec0ed7 | |||
5c44cdfdaa | |||
09a239f040 | |||
3faff8b2e2 | |||
2345fda18e |
@ -22,7 +22,10 @@ import (
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
)
|
||||
|
||||
// isSubset returns true if a is a subset of b
|
||||
// isSubset returns true if a is a subset of b.
|
||||
// If a is a prefix of b, then a is a subset of b.
|
||||
// Given intervals [a1,a2) and [b1,b2), is
|
||||
// the a interval a subset of b?
|
||||
func isSubset(a, b *rangePerm) bool {
|
||||
switch {
|
||||
case len(a.end) == 0 && len(b.end) == 0:
|
||||
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
|
||||
// b is a key, a is a range
|
||||
return false
|
||||
case len(a.end) == 0:
|
||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
|
||||
// a is a key, b is a range. need b1 <= a1 and a1 < b2
|
||||
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
|
||||
default:
|
||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
|
||||
// both are ranges. need b1 <= a1 and a2 <= b2
|
||||
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
|
||||
i := 0
|
||||
for i < len(perms) {
|
||||
begin, next := i, i
|
||||
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
|
||||
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
|
||||
next++
|
||||
}
|
||||
|
||||
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
||||
|
||||
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
|
||||
if next != begin && len(perms[next].end) > 0 {
|
||||
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
||||
} else {
|
||||
merged = append(merged, perms[begin])
|
||||
if next != begin {
|
||||
merged = append(merged, perms[next])
|
||||
}
|
||||
}
|
||||
i = next + 1
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||
},
|
||||
{
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||
},
|
||||
{
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
||||
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
|
||||
},
|
||||
{
|
||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||
},
|
||||
// duplicate ranges
|
||||
{
|
||||
|
@ -45,6 +45,8 @@ type simpleBalancer struct {
|
||||
// pinAddr is the currently pinned address; set to the empty string on
|
||||
// intialization and shutdown.
|
||||
pinAddr string
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newSimpleBalancer(eps []string) *simpleBalancer {
|
||||
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
|
||||
|
||||
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// gRPC might call Up after it called Close. We add this check
|
||||
// to "fix" it up at application layer. Or our simplerBalancer
|
||||
// might panic since b.upc is closed.
|
||||
if b.closed {
|
||||
return func(err error) {}
|
||||
}
|
||||
|
||||
if len(b.upEps) == 0 {
|
||||
// notify waiting Get()s and pin first connected address
|
||||
close(b.upc)
|
||||
b.pinAddr = addr.Addr
|
||||
}
|
||||
b.upEps[addr.Addr] = struct{}{}
|
||||
b.mu.Unlock()
|
||||
|
||||
// notify client that a connection is up
|
||||
b.readyOnce.Do(func() { close(b.readyc) })
|
||||
|
||||
return func(err error) {
|
||||
b.mu.Lock()
|
||||
delete(b.upEps, addr.Addr)
|
||||
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
|
||||
|
||||
func (b *simpleBalancer) Close() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
// In case gRPC calls close twice. TODO: remove the checking
|
||||
// when we are sure that gRPC wont call close twice.
|
||||
if b.closed {
|
||||
return nil
|
||||
}
|
||||
b.closed = true
|
||||
close(b.notifyCh)
|
||||
// terminate all waiting Get()s
|
||||
b.pinAddr = ""
|
||||
if len(b.upEps) == 0 {
|
||||
close(b.upc)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -127,6 +127,8 @@ type watchGrpcStream struct {
|
||||
donec chan struct{}
|
||||
// errc transmits errors from grpc Recv to the watch stream reconn logic
|
||||
errc chan error
|
||||
// closingc gets the watcherStream of closing watchers
|
||||
closingc chan *watcherStream
|
||||
|
||||
// the error that closed the watch stream
|
||||
closeErr error
|
||||
@ -189,11 +191,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
||||
cancel: cancel,
|
||||
streams: make(map[int64]*watcherStream),
|
||||
|
||||
respc: make(chan *pb.WatchResponse),
|
||||
reqc: make(chan *watchRequest),
|
||||
stopc: make(chan struct{}),
|
||||
donec: make(chan struct{}),
|
||||
errc: make(chan error, 1),
|
||||
respc: make(chan *pb.WatchResponse),
|
||||
reqc: make(chan *watchRequest),
|
||||
stopc: make(chan struct{}),
|
||||
donec: make(chan struct{}),
|
||||
errc: make(chan error, 1),
|
||||
closingc: make(chan *watcherStream),
|
||||
}
|
||||
go wgs.run()
|
||||
return wgs
|
||||
@ -242,7 +245,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
|
||||
case reqc <- wr:
|
||||
ok = true
|
||||
case <-wr.ctx.Done():
|
||||
wgs.stopIfEmpty()
|
||||
case <-donec:
|
||||
if wgs.closeErr != nil {
|
||||
closeCh <- WatchResponse{closeErr: wgs.closeErr}
|
||||
@ -352,15 +354,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
|
||||
go w.serveStream(ws)
|
||||
}
|
||||
|
||||
// closeStream closes the watcher resources and removes it
|
||||
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
|
||||
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
|
||||
w.mu.Lock()
|
||||
// cancels request stream; subscriber receives nil channel
|
||||
close(ws.initReq.retc)
|
||||
// close subscriber's channel
|
||||
close(ws.outc)
|
||||
delete(w.streams, ws.id)
|
||||
empty := len(w.streams) == 0
|
||||
if empty && w.stopc != nil {
|
||||
w.stopc = nil
|
||||
}
|
||||
w.mu.Unlock()
|
||||
return empty
|
||||
}
|
||||
|
||||
// run is the root of the goroutines for managing a watcher client
|
||||
@ -464,6 +470,10 @@ func (w *watchGrpcStream) run() {
|
||||
cancelSet = make(map[int64]struct{})
|
||||
case <-stopc:
|
||||
return
|
||||
case ws := <-w.closingc:
|
||||
if w.closeStream(ws) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// send failed; queue for retry
|
||||
@ -522,6 +532,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
||||
|
||||
// serveStream forwards watch responses from run() to the subscriber
|
||||
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
||||
defer func() {
|
||||
// signal that this watcherStream is finished
|
||||
select {
|
||||
case w.closingc <- ws:
|
||||
case <-w.donec:
|
||||
w.closeStream(ws)
|
||||
}
|
||||
}()
|
||||
|
||||
var closeErr error
|
||||
emptyWr := &WatchResponse{}
|
||||
wrs := []*WatchResponse{}
|
||||
@ -602,20 +621,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
||||
}
|
||||
}
|
||||
|
||||
w.closeStream(ws)
|
||||
w.stopIfEmpty()
|
||||
// lazily send cancel message if events on missing id
|
||||
}
|
||||
|
||||
func (wgs *watchGrpcStream) stopIfEmpty() {
|
||||
wgs.mu.Lock()
|
||||
if len(wgs.streams) == 0 && wgs.stopc != nil {
|
||||
close(wgs.stopc)
|
||||
wgs.stopc = nil
|
||||
}
|
||||
wgs.mu.Unlock()
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||
ws, rerr := w.resume()
|
||||
if rerr != nil {
|
||||
@ -669,6 +677,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
||||
w.mu.RUnlock()
|
||||
|
||||
for _, ws := range streams {
|
||||
// drain recvc so no old WatchResponses (e.g., Created messages)
|
||||
// are processed while resuming
|
||||
ws.drain()
|
||||
|
||||
// pause serveStream
|
||||
ws.resumec <- -1
|
||||
|
||||
@ -701,6 +713,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// drain removes all buffered WatchResponses from the stream's receive channel.
|
||||
func (ws *watcherStream) drain() {
|
||||
for {
|
||||
select {
|
||||
case <-ws.recvc:
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
||||
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
||||
req := &pb.WatchCreateRequest{
|
||||
|
@ -39,15 +39,23 @@ func txnTestSuccess(cx ctlCtx) {
|
||||
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
|
||||
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
|
||||
}
|
||||
|
||||
rqs := txnRequests{
|
||||
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
||||
ifSucess: []string{"get key1", "get key2"},
|
||||
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
||||
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
||||
rqs := []txnRequests{
|
||||
{
|
||||
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
||||
ifSucess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`},
|
||||
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
||||
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
||||
},
|
||||
{
|
||||
compare: []string{`version("key \"with\" space") = "1"`},
|
||||
ifSucess: []string{`get "key \"with\" space"`},
|
||||
results: []string{"SUCCESS", `key "with" space`, "value \x23"},
|
||||
},
|
||||
}
|
||||
if err := ctlV3Txn(cx, rqs); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
for _, rq := range rqs {
|
||||
if err := ctlV3Txn(cx, rq); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,7 +243,7 @@ func authCfgFromCmd(cmd *cobra.Command) *authCfg {
|
||||
var cfg authCfg
|
||||
|
||||
splitted := strings.SplitN(userFlag, ":", 2)
|
||||
if len(splitted) == 0 {
|
||||
if len(splitted) < 2 {
|
||||
cfg.username = userFlag
|
||||
cfg.password, err = speakeasy.Ask("Password: ")
|
||||
if err != nil {
|
||||
|
@ -36,7 +36,10 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
@ -112,7 +115,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
|
||||
|
||||
partpath := path + ".part"
|
||||
f, err := os.Create(partpath)
|
||||
defer f.Close()
|
||||
|
||||
if err != nil {
|
||||
exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
|
||||
ExitWithError(ExitBadArgs, exiterr)
|
||||
@ -131,6 +134,8 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
|
||||
|
||||
fileutil.Fsync(f)
|
||||
|
||||
f.Close()
|
||||
|
||||
if rerr := os.Rename(partpath, path); rerr != nil {
|
||||
exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
|
||||
ExitWithError(ExitIO, exiterr)
|
||||
@ -186,8 +191,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
|
||||
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
|
||||
}
|
||||
|
||||
makeDB(snapdir, args[0])
|
||||
makeWAL(waldir, cl)
|
||||
makeDB(snapdir, args[0], len(cl.Members()))
|
||||
makeWALAndSnap(waldir, snapdir, cl)
|
||||
}
|
||||
|
||||
func initialClusterFromName(name string) string {
|
||||
@ -199,11 +204,18 @@ func initialClusterFromName(name string) string {
|
||||
}
|
||||
|
||||
// makeWAL creates a WAL for the initial cluster
|
||||
func makeWAL(waldir string, cl *membership.RaftCluster) {
|
||||
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
|
||||
if err := fileutil.CreateDirAll(waldir); err != nil {
|
||||
ExitWithError(ExitIO, err)
|
||||
}
|
||||
|
||||
// add members again to persist them to the store we create.
|
||||
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
|
||||
cl.SetStore(st)
|
||||
for _, m := range cl.Members() {
|
||||
cl.AddMember(m)
|
||||
}
|
||||
|
||||
m := cl.MemberByName(restoreName)
|
||||
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
|
||||
metadata, merr := md.Marshal()
|
||||
@ -227,7 +239,9 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
||||
}
|
||||
|
||||
ents := make([]raftpb.Entry, len(peers))
|
||||
nodeIDs := make([]uint64, len(peers))
|
||||
for i, p := range peers {
|
||||
nodeIDs[i] = p.ID
|
||||
cc := raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeAddNode,
|
||||
NodeID: p.ID,
|
||||
@ -245,20 +259,48 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
||||
ents[i] = e
|
||||
}
|
||||
|
||||
w.Save(raftpb.HardState{
|
||||
Term: 1,
|
||||
commit, term := uint64(len(ents)), uint64(1)
|
||||
|
||||
if err := w.Save(raftpb.HardState{
|
||||
Term: term,
|
||||
Vote: peers[0].ID,
|
||||
Commit: uint64(len(ents))}, ents)
|
||||
Commit: commit}, ents); err != nil {
|
||||
ExitWithError(ExitIO, err)
|
||||
}
|
||||
|
||||
b, berr := st.Save()
|
||||
if berr != nil {
|
||||
ExitWithError(ExitError, berr)
|
||||
}
|
||||
|
||||
raftSnap := raftpb.Snapshot{
|
||||
Data: b,
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: commit,
|
||||
Term: term,
|
||||
ConfState: raftpb.ConfState{
|
||||
Nodes: nodeIDs,
|
||||
},
|
||||
},
|
||||
}
|
||||
snapshotter := snap.New(snapdir)
|
||||
if err := snapshotter.SaveSnap(raftSnap); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
|
||||
ExitWithError(ExitIO, err)
|
||||
}
|
||||
}
|
||||
|
||||
// initIndex implements ConsistentIndexGetter so the snapshot won't block
|
||||
// the new raft instance by waiting for a future raft index.
|
||||
type initIndex struct{}
|
||||
type initIndex int
|
||||
|
||||
func (*initIndex) ConsistentIndex() uint64 { return 1 }
|
||||
func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
|
||||
|
||||
// makeDB copies the database snapshot to the snapshot directory
|
||||
func makeDB(snapdir, dbfile string) {
|
||||
func makeDB(snapdir, dbfile string, commit int) {
|
||||
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
|
||||
if ferr != nil {
|
||||
ExitWithError(ExitInvalidInput, ferr)
|
||||
@ -329,7 +371,7 @@ func makeDB(snapdir, dbfile string) {
|
||||
// update consistentIndex so applies go through on etcdserver despite
|
||||
// having a new raft instance
|
||||
be := backend.NewDefaultBackend(dbpath)
|
||||
s := mvcc.NewStore(be, nil, &initIndex{})
|
||||
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
|
||||
id := s.TxnBegin()
|
||||
btx := be.BatchTx()
|
||||
del := func(k, v []byte) error {
|
||||
@ -339,6 +381,7 @@ func makeDB(snapdir, dbfile string) {
|
||||
|
||||
// delete stored members from old cluster since using new members
|
||||
btx.UnsafeForEach([]byte("members"), del)
|
||||
// todo: add back new members when we start to deprecate old snap file.
|
||||
btx.UnsafeForEach([]byte("members_removed"), del)
|
||||
// trigger write-out of new consistent index
|
||||
s.TxnEnd(id)
|
||||
|
@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
}
|
||||
if len(line) == 1 {
|
||||
|
||||
// remove space from the line
|
||||
line = strings.TrimSpace(line)
|
||||
if len(line) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// remove trialling \n
|
||||
line = line[:len(line)-1]
|
||||
cmp, err := parseCompare(line)
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
}
|
||||
if len(line) == 1 {
|
||||
|
||||
// remove space from the line
|
||||
line = strings.TrimSpace(line)
|
||||
if len(line) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// remove trialling \n
|
||||
line = line[:len(line)-1]
|
||||
op, err := parseRequestUnion(line)
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
|
@ -46,8 +46,23 @@ func addHexPrefix(s string) string {
|
||||
}
|
||||
|
||||
func argify(s string) []string {
|
||||
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
|
||||
return r.FindAllString(s, -1)
|
||||
r := regexp.MustCompile(`"(?:[^"\\]|\\.)*"|'[^']*'|[^'"\s]\S*[^'"\s]?`)
|
||||
args := r.FindAllString(s, -1)
|
||||
for i := range args {
|
||||
if len(args[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
if args[i][0] == '\'' {
|
||||
// 'single-quoted string'
|
||||
args[i] = args[i][1 : len(args)-1]
|
||||
} else if args[i][0] == '"' {
|
||||
// "double quoted string"
|
||||
if _, err := fmt.Sscanf(args[i], "%q", &args[i]); err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
|
||||
|
@ -52,30 +52,18 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
||||
watchInteractiveFunc(cmd, args)
|
||||
return
|
||||
}
|
||||
if len(args) < 1 || len(args) > 2 {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires one or two arguments as key or prefix, with range end"))
|
||||
}
|
||||
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||
key := args[0]
|
||||
if len(args) == 2 {
|
||||
if watchPrefix {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("`range_end` and `--prefix` cannot be set at the same time, choose one"))
|
||||
}
|
||||
opts = append(opts, clientv3.WithRange(args[1]))
|
||||
}
|
||||
|
||||
if watchPrefix {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
c := mustClientFromCmd(cmd)
|
||||
wc := c.Watch(context.TODO(), key, opts...)
|
||||
printWatchCh(wc)
|
||||
err := c.Close()
|
||||
if err == nil {
|
||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||
wc, err := getWatchChan(c, args)
|
||||
if err != nil {
|
||||
ExitWithError(ExitBadArgs, err)
|
||||
}
|
||||
ExitWithError(ExitBadConnection, err)
|
||||
|
||||
printWatchCh(wc)
|
||||
if err = c.Close(); err != nil {
|
||||
ExitWithError(ExitBadConnection, err)
|
||||
}
|
||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||
}
|
||||
|
||||
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
||||
@ -107,32 +95,33 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||
continue
|
||||
}
|
||||
moreargs := flagset.Args()
|
||||
if len(moreargs) < 1 || len(moreargs) > 2 {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (Too few or many arguments)\n", l)
|
||||
ch, err := getWatchChan(c, flagset.Args())
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||
continue
|
||||
}
|
||||
var key string
|
||||
_, err = fmt.Sscanf(moreargs[0], "%q", &key)
|
||||
if err != nil {
|
||||
key = moreargs[0]
|
||||
}
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||
if len(moreargs) == 2 {
|
||||
if watchPrefix {
|
||||
fmt.Fprintf(os.Stderr, "`range_end` and `--prefix` cannot be set at the same time, choose one\n")
|
||||
continue
|
||||
}
|
||||
opts = append(opts, clientv3.WithRange(moreargs[1]))
|
||||
}
|
||||
if watchPrefix {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
ch := c.Watch(context.TODO(), key, opts...)
|
||||
go printWatchCh(ch)
|
||||
}
|
||||
}
|
||||
|
||||
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
|
||||
if len(args) < 1 || len(args) > 2 {
|
||||
return nil, fmt.Errorf("bad number of arguments")
|
||||
}
|
||||
key := args[0]
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||
if len(args) == 2 {
|
||||
if watchPrefix {
|
||||
return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
|
||||
}
|
||||
opts = append(opts, clientv3.WithRange(args[1]))
|
||||
}
|
||||
if watchPrefix {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
return c.Watch(context.TODO(), key, opts...), nil
|
||||
}
|
||||
|
||||
func printWatchCh(ch clientv3.WatchChan) {
|
||||
for resp := range ch {
|
||||
display.Watch(resp)
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"runtime"
|
||||
@ -410,6 +411,13 @@ func (cfg *config) configFromFile() error {
|
||||
}
|
||||
|
||||
func (cfg *config) validateConfig(isSet func(field string) bool) error {
|
||||
if err := checkBindURLs(cfg.lpurls); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkBindURLs(cfg.lcurls); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
|
||||
// TODO(yichengq): check this for joining through discovery service case
|
||||
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
|
||||
@ -456,3 +464,27 @@ func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == pr
|
||||
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
|
||||
|
||||
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||
|
||||
// checkBindURLs returns an error if any URL uses a domain name.
|
||||
// TODO: return error in 3.2.0
|
||||
func checkBindURLs(urls []url.URL) error {
|
||||
for _, url := range urls {
|
||||
if url.Scheme == "unix" || url.Scheme == "unixs" {
|
||||
continue
|
||||
}
|
||||
host, _, err := net.SplitHostPort(url.Host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if host == "localhost" {
|
||||
// special case for local address
|
||||
// TODO: support /etc/hosts ?
|
||||
continue
|
||||
}
|
||||
if net.ParseIP(host) == nil {
|
||||
err := fmt.Errorf("expected IP in URL for binding (%s)", url.String())
|
||||
plog.Warning(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -188,11 +188,8 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
|
||||
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s, serr := concurrency.NewSession(cli)
|
||||
if serr != nil {
|
||||
t.Fatal(serr)
|
||||
}
|
||||
e := concurrency.NewElection(s, "test")
|
||||
|
||||
e := concurrency.NewElection(cli, "test")
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
err := e.Campaign(ctx, "abc")
|
||||
cancel()
|
||||
|
22
pkg/fileutil/dir_unix.go
Normal file
22
pkg/fileutil/dir_unix.go
Normal file
@ -0,0 +1,22 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !windows
|
||||
|
||||
package fileutil
|
||||
|
||||
import "os"
|
||||
|
||||
// OpenDir opens a directory for syncing.
|
||||
func OpenDir(path string) (*os.File, error) { return os.Open(path) }
|
46
pkg/fileutil/dir_windows.go
Normal file
46
pkg/fileutil/dir_windows.go
Normal file
@ -0,0 +1,46 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build windows
|
||||
|
||||
package fileutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// OpenDir opens a directory in windows with write access for syncing.
|
||||
func OpenDir(path string) (*os.File, error) {
|
||||
fd, err := openDir(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return os.NewFile(uintptr(fd), path), nil
|
||||
}
|
||||
|
||||
func openDir(path string) (fd syscall.Handle, err error) {
|
||||
if len(path) == 0 {
|
||||
return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
|
||||
}
|
||||
pathp, err := syscall.UTF16PtrFromString(path)
|
||||
if err != nil {
|
||||
return syscall.InvalidHandle, err
|
||||
}
|
||||
access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE)
|
||||
sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE)
|
||||
createmode := uint32(syscall.OPEN_EXISTING)
|
||||
fl := uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS)
|
||||
return syscall.CreateFile(pathp, access, sharemode, nil, createmode, fl, 0)
|
||||
}
|
@ -49,6 +49,7 @@ var (
|
||||
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "2.3.0"
|
||||
Version = "3.0.7"
|
||||
Version = "3.0.10"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
|
49
wal/wal.go
49
wal/wal.go
@ -67,7 +67,11 @@ var (
|
||||
// A just opened WAL is in read mode, and ready for reading records.
|
||||
// The WAL will be ready for appending after reading out all the previous records.
|
||||
type WAL struct {
|
||||
dir string // the living directory of the underlay files
|
||||
dir string // the living directory of the underlay files
|
||||
|
||||
// dirFile is a fd for the wal directory for syncing on Rename
|
||||
dirFile *os.File
|
||||
|
||||
metadata []byte // metadata recorded at the head of each WAL
|
||||
state raftpb.HardState // hardstate recorded at the head of WAL
|
||||
|
||||
@ -106,10 +110,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := f.Seek(0, os.SEEK_END); err != nil {
|
||||
if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
||||
if err = fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -119,17 +123,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
encoder: newEncoder(f, 0),
|
||||
}
|
||||
w.locks = append(w.locks, f)
|
||||
if err := w.saveCrc(0); err != nil {
|
||||
if err = w.saveCrc(0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
||||
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return w.renameWal(tmpdirpath)
|
||||
if w, err = w.renameWal(tmpdirpath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// directory was renamed; sync parent dir to persist rename
|
||||
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
|
||||
if perr != nil {
|
||||
return nil, perr
|
||||
}
|
||||
if perr = fileutil.Fsync(pdir); perr != nil {
|
||||
return nil, perr
|
||||
}
|
||||
if perr = pdir.Close(); err != nil {
|
||||
return nil, perr
|
||||
}
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Open opens the WAL at the given snap.
|
||||
@ -139,7 +159,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
// the given snap. The WAL cannot be appended to before reading out all of its
|
||||
// previous records.
|
||||
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||
return openAtIndex(dirpath, snap, true)
|
||||
w, err := openAtIndex(dirpath, snap, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// OpenForRead only opens the wal files for read.
|
||||
@ -371,6 +398,10 @@ func (w *WAL) cut() error {
|
||||
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = fileutil.Fsync(w.dirFile); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newTail.Close()
|
||||
|
||||
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
||||
@ -473,7 +504,7 @@ func (w *WAL) Close() error {
|
||||
plog.Errorf("failed to unlock during closing wal: %s", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return w.dirFile.Close()
|
||||
}
|
||||
|
||||
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
||||
|
@ -16,7 +16,11 @@
|
||||
|
||||
package wal
|
||||
|
||||
import "os"
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
)
|
||||
|
||||
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||
// On non-Windows platforms, hold the lock while renaming. Releasing
|
||||
@ -34,5 +38,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||
}
|
||||
|
||||
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
|
||||
return w, nil
|
||||
df, err := fileutil.OpenDir(w.dir)
|
||||
w.dirFile = df
|
||||
return w, err
|
||||
}
|
||||
|
Reference in New Issue
Block a user