Compare commits
20 Commits
Author | SHA1 | Date | |
---|---|---|---|
494c012659 | |||
4abc381ebe | |||
73c8fdac53 | |||
ee2717493a | |||
2435eb9ecd | |||
8fb533dabe | |||
2f0f5ac504 | |||
9ab811d478 | |||
e0a99fb4ba | |||
d40982fc91 | |||
fe3a1cc31b | |||
70713706a1 | |||
0054e7e89b | |||
97f718b504 | |||
202da9270e | |||
6e83ec0ed7 | |||
5c44cdfdaa | |||
09a239f040 | |||
3faff8b2e2 | |||
2345fda18e |
@ -22,7 +22,10 @@ import (
|
|||||||
"github.com/coreos/etcd/mvcc/backend"
|
"github.com/coreos/etcd/mvcc/backend"
|
||||||
)
|
)
|
||||||
|
|
||||||
// isSubset returns true if a is a subset of b
|
// isSubset returns true if a is a subset of b.
|
||||||
|
// If a is a prefix of b, then a is a subset of b.
|
||||||
|
// Given intervals [a1,a2) and [b1,b2), is
|
||||||
|
// the a interval a subset of b?
|
||||||
func isSubset(a, b *rangePerm) bool {
|
func isSubset(a, b *rangePerm) bool {
|
||||||
switch {
|
switch {
|
||||||
case len(a.end) == 0 && len(b.end) == 0:
|
case len(a.end) == 0 && len(b.end) == 0:
|
||||||
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
|
|||||||
// b is a key, a is a range
|
// b is a key, a is a range
|
||||||
return false
|
return false
|
||||||
case len(a.end) == 0:
|
case len(a.end) == 0:
|
||||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
|
// a is a key, b is a range. need b1 <= a1 and a1 < b2
|
||||||
|
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
|
||||||
default:
|
default:
|
||||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
|
// both are ranges. need b1 <= a1 and a2 <= b2
|
||||||
|
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
|
|||||||
i := 0
|
i := 0
|
||||||
for i < len(perms) {
|
for i < len(perms) {
|
||||||
begin, next := i, i
|
begin, next := i, i
|
||||||
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
|
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
|
||||||
next++
|
next++
|
||||||
}
|
}
|
||||||
|
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
|
||||||
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
if next != begin && len(perms[next].end) > 0 {
|
||||||
|
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
||||||
|
} else {
|
||||||
|
merged = append(merged, perms[begin])
|
||||||
|
if next != begin {
|
||||||
|
merged = append(merged, perms[next])
|
||||||
|
}
|
||||||
|
}
|
||||||
i = next + 1
|
i = next + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
|
|||||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||||
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
||||||
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
|
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||||
},
|
},
|
||||||
// duplicate ranges
|
// duplicate ranges
|
||||||
{
|
{
|
||||||
|
@ -45,6 +45,8 @@ type simpleBalancer struct {
|
|||||||
// pinAddr is the currently pinned address; set to the empty string on
|
// pinAddr is the currently pinned address; set to the empty string on
|
||||||
// intialization and shutdown.
|
// intialization and shutdown.
|
||||||
pinAddr string
|
pinAddr string
|
||||||
|
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSimpleBalancer(eps []string) *simpleBalancer {
|
func newSimpleBalancer(eps []string) *simpleBalancer {
|
||||||
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
|
|||||||
|
|
||||||
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
|
// gRPC might call Up after it called Close. We add this check
|
||||||
|
// to "fix" it up at application layer. Or our simplerBalancer
|
||||||
|
// might panic since b.upc is closed.
|
||||||
|
if b.closed {
|
||||||
|
return func(err error) {}
|
||||||
|
}
|
||||||
|
|
||||||
if len(b.upEps) == 0 {
|
if len(b.upEps) == 0 {
|
||||||
// notify waiting Get()s and pin first connected address
|
// notify waiting Get()s and pin first connected address
|
||||||
close(b.upc)
|
close(b.upc)
|
||||||
b.pinAddr = addr.Addr
|
b.pinAddr = addr.Addr
|
||||||
}
|
}
|
||||||
b.upEps[addr.Addr] = struct{}{}
|
b.upEps[addr.Addr] = struct{}{}
|
||||||
b.mu.Unlock()
|
|
||||||
// notify client that a connection is up
|
// notify client that a connection is up
|
||||||
b.readyOnce.Do(func() { close(b.readyc) })
|
b.readyOnce.Do(func() { close(b.readyc) })
|
||||||
|
|
||||||
return func(err error) {
|
return func(err error) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
delete(b.upEps, addr.Addr)
|
delete(b.upEps, addr.Addr)
|
||||||
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
|
|||||||
|
|
||||||
func (b *simpleBalancer) Close() error {
|
func (b *simpleBalancer) Close() error {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
// In case gRPC calls close twice. TODO: remove the checking
|
||||||
|
// when we are sure that gRPC wont call close twice.
|
||||||
|
if b.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b.closed = true
|
||||||
close(b.notifyCh)
|
close(b.notifyCh)
|
||||||
// terminate all waiting Get()s
|
// terminate all waiting Get()s
|
||||||
b.pinAddr = ""
|
b.pinAddr = ""
|
||||||
if len(b.upEps) == 0 {
|
if len(b.upEps) == 0 {
|
||||||
close(b.upc)
|
close(b.upc)
|
||||||
}
|
}
|
||||||
b.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -669,6 +669,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|||||||
w.mu.RUnlock()
|
w.mu.RUnlock()
|
||||||
|
|
||||||
for _, ws := range streams {
|
for _, ws := range streams {
|
||||||
|
// drain recvc so no old WatchResponses (e.g., Created messages)
|
||||||
|
// are processed while resuming
|
||||||
|
ws.drain()
|
||||||
|
|
||||||
// pause serveStream
|
// pause serveStream
|
||||||
ws.resumec <- -1
|
ws.resumec <- -1
|
||||||
|
|
||||||
@ -701,6 +705,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// drain removes all buffered WatchResponses from the stream's receive channel.
|
||||||
|
func (ws *watcherStream) drain() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ws.recvc:
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
||||||
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
||||||
req := &pb.WatchCreateRequest{
|
req := &pb.WatchCreateRequest{
|
||||||
|
@ -39,15 +39,23 @@ func txnTestSuccess(cx ctlCtx) {
|
|||||||
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
|
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
|
||||||
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
|
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
|
||||||
}
|
}
|
||||||
|
rqs := []txnRequests{
|
||||||
rqs := txnRequests{
|
{
|
||||||
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
||||||
ifSucess: []string{"get key1", "get key2"},
|
ifSucess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`},
|
||||||
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
||||||
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
compare: []string{`version("key \"with\" space") = "1"`},
|
||||||
|
ifSucess: []string{`get "key \"with\" space"`},
|
||||||
|
results: []string{"SUCCESS", `key "with" space`, "value \x23"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
if err := ctlV3Txn(cx, rqs); err != nil {
|
for _, rq := range rqs {
|
||||||
cx.t.Fatal(err)
|
if err := ctlV3Txn(cx, rq); err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +243,7 @@ func authCfgFromCmd(cmd *cobra.Command) *authCfg {
|
|||||||
var cfg authCfg
|
var cfg authCfg
|
||||||
|
|
||||||
splitted := strings.SplitN(userFlag, ":", 2)
|
splitted := strings.SplitN(userFlag, ":", 2)
|
||||||
if len(splitted) == 0 {
|
if len(splitted) < 2 {
|
||||||
cfg.username = userFlag
|
cfg.username = userFlag
|
||||||
cfg.password, err = speakeasy.Ask("Password: ")
|
cfg.password, err = speakeasy.Ask("Password: ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -36,7 +36,10 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
|
"github.com/coreos/etcd/snap"
|
||||||
|
"github.com/coreos/etcd/store"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
@ -186,8 +189,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
|
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
|
||||||
}
|
}
|
||||||
|
|
||||||
makeDB(snapdir, args[0])
|
makeDB(snapdir, args[0], len(cl.Members()))
|
||||||
makeWAL(waldir, cl)
|
makeWALAndSnap(waldir, snapdir, cl)
|
||||||
}
|
}
|
||||||
|
|
||||||
func initialClusterFromName(name string) string {
|
func initialClusterFromName(name string) string {
|
||||||
@ -199,11 +202,18 @@ func initialClusterFromName(name string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// makeWAL creates a WAL for the initial cluster
|
// makeWAL creates a WAL for the initial cluster
|
||||||
func makeWAL(waldir string, cl *membership.RaftCluster) {
|
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
|
||||||
if err := fileutil.CreateDirAll(waldir); err != nil {
|
if err := fileutil.CreateDirAll(waldir); err != nil {
|
||||||
ExitWithError(ExitIO, err)
|
ExitWithError(ExitIO, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add members again to persist them to the store we create.
|
||||||
|
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
|
||||||
|
cl.SetStore(st)
|
||||||
|
for _, m := range cl.Members() {
|
||||||
|
cl.AddMember(m)
|
||||||
|
}
|
||||||
|
|
||||||
m := cl.MemberByName(restoreName)
|
m := cl.MemberByName(restoreName)
|
||||||
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
|
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
|
||||||
metadata, merr := md.Marshal()
|
metadata, merr := md.Marshal()
|
||||||
@ -227,7 +237,9 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ents := make([]raftpb.Entry, len(peers))
|
ents := make([]raftpb.Entry, len(peers))
|
||||||
|
nodeIDs := make([]uint64, len(peers))
|
||||||
for i, p := range peers {
|
for i, p := range peers {
|
||||||
|
nodeIDs[i] = p.ID
|
||||||
cc := raftpb.ConfChange{
|
cc := raftpb.ConfChange{
|
||||||
Type: raftpb.ConfChangeAddNode,
|
Type: raftpb.ConfChangeAddNode,
|
||||||
NodeID: p.ID,
|
NodeID: p.ID,
|
||||||
@ -245,20 +257,48 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
|||||||
ents[i] = e
|
ents[i] = e
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Save(raftpb.HardState{
|
commit, term := uint64(len(ents)), uint64(1)
|
||||||
Term: 1,
|
|
||||||
|
if err := w.Save(raftpb.HardState{
|
||||||
|
Term: term,
|
||||||
Vote: peers[0].ID,
|
Vote: peers[0].ID,
|
||||||
Commit: uint64(len(ents))}, ents)
|
Commit: commit}, ents); err != nil {
|
||||||
|
ExitWithError(ExitIO, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, berr := st.Save()
|
||||||
|
if berr != nil {
|
||||||
|
ExitWithError(ExitError, berr)
|
||||||
|
}
|
||||||
|
|
||||||
|
raftSnap := raftpb.Snapshot{
|
||||||
|
Data: b,
|
||||||
|
Metadata: raftpb.SnapshotMetadata{
|
||||||
|
Index: commit,
|
||||||
|
Term: term,
|
||||||
|
ConfState: raftpb.ConfState{
|
||||||
|
Nodes: nodeIDs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
snapshotter := snap.New(snapdir)
|
||||||
|
if err := snapshotter.SaveSnap(raftSnap); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
|
||||||
|
ExitWithError(ExitIO, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initIndex implements ConsistentIndexGetter so the snapshot won't block
|
// initIndex implements ConsistentIndexGetter so the snapshot won't block
|
||||||
// the new raft instance by waiting for a future raft index.
|
// the new raft instance by waiting for a future raft index.
|
||||||
type initIndex struct{}
|
type initIndex int
|
||||||
|
|
||||||
func (*initIndex) ConsistentIndex() uint64 { return 1 }
|
func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
|
||||||
|
|
||||||
// makeDB copies the database snapshot to the snapshot directory
|
// makeDB copies the database snapshot to the snapshot directory
|
||||||
func makeDB(snapdir, dbfile string) {
|
func makeDB(snapdir, dbfile string, commit int) {
|
||||||
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
|
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
|
||||||
if ferr != nil {
|
if ferr != nil {
|
||||||
ExitWithError(ExitInvalidInput, ferr)
|
ExitWithError(ExitInvalidInput, ferr)
|
||||||
@ -329,7 +369,7 @@ func makeDB(snapdir, dbfile string) {
|
|||||||
// update consistentIndex so applies go through on etcdserver despite
|
// update consistentIndex so applies go through on etcdserver despite
|
||||||
// having a new raft instance
|
// having a new raft instance
|
||||||
be := backend.NewDefaultBackend(dbpath)
|
be := backend.NewDefaultBackend(dbpath)
|
||||||
s := mvcc.NewStore(be, nil, &initIndex{})
|
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
|
||||||
id := s.TxnBegin()
|
id := s.TxnBegin()
|
||||||
btx := be.BatchTx()
|
btx := be.BatchTx()
|
||||||
del := func(k, v []byte) error {
|
del := func(k, v []byte) error {
|
||||||
@ -339,6 +379,7 @@ func makeDB(snapdir, dbfile string) {
|
|||||||
|
|
||||||
// delete stored members from old cluster since using new members
|
// delete stored members from old cluster since using new members
|
||||||
btx.UnsafeForEach([]byte("members"), del)
|
btx.UnsafeForEach([]byte("members"), del)
|
||||||
|
// todo: add back new members when we start to deprecate old snap file.
|
||||||
btx.UnsafeForEach([]byte("members_removed"), del)
|
btx.UnsafeForEach([]byte("members_removed"), del)
|
||||||
// trigger write-out of new consistent index
|
// trigger write-out of new consistent index
|
||||||
s.TxnEnd(id)
|
s.TxnEnd(id)
|
||||||
|
@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
}
|
}
|
||||||
if len(line) == 1 {
|
|
||||||
|
// remove space from the line
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if len(line) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove trialling \n
|
|
||||||
line = line[:len(line)-1]
|
|
||||||
cmp, err := parseCompare(line)
|
cmp, err := parseCompare(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
}
|
}
|
||||||
if len(line) == 1 {
|
|
||||||
|
// remove space from the line
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if len(line) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove trialling \n
|
|
||||||
line = line[:len(line)-1]
|
|
||||||
op, err := parseRequestUnion(line)
|
op, err := parseRequestUnion(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitInvalidInput, err)
|
ExitWithError(ExitInvalidInput, err)
|
||||||
|
@ -46,8 +46,23 @@ func addHexPrefix(s string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func argify(s string) []string {
|
func argify(s string) []string {
|
||||||
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
|
r := regexp.MustCompile(`"(?:[^"\\]|\\.)*"|'[^']*'|[^'"\s]\S*[^'"\s]?`)
|
||||||
return r.FindAllString(s, -1)
|
args := r.FindAllString(s, -1)
|
||||||
|
for i := range args {
|
||||||
|
if len(args[i]) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if args[i][0] == '\'' {
|
||||||
|
// 'single-quoted string'
|
||||||
|
args[i] = args[i][1 : len(args)-1]
|
||||||
|
} else if args[i][0] == '"' {
|
||||||
|
// "double quoted string"
|
||||||
|
if _, err := fmt.Sscanf(args[i], "%q", &args[i]); err != nil {
|
||||||
|
ExitWithError(ExitInvalidInput, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
|
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
|
||||||
|
@ -52,30 +52,18 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
watchInteractiveFunc(cmd, args)
|
watchInteractiveFunc(cmd, args)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(args) < 1 || len(args) > 2 {
|
|
||||||
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires one or two arguments as key or prefix, with range end"))
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
|
||||||
key := args[0]
|
|
||||||
if len(args) == 2 {
|
|
||||||
if watchPrefix {
|
|
||||||
ExitWithError(ExitBadArgs, fmt.Errorf("`range_end` and `--prefix` cannot be set at the same time, choose one"))
|
|
||||||
}
|
|
||||||
opts = append(opts, clientv3.WithRange(args[1]))
|
|
||||||
}
|
|
||||||
|
|
||||||
if watchPrefix {
|
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
|
||||||
}
|
|
||||||
c := mustClientFromCmd(cmd)
|
c := mustClientFromCmd(cmd)
|
||||||
wc := c.Watch(context.TODO(), key, opts...)
|
wc, err := getWatchChan(c, args)
|
||||||
printWatchCh(wc)
|
if err != nil {
|
||||||
err := c.Close()
|
ExitWithError(ExitBadArgs, err)
|
||||||
if err == nil {
|
|
||||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
|
||||||
}
|
}
|
||||||
ExitWithError(ExitBadConnection, err)
|
|
||||||
|
printWatchCh(wc)
|
||||||
|
if err = c.Close(); err != nil {
|
||||||
|
ExitWithError(ExitBadConnection, err)
|
||||||
|
}
|
||||||
|
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
||||||
@ -107,32 +95,33 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
|||||||
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
moreargs := flagset.Args()
|
ch, err := getWatchChan(c, flagset.Args())
|
||||||
if len(moreargs) < 1 || len(moreargs) > 2 {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "Invalid command %s (Too few or many arguments)\n", l)
|
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var key string
|
|
||||||
_, err = fmt.Sscanf(moreargs[0], "%q", &key)
|
|
||||||
if err != nil {
|
|
||||||
key = moreargs[0]
|
|
||||||
}
|
|
||||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
|
||||||
if len(moreargs) == 2 {
|
|
||||||
if watchPrefix {
|
|
||||||
fmt.Fprintf(os.Stderr, "`range_end` and `--prefix` cannot be set at the same time, choose one\n")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
opts = append(opts, clientv3.WithRange(moreargs[1]))
|
|
||||||
}
|
|
||||||
if watchPrefix {
|
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
|
||||||
}
|
|
||||||
ch := c.Watch(context.TODO(), key, opts...)
|
|
||||||
go printWatchCh(ch)
|
go printWatchCh(ch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
|
||||||
|
if len(args) < 1 || len(args) > 2 {
|
||||||
|
return nil, fmt.Errorf("bad number of arguments")
|
||||||
|
}
|
||||||
|
key := args[0]
|
||||||
|
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||||
|
if len(args) == 2 {
|
||||||
|
if watchPrefix {
|
||||||
|
return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
|
||||||
|
}
|
||||||
|
opts = append(opts, clientv3.WithRange(args[1]))
|
||||||
|
}
|
||||||
|
if watchPrefix {
|
||||||
|
opts = append(opts, clientv3.WithPrefix())
|
||||||
|
}
|
||||||
|
return c.Watch(context.TODO(), key, opts...), nil
|
||||||
|
}
|
||||||
|
|
||||||
func printWatchCh(ch clientv3.WatchChan) {
|
func printWatchCh(ch clientv3.WatchChan) {
|
||||||
for resp := range ch {
|
for resp := range ch {
|
||||||
display.Watch(resp)
|
display.Watch(resp)
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -410,6 +411,13 @@ func (cfg *config) configFromFile() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cfg *config) validateConfig(isSet func(field string) bool) error {
|
func (cfg *config) validateConfig(isSet func(field string) bool) error {
|
||||||
|
if err := checkBindURLs(cfg.lpurls); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := checkBindURLs(cfg.lcurls); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
|
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
|
||||||
// TODO(yichengq): check this for joining through discovery service case
|
// TODO(yichengq): check this for joining through discovery service case
|
||||||
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
|
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
|
||||||
@ -456,3 +464,27 @@ func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == pr
|
|||||||
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
|
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
|
||||||
|
|
||||||
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||||
|
|
||||||
|
// checkBindURLs returns an error if any URL uses a domain name.
|
||||||
|
// TODO: return error in 3.2.0
|
||||||
|
func checkBindURLs(urls []url.URL) error {
|
||||||
|
for _, url := range urls {
|
||||||
|
if url.Scheme == "unix" || url.Scheme == "unixs" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
host, _, err := net.SplitHostPort(url.Host)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if host == "localhost" {
|
||||||
|
// special case for local address
|
||||||
|
// TODO: support /etc/hosts ?
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if net.ParseIP(host) == nil {
|
||||||
|
err := fmt.Errorf("expected IP in URL for binding (%s)", url.String())
|
||||||
|
plog.Warning(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -188,11 +188,8 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
|
|||||||
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
s, serr := concurrency.NewSession(cli)
|
|
||||||
if serr != nil {
|
e := concurrency.NewElection(cli, "test")
|
||||||
t.Fatal(serr)
|
|
||||||
}
|
|
||||||
e := concurrency.NewElection(s, "test")
|
|
||||||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||||
err := e.Campaign(ctx, "abc")
|
err := e.Campaign(ctx, "abc")
|
||||||
cancel()
|
cancel()
|
||||||
|
22
pkg/fileutil/dir_unix.go
Normal file
22
pkg/fileutil/dir_unix.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
|
||||||
|
// OpenDir opens a directory for syncing.
|
||||||
|
func OpenDir(path string) (*os.File, error) { return os.Open(path) }
|
46
pkg/fileutil/dir_windows.go
Normal file
46
pkg/fileutil/dir_windows.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build windows
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OpenDir opens a directory in windows with write access for syncing.
|
||||||
|
func OpenDir(path string) (*os.File, error) {
|
||||||
|
fd, err := openDir(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return os.NewFile(uintptr(fd), path), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func openDir(path string) (fd syscall.Handle, err error) {
|
||||||
|
if len(path) == 0 {
|
||||||
|
return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
|
||||||
|
}
|
||||||
|
pathp, err := syscall.UTF16PtrFromString(path)
|
||||||
|
if err != nil {
|
||||||
|
return syscall.InvalidHandle, err
|
||||||
|
}
|
||||||
|
access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE)
|
||||||
|
sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE)
|
||||||
|
createmode := uint32(syscall.OPEN_EXISTING)
|
||||||
|
fl := uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS)
|
||||||
|
return syscall.CreateFile(pathp, access, sharemode, nil, createmode, fl, 0)
|
||||||
|
}
|
@ -29,7 +29,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "2.3.0"
|
MinClusterVersion = "2.3.0"
|
||||||
Version = "3.0.7"
|
Version = "3.0.9"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
GitSHA = "Not provided (use ./build instead of go build)"
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
|
49
wal/wal.go
49
wal/wal.go
@ -67,7 +67,11 @@ var (
|
|||||||
// A just opened WAL is in read mode, and ready for reading records.
|
// A just opened WAL is in read mode, and ready for reading records.
|
||||||
// The WAL will be ready for appending after reading out all the previous records.
|
// The WAL will be ready for appending after reading out all the previous records.
|
||||||
type WAL struct {
|
type WAL struct {
|
||||||
dir string // the living directory of the underlay files
|
dir string // the living directory of the underlay files
|
||||||
|
|
||||||
|
// dirFile is a fd for the wal directory for syncing on Rename
|
||||||
|
dirFile *os.File
|
||||||
|
|
||||||
metadata []byte // metadata recorded at the head of each WAL
|
metadata []byte // metadata recorded at the head of each WAL
|
||||||
state raftpb.HardState // hardstate recorded at the head of WAL
|
state raftpb.HardState // hardstate recorded at the head of WAL
|
||||||
|
|
||||||
@ -106,10 +110,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if _, err := f.Seek(0, os.SEEK_END); err != nil {
|
if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
if err = fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,17 +123,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
encoder: newEncoder(f, 0),
|
encoder: newEncoder(f, 0),
|
||||||
}
|
}
|
||||||
w.locks = append(w.locks, f)
|
w.locks = append(w.locks, f)
|
||||||
if err := w.saveCrc(0); err != nil {
|
if err = w.saveCrc(0); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return w.renameWal(tmpdirpath)
|
if w, err = w.renameWal(tmpdirpath); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// directory was renamed; sync parent dir to persist rename
|
||||||
|
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
|
||||||
|
if perr != nil {
|
||||||
|
return nil, perr
|
||||||
|
}
|
||||||
|
if perr = fileutil.Fsync(pdir); perr != nil {
|
||||||
|
return nil, perr
|
||||||
|
}
|
||||||
|
if perr = pdir.Close(); err != nil {
|
||||||
|
return nil, perr
|
||||||
|
}
|
||||||
|
|
||||||
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the WAL at the given snap.
|
// Open opens the WAL at the given snap.
|
||||||
@ -139,7 +159,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
// the given snap. The WAL cannot be appended to before reading out all of its
|
// the given snap. The WAL cannot be appended to before reading out all of its
|
||||||
// previous records.
|
// previous records.
|
||||||
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||||
return openAtIndex(dirpath, snap, true)
|
w, err := openAtIndex(dirpath, snap, true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenForRead only opens the wal files for read.
|
// OpenForRead only opens the wal files for read.
|
||||||
@ -371,6 +398,10 @@ func (w *WAL) cut() error {
|
|||||||
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err = fileutil.Fsync(w.dirFile); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
newTail.Close()
|
newTail.Close()
|
||||||
|
|
||||||
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
||||||
@ -473,7 +504,7 @@ func (w *WAL) Close() error {
|
|||||||
plog.Errorf("failed to unlock during closing wal: %s", err)
|
plog.Errorf("failed to unlock during closing wal: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return w.dirFile.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
||||||
|
@ -16,7 +16,11 @@
|
|||||||
|
|
||||||
package wal
|
package wal
|
||||||
|
|
||||||
import "os"
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
)
|
||||||
|
|
||||||
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||||
// On non-Windows platforms, hold the lock while renaming. Releasing
|
// On non-Windows platforms, hold the lock while renaming. Releasing
|
||||||
@ -34,5 +38,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
|
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
|
||||||
return w, nil
|
df, err := fileutil.OpenDir(w.dir)
|
||||||
|
w.dirFile = df
|
||||||
|
return w, err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user