Compare commits
37 Commits
Author | SHA1 | Date | |
---|---|---|---|
546c0f7ed6 | |||
adbad1c9b5 | |||
273b986751 | |||
5b205729b9 | |||
fe900b09dd | |||
494c012659 | |||
4abc381ebe | |||
73c8fdac53 | |||
ee2717493a | |||
2435eb9ecd | |||
8fb533dabe | |||
2f0f5ac504 | |||
9ab811d478 | |||
e0a99fb4ba | |||
d40982fc91 | |||
fe3a1cc31b | |||
70713706a1 | |||
0054e7e89b | |||
97f718b504 | |||
202da9270e | |||
6e83ec0ed7 | |||
5c44cdfdaa | |||
09a239f040 | |||
3faff8b2e2 | |||
2345fda18e | |||
5695120efc | |||
183293e061 | |||
4b48876f0e | |||
5089bf58fb | |||
480a347179 | |||
59e560c7a7 | |||
0bd9bea2e9 | |||
bd7581ac59 | |||
db378c3d26 | |||
23740162dc | |||
96422a955f | |||
6fd996fdac |
@ -22,7 +22,10 @@ import (
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
)
|
||||
|
||||
// isSubset returns true if a is a subset of b
|
||||
// isSubset returns true if a is a subset of b.
|
||||
// If a is a prefix of b, then a is a subset of b.
|
||||
// Given intervals [a1,a2) and [b1,b2), is
|
||||
// the a interval a subset of b?
|
||||
func isSubset(a, b *rangePerm) bool {
|
||||
switch {
|
||||
case len(a.end) == 0 && len(b.end) == 0:
|
||||
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
|
||||
// b is a key, a is a range
|
||||
return false
|
||||
case len(a.end) == 0:
|
||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
|
||||
// a is a key, b is a range. need b1 <= a1 and a1 < b2
|
||||
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
|
||||
default:
|
||||
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
|
||||
// both are ranges. need b1 <= a1 and a2 <= b2
|
||||
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
|
||||
i := 0
|
||||
for i < len(perms) {
|
||||
begin, next := i, i
|
||||
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
|
||||
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
|
||||
next++
|
||||
}
|
||||
|
||||
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
||||
|
||||
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
|
||||
if next != begin && len(perms[next].end) > 0 {
|
||||
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
|
||||
} else {
|
||||
merged = append(merged, perms[begin])
|
||||
if next != begin {
|
||||
merged = append(merged, perms[next])
|
||||
}
|
||||
}
|
||||
i = next + 1
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}},
|
||||
},
|
||||
{
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
|
||||
},
|
||||
{
|
||||
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("c")}},
|
||||
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
|
||||
},
|
||||
{
|
||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
|
||||
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
|
||||
},
|
||||
// duplicate ranges
|
||||
{
|
||||
|
@ -45,6 +45,8 @@ type simpleBalancer struct {
|
||||
// pinAddr is the currently pinned address; set to the empty string on
|
||||
// intialization and shutdown.
|
||||
pinAddr string
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newSimpleBalancer(eps []string) *simpleBalancer {
|
||||
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
|
||||
|
||||
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// gRPC might call Up after it called Close. We add this check
|
||||
// to "fix" it up at application layer. Or our simplerBalancer
|
||||
// might panic since b.upc is closed.
|
||||
if b.closed {
|
||||
return func(err error) {}
|
||||
}
|
||||
|
||||
if len(b.upEps) == 0 {
|
||||
// notify waiting Get()s and pin first connected address
|
||||
close(b.upc)
|
||||
b.pinAddr = addr.Addr
|
||||
}
|
||||
b.upEps[addr.Addr] = struct{}{}
|
||||
b.mu.Unlock()
|
||||
|
||||
// notify client that a connection is up
|
||||
b.readyOnce.Do(func() { close(b.readyc) })
|
||||
|
||||
return func(err error) {
|
||||
b.mu.Lock()
|
||||
delete(b.upEps, addr.Addr)
|
||||
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
|
||||
|
||||
func (b *simpleBalancer) Close() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
// In case gRPC calls close twice. TODO: remove the checking
|
||||
// when we are sure that gRPC wont call close twice.
|
||||
if b.closed {
|
||||
return nil
|
||||
}
|
||||
b.closed = true
|
||||
close(b.notifyCh)
|
||||
// terminate all waiting Get()s
|
||||
b.pinAddr = ""
|
||||
if len(b.upEps) == 0 {
|
||||
close(b.upc)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,7 @@ type Election struct {
|
||||
|
||||
// NewElection returns a new election on a given key prefix.
|
||||
func NewElection(client *v3.Client, pfx string) *Election {
|
||||
return &Election{client: client, keyPrefix: pfx}
|
||||
return &Election{client: client, keyPrefix: pfx + "/"}
|
||||
}
|
||||
|
||||
// Campaign puts a value as eligible for the election. It blocks until
|
||||
@ -59,7 +59,6 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
|
||||
if !resp.Succeeded {
|
||||
kv := resp.Responses[0].GetResponseRange().Kvs[0]
|
||||
|
@ -32,7 +32,7 @@ type Mutex struct {
|
||||
}
|
||||
|
||||
func NewMutex(client *v3.Client, pfx string) *Mutex {
|
||||
return &Mutex{client, pfx, "", -1}
|
||||
return &Mutex{client, pfx + "/", "", -1}
|
||||
}
|
||||
|
||||
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
||||
@ -43,7 +43,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
||||
return serr
|
||||
}
|
||||
|
||||
m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
|
||||
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
|
||||
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
|
||||
// put self in lock waiters via myKey; oldest waiter holds lock
|
||||
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
|
||||
|
@ -127,6 +127,8 @@ type watchGrpcStream struct {
|
||||
donec chan struct{}
|
||||
// errc transmits errors from grpc Recv to the watch stream reconn logic
|
||||
errc chan error
|
||||
// closingc gets the watcherStream of closing watchers
|
||||
closingc chan *watcherStream
|
||||
|
||||
// the error that closed the watch stream
|
||||
closeErr error
|
||||
@ -189,11 +191,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
||||
cancel: cancel,
|
||||
streams: make(map[int64]*watcherStream),
|
||||
|
||||
respc: make(chan *pb.WatchResponse),
|
||||
reqc: make(chan *watchRequest),
|
||||
stopc: make(chan struct{}),
|
||||
donec: make(chan struct{}),
|
||||
errc: make(chan error, 1),
|
||||
respc: make(chan *pb.WatchResponse),
|
||||
reqc: make(chan *watchRequest),
|
||||
stopc: make(chan struct{}),
|
||||
donec: make(chan struct{}),
|
||||
errc: make(chan error, 1),
|
||||
closingc: make(chan *watcherStream),
|
||||
}
|
||||
go wgs.run()
|
||||
return wgs
|
||||
@ -242,7 +245,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
|
||||
case reqc <- wr:
|
||||
ok = true
|
||||
case <-wr.ctx.Done():
|
||||
wgs.stopIfEmpty()
|
||||
case <-donec:
|
||||
if wgs.closeErr != nil {
|
||||
closeCh <- WatchResponse{closeErr: wgs.closeErr}
|
||||
@ -352,15 +354,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
|
||||
go w.serveStream(ws)
|
||||
}
|
||||
|
||||
// closeStream closes the watcher resources and removes it
|
||||
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
|
||||
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
|
||||
w.mu.Lock()
|
||||
// cancels request stream; subscriber receives nil channel
|
||||
close(ws.initReq.retc)
|
||||
// close subscriber's channel
|
||||
close(ws.outc)
|
||||
delete(w.streams, ws.id)
|
||||
empty := len(w.streams) == 0
|
||||
if empty && w.stopc != nil {
|
||||
w.stopc = nil
|
||||
}
|
||||
w.mu.Unlock()
|
||||
return empty
|
||||
}
|
||||
|
||||
// run is the root of the goroutines for managing a watcher client
|
||||
@ -464,6 +470,10 @@ func (w *watchGrpcStream) run() {
|
||||
cancelSet = make(map[int64]struct{})
|
||||
case <-stopc:
|
||||
return
|
||||
case ws := <-w.closingc:
|
||||
if w.closeStream(ws) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// send failed; queue for retry
|
||||
@ -522,6 +532,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
|
||||
|
||||
// serveStream forwards watch responses from run() to the subscriber
|
||||
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
||||
defer func() {
|
||||
// signal that this watcherStream is finished
|
||||
select {
|
||||
case w.closingc <- ws:
|
||||
case <-w.donec:
|
||||
w.closeStream(ws)
|
||||
}
|
||||
}()
|
||||
|
||||
var closeErr error
|
||||
emptyWr := &WatchResponse{}
|
||||
wrs := []*WatchResponse{}
|
||||
@ -602,20 +621,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
|
||||
}
|
||||
}
|
||||
|
||||
w.closeStream(ws)
|
||||
w.stopIfEmpty()
|
||||
// lazily send cancel message if events on missing id
|
||||
}
|
||||
|
||||
func (wgs *watchGrpcStream) stopIfEmpty() {
|
||||
wgs.mu.Lock()
|
||||
if len(wgs.streams) == 0 && wgs.stopc != nil {
|
||||
close(wgs.stopc)
|
||||
wgs.stopc = nil
|
||||
}
|
||||
wgs.mu.Unlock()
|
||||
}
|
||||
|
||||
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
|
||||
ws, rerr := w.resume()
|
||||
if rerr != nil {
|
||||
@ -669,6 +677,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
||||
w.mu.RUnlock()
|
||||
|
||||
for _, ws := range streams {
|
||||
// drain recvc so no old WatchResponses (e.g., Created messages)
|
||||
// are processed while resuming
|
||||
ws.drain()
|
||||
|
||||
// pause serveStream
|
||||
ws.resumec <- -1
|
||||
|
||||
@ -701,6 +713,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// drain removes all buffered WatchResponses from the stream's receive channel.
|
||||
func (ws *watcherStream) drain() {
|
||||
for {
|
||||
select {
|
||||
case <-ws.recvc:
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
|
||||
func (wr *watchRequest) toPB() *pb.WatchRequest {
|
||||
req := &pb.WatchCreateRequest{
|
||||
|
@ -53,8 +53,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
return err
|
||||
}
|
||||
for _, srv := range addrs {
|
||||
target := strings.TrimSuffix(srv.Target, ".")
|
||||
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
|
||||
port := fmt.Sprintf("%d", srv.Port)
|
||||
host := net.JoinHostPort(srv.Target, port)
|
||||
tcpAddr, err := resolveTCPAddr("tcp", host)
|
||||
if err != nil {
|
||||
plog.Warningf("couldn't resolve host %s during SRV discovery", host)
|
||||
@ -70,8 +70,11 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
n = fmt.Sprintf("%d", tempName)
|
||||
tempName += 1
|
||||
}
|
||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
|
||||
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, prefix, host)
|
||||
// SRV records have a trailing dot but URL shouldn't.
|
||||
shortHost := strings.TrimSuffix(srv.Target, ".")
|
||||
urlHost := net.JoinHostPort(shortHost, port)
|
||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, urlHost))
|
||||
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, prefix, urlHost)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package discovery
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
@ -29,11 +30,22 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
}()
|
||||
|
||||
name := "dnsClusterTest"
|
||||
dns := map[string]string{
|
||||
"1.example.com.:2480": "10.0.0.1:2480",
|
||||
"2.example.com.:2480": "10.0.0.2:2480",
|
||||
"3.example.com.:2480": "10.0.0.3:2480",
|
||||
"4.example.com.:2380": "10.0.0.3:2380",
|
||||
}
|
||||
srvAll := []*net.SRV{
|
||||
{Target: "1.example.com.", Port: 2480},
|
||||
{Target: "2.example.com.", Port: 2480},
|
||||
{Target: "3.example.com.", Port: 2480},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
withSSL []*net.SRV
|
||||
withoutSSL []*net.SRV
|
||||
urls []string
|
||||
dns map[string]string
|
||||
|
||||
expected string
|
||||
}{
|
||||
@ -41,61 +53,50 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
[]*net.SRV{},
|
||||
[]*net.SRV{},
|
||||
nil,
|
||||
nil,
|
||||
|
||||
"",
|
||||
},
|
||||
{
|
||||
[]*net.SRV{
|
||||
{Target: "10.0.0.1", Port: 2480},
|
||||
{Target: "10.0.0.2", Port: 2480},
|
||||
{Target: "10.0.0.3", Port: 2480},
|
||||
},
|
||||
srvAll,
|
||||
[]*net.SRV{},
|
||||
nil,
|
||||
|
||||
"0=https://1.example.com:2480,1=https://2.example.com:2480,2=https://3.example.com:2480",
|
||||
},
|
||||
{
|
||||
srvAll,
|
||||
[]*net.SRV{{Target: "4.example.com.", Port: 2380}},
|
||||
nil,
|
||||
|
||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
|
||||
"0=https://1.example.com:2480,1=https://2.example.com:2480,2=https://3.example.com:2480,3=http://4.example.com:2380",
|
||||
},
|
||||
{
|
||||
[]*net.SRV{
|
||||
{Target: "10.0.0.1", Port: 2480},
|
||||
{Target: "10.0.0.2", Port: 2480},
|
||||
{Target: "10.0.0.3", Port: 2480},
|
||||
},
|
||||
[]*net.SRV{
|
||||
{Target: "10.0.0.1", Port: 2380},
|
||||
},
|
||||
nil,
|
||||
nil,
|
||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:2380",
|
||||
},
|
||||
{
|
||||
[]*net.SRV{
|
||||
{Target: "10.0.0.1", Port: 2480},
|
||||
{Target: "10.0.0.2", Port: 2480},
|
||||
{Target: "10.0.0.3", Port: 2480},
|
||||
},
|
||||
[]*net.SRV{
|
||||
{Target: "10.0.0.1", Port: 2380},
|
||||
},
|
||||
srvAll,
|
||||
[]*net.SRV{{Target: "4.example.com.", Port: 2380}},
|
||||
[]string{"https://10.0.0.1:2480"},
|
||||
nil,
|
||||
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:2380",
|
||||
|
||||
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480,2=http://4.example.com:2380",
|
||||
},
|
||||
// matching local member with resolved addr and return unresolved hostnames
|
||||
{
|
||||
[]*net.SRV{
|
||||
{Target: "1.example.com.", Port: 2480},
|
||||
{Target: "2.example.com.", Port: 2480},
|
||||
{Target: "3.example.com.", Port: 2480},
|
||||
},
|
||||
srvAll,
|
||||
nil,
|
||||
[]string{"https://10.0.0.1:2480"},
|
||||
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
|
||||
|
||||
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
|
||||
},
|
||||
// invalid
|
||||
}
|
||||
|
||||
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
||||
if strings.Contains(addr, "10.0.0.") {
|
||||
// accept IP addresses when resolving apurls
|
||||
return net.ResolveTCPAddr(network, addr)
|
||||
}
|
||||
if dns[addr] == "" {
|
||||
return nil, errors.New("missing dns record")
|
||||
}
|
||||
return net.ResolveTCPAddr(network, dns[addr])
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
@ -108,12 +109,6 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
}
|
||||
return "", nil, errors.New("Unknown service in mock")
|
||||
}
|
||||
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
||||
if tt.dns == nil || tt.dns[addr] == "" {
|
||||
return net.ResolveTCPAddr(network, addr)
|
||||
}
|
||||
return net.ResolveTCPAddr(network, tt.dns[addr])
|
||||
}
|
||||
urls := testutil.MustNewURLs(t, tt.urls)
|
||||
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
|
||||
if err != nil {
|
||||
|
@ -39,15 +39,23 @@ func txnTestSuccess(cx ctlCtx) {
|
||||
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
|
||||
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
|
||||
}
|
||||
|
||||
rqs := txnRequests{
|
||||
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
||||
ifSucess: []string{"get key1", "get key2"},
|
||||
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
||||
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
||||
rqs := []txnRequests{
|
||||
{
|
||||
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
|
||||
ifSucess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`},
|
||||
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
|
||||
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
|
||||
},
|
||||
{
|
||||
compare: []string{`version("key \"with\" space") = "1"`},
|
||||
ifSucess: []string{`get "key \"with\" space"`},
|
||||
results: []string{"SUCCESS", `key "with" space`, "value \x23"},
|
||||
},
|
||||
}
|
||||
if err := ctlV3Txn(cx, rqs); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
for _, rq := range rqs {
|
||||
if err := ctlV3Txn(cx, rq); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,7 +243,7 @@ func authCfgFromCmd(cmd *cobra.Command) *authCfg {
|
||||
var cfg authCfg
|
||||
|
||||
splitted := strings.SplitN(userFlag, ":", 2)
|
||||
if len(splitted) == 0 {
|
||||
if len(splitted) < 2 {
|
||||
cfg.username = userFlag
|
||||
cfg.password, err = speakeasy.Ask("Password: ")
|
||||
if err != nil {
|
||||
|
@ -36,7 +36,10 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
@ -112,7 +115,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
|
||||
|
||||
partpath := path + ".part"
|
||||
f, err := os.Create(partpath)
|
||||
defer f.Close()
|
||||
|
||||
if err != nil {
|
||||
exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
|
||||
ExitWithError(ExitBadArgs, exiterr)
|
||||
@ -131,6 +134,8 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
|
||||
|
||||
fileutil.Fsync(f)
|
||||
|
||||
f.Close()
|
||||
|
||||
if rerr := os.Rename(partpath, path); rerr != nil {
|
||||
exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
|
||||
ExitWithError(ExitIO, exiterr)
|
||||
@ -186,8 +191,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
|
||||
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
|
||||
}
|
||||
|
||||
makeDB(snapdir, args[0])
|
||||
makeWAL(waldir, cl)
|
||||
makeDB(snapdir, args[0], len(cl.Members()))
|
||||
makeWALAndSnap(waldir, snapdir, cl)
|
||||
}
|
||||
|
||||
func initialClusterFromName(name string) string {
|
||||
@ -199,11 +204,18 @@ func initialClusterFromName(name string) string {
|
||||
}
|
||||
|
||||
// makeWAL creates a WAL for the initial cluster
|
||||
func makeWAL(waldir string, cl *membership.RaftCluster) {
|
||||
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
|
||||
if err := fileutil.CreateDirAll(waldir); err != nil {
|
||||
ExitWithError(ExitIO, err)
|
||||
}
|
||||
|
||||
// add members again to persist them to the store we create.
|
||||
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
|
||||
cl.SetStore(st)
|
||||
for _, m := range cl.Members() {
|
||||
cl.AddMember(m)
|
||||
}
|
||||
|
||||
m := cl.MemberByName(restoreName)
|
||||
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
|
||||
metadata, merr := md.Marshal()
|
||||
@ -227,7 +239,9 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
||||
}
|
||||
|
||||
ents := make([]raftpb.Entry, len(peers))
|
||||
nodeIDs := make([]uint64, len(peers))
|
||||
for i, p := range peers {
|
||||
nodeIDs[i] = p.ID
|
||||
cc := raftpb.ConfChange{
|
||||
Type: raftpb.ConfChangeAddNode,
|
||||
NodeID: p.ID,
|
||||
@ -245,20 +259,48 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
|
||||
ents[i] = e
|
||||
}
|
||||
|
||||
w.Save(raftpb.HardState{
|
||||
Term: 1,
|
||||
commit, term := uint64(len(ents)), uint64(1)
|
||||
|
||||
if err := w.Save(raftpb.HardState{
|
||||
Term: term,
|
||||
Vote: peers[0].ID,
|
||||
Commit: uint64(len(ents))}, ents)
|
||||
Commit: commit}, ents); err != nil {
|
||||
ExitWithError(ExitIO, err)
|
||||
}
|
||||
|
||||
b, berr := st.Save()
|
||||
if berr != nil {
|
||||
ExitWithError(ExitError, berr)
|
||||
}
|
||||
|
||||
raftSnap := raftpb.Snapshot{
|
||||
Data: b,
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: commit,
|
||||
Term: term,
|
||||
ConfState: raftpb.ConfState{
|
||||
Nodes: nodeIDs,
|
||||
},
|
||||
},
|
||||
}
|
||||
snapshotter := snap.New(snapdir)
|
||||
if err := snapshotter.SaveSnap(raftSnap); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
|
||||
ExitWithError(ExitIO, err)
|
||||
}
|
||||
}
|
||||
|
||||
// initIndex implements ConsistentIndexGetter so the snapshot won't block
|
||||
// the new raft instance by waiting for a future raft index.
|
||||
type initIndex struct{}
|
||||
type initIndex int
|
||||
|
||||
func (*initIndex) ConsistentIndex() uint64 { return 1 }
|
||||
func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
|
||||
|
||||
// makeDB copies the database snapshot to the snapshot directory
|
||||
func makeDB(snapdir, dbfile string) {
|
||||
func makeDB(snapdir, dbfile string, commit int) {
|
||||
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
|
||||
if ferr != nil {
|
||||
ExitWithError(ExitInvalidInput, ferr)
|
||||
@ -329,7 +371,7 @@ func makeDB(snapdir, dbfile string) {
|
||||
// update consistentIndex so applies go through on etcdserver despite
|
||||
// having a new raft instance
|
||||
be := backend.NewDefaultBackend(dbpath)
|
||||
s := mvcc.NewStore(be, nil, &initIndex{})
|
||||
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
|
||||
id := s.TxnBegin()
|
||||
btx := be.BatchTx()
|
||||
del := func(k, v []byte) error {
|
||||
@ -339,6 +381,7 @@ func makeDB(snapdir, dbfile string) {
|
||||
|
||||
// delete stored members from old cluster since using new members
|
||||
btx.UnsafeForEach([]byte("members"), del)
|
||||
// todo: add back new members when we start to deprecate old snap file.
|
||||
btx.UnsafeForEach([]byte("members_removed"), del)
|
||||
// trigger write-out of new consistent index
|
||||
s.TxnEnd(id)
|
||||
|
@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
}
|
||||
if len(line) == 1 {
|
||||
|
||||
// remove space from the line
|
||||
line = strings.TrimSpace(line)
|
||||
if len(line) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// remove trialling \n
|
||||
line = line[:len(line)-1]
|
||||
cmp, err := parseCompare(line)
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
}
|
||||
if len(line) == 1 {
|
||||
|
||||
// remove space from the line
|
||||
line = strings.TrimSpace(line)
|
||||
if len(line) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// remove trialling \n
|
||||
line = line[:len(line)-1]
|
||||
op, err := parseRequestUnion(line)
|
||||
if err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
|
@ -46,8 +46,23 @@ func addHexPrefix(s string) string {
|
||||
}
|
||||
|
||||
func argify(s string) []string {
|
||||
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
|
||||
return r.FindAllString(s, -1)
|
||||
r := regexp.MustCompile(`"(?:[^"\\]|\\.)*"|'[^']*'|[^'"\s]\S*[^'"\s]?`)
|
||||
args := r.FindAllString(s, -1)
|
||||
for i := range args {
|
||||
if len(args[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
if args[i][0] == '\'' {
|
||||
// 'single-quoted string'
|
||||
args[i] = args[i][1 : len(args)-1]
|
||||
} else if args[i][0] == '"' {
|
||||
// "double quoted string"
|
||||
if _, err := fmt.Sscanf(args[i], "%q", &args[i]); err != nil {
|
||||
ExitWithError(ExitInvalidInput, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
|
||||
|
@ -52,30 +52,18 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
|
||||
watchInteractiveFunc(cmd, args)
|
||||
return
|
||||
}
|
||||
if len(args) < 1 || len(args) > 2 {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires one or two arguments as key or prefix, with range end"))
|
||||
}
|
||||
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||
key := args[0]
|
||||
if len(args) == 2 {
|
||||
if watchPrefix {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("`range_end` and `--prefix` cannot be set at the same time, choose one"))
|
||||
}
|
||||
opts = append(opts, clientv3.WithRange(args[1]))
|
||||
}
|
||||
|
||||
if watchPrefix {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
c := mustClientFromCmd(cmd)
|
||||
wc := c.Watch(context.TODO(), key, opts...)
|
||||
printWatchCh(wc)
|
||||
err := c.Close()
|
||||
if err == nil {
|
||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||
wc, err := getWatchChan(c, args)
|
||||
if err != nil {
|
||||
ExitWithError(ExitBadArgs, err)
|
||||
}
|
||||
ExitWithError(ExitBadConnection, err)
|
||||
|
||||
printWatchCh(wc)
|
||||
if err = c.Close(); err != nil {
|
||||
ExitWithError(ExitBadConnection, err)
|
||||
}
|
||||
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
|
||||
}
|
||||
|
||||
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
||||
@ -107,32 +95,33 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||
continue
|
||||
}
|
||||
moreargs := flagset.Args()
|
||||
if len(moreargs) < 1 || len(moreargs) > 2 {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (Too few or many arguments)\n", l)
|
||||
ch, err := getWatchChan(c, flagset.Args())
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
|
||||
continue
|
||||
}
|
||||
var key string
|
||||
_, err = fmt.Sscanf(moreargs[0], "%q", &key)
|
||||
if err != nil {
|
||||
key = moreargs[0]
|
||||
}
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||
if len(moreargs) == 2 {
|
||||
if watchPrefix {
|
||||
fmt.Fprintf(os.Stderr, "`range_end` and `--prefix` cannot be set at the same time, choose one\n")
|
||||
continue
|
||||
}
|
||||
opts = append(opts, clientv3.WithRange(moreargs[1]))
|
||||
}
|
||||
if watchPrefix {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
ch := c.Watch(context.TODO(), key, opts...)
|
||||
go printWatchCh(ch)
|
||||
}
|
||||
}
|
||||
|
||||
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
|
||||
if len(args) < 1 || len(args) > 2 {
|
||||
return nil, fmt.Errorf("bad number of arguments")
|
||||
}
|
||||
key := args[0]
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
|
||||
if len(args) == 2 {
|
||||
if watchPrefix {
|
||||
return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
|
||||
}
|
||||
opts = append(opts, clientv3.WithRange(args[1]))
|
||||
}
|
||||
if watchPrefix {
|
||||
opts = append(opts, clientv3.WithPrefix())
|
||||
}
|
||||
return c.Watch(context.TODO(), key, opts...), nil
|
||||
}
|
||||
|
||||
func printWatchCh(ch clientv3.WatchChan) {
|
||||
for resp := range ch {
|
||||
display.Watch(resp)
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"runtime"
|
||||
@ -410,6 +411,13 @@ func (cfg *config) configFromFile() error {
|
||||
}
|
||||
|
||||
func (cfg *config) validateConfig(isSet func(field string) bool) error {
|
||||
if err := checkBindURLs(cfg.lpurls); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkBindURLs(cfg.lcurls); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
|
||||
// TODO(yichengq): check this for joining through discovery service case
|
||||
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
|
||||
@ -456,3 +464,27 @@ func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == pr
|
||||
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
|
||||
|
||||
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||
|
||||
// checkBindURLs returns an error if any URL uses a domain name.
|
||||
// TODO: return error in 3.2.0
|
||||
func checkBindURLs(urls []url.URL) error {
|
||||
for _, url := range urls {
|
||||
if url.Scheme == "unix" || url.Scheme == "unixs" {
|
||||
continue
|
||||
}
|
||||
host, _, err := net.SplitHostPort(url.Host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if host == "localhost" {
|
||||
// special case for local address
|
||||
// TODO: support /etc/hosts ?
|
||||
continue
|
||||
}
|
||||
if net.ParseIP(host) == nil {
|
||||
err := fmt.Errorf("expected IP in URL for binding (%s)", url.String())
|
||||
plog.Warning(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -412,8 +412,13 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
||||
if beExist {
|
||||
kvindex := srv.kv.ConsistentIndex()
|
||||
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||
// etcd from pre-3.0 release.
|
||||
if snapshot != nil && kvindex < snapshot.Metadata.Index {
|
||||
return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
|
||||
if kvindex != 0 {
|
||||
return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
|
||||
}
|
||||
plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
|
||||
}
|
||||
}
|
||||
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||
|
@ -174,3 +174,28 @@ func TestElectionSessionRecampaign(t *testing.T) {
|
||||
t.Fatalf("expected value=%q, got response %v", "def", resp)
|
||||
}
|
||||
}
|
||||
|
||||
// TestElectionOnPrefixOfExistingKey checks that a single
|
||||
// candidate can be elected on a new key that is a prefix
|
||||
// of an existing key. To wit, check for regression
|
||||
// of bug #6278. https://github.com/coreos/etcd/issues/6278
|
||||
//
|
||||
func TestElectionOnPrefixOfExistingKey(t *testing.T) {
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
cli := clus.RandClient()
|
||||
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
e := concurrency.NewElection(cli, "test")
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
err := e.Campaign(ctx, "abc")
|
||||
cancel()
|
||||
if err != nil {
|
||||
// after 5 seconds, deadlock results in
|
||||
// 'context deadline exceeded' here.
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
22
pkg/fileutil/dir_unix.go
Normal file
22
pkg/fileutil/dir_unix.go
Normal file
@ -0,0 +1,22 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !windows
|
||||
|
||||
package fileutil
|
||||
|
||||
import "os"
|
||||
|
||||
// OpenDir opens a directory for syncing.
|
||||
func OpenDir(path string) (*os.File, error) { return os.Open(path) }
|
46
pkg/fileutil/dir_windows.go
Normal file
46
pkg/fileutil/dir_windows.go
Normal file
@ -0,0 +1,46 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build windows
|
||||
|
||||
package fileutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// OpenDir opens a directory in windows with write access for syncing.
|
||||
func OpenDir(path string) (*os.File, error) {
|
||||
fd, err := openDir(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return os.NewFile(uintptr(fd), path), nil
|
||||
}
|
||||
|
||||
func openDir(path string) (fd syscall.Handle, err error) {
|
||||
if len(path) == 0 {
|
||||
return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
|
||||
}
|
||||
pathp, err := syscall.UTF16PtrFromString(path)
|
||||
if err != nil {
|
||||
return syscall.InvalidHandle, err
|
||||
}
|
||||
access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE)
|
||||
sharemode := uint32(syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE)
|
||||
createmode := uint32(syscall.OPEN_EXISTING)
|
||||
fl := uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS)
|
||||
return syscall.CreateFile(pathp, access, sharemode, nil, createmode, fl, 0)
|
||||
}
|
@ -96,3 +96,26 @@ func Exist(name string) bool {
|
||||
_, err := os.Stat(name)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// ZeroToEnd zeros a file starting from SEEK_CUR to its SEEK_END. May temporarily
|
||||
// shorten the length of the file.
|
||||
func ZeroToEnd(f *os.File) error {
|
||||
// TODO: support FALLOC_FL_ZERO_RANGE
|
||||
off, err := f.Seek(0, os.SEEK_CUR)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lenf, lerr := f.Seek(0, os.SEEK_END)
|
||||
if lerr != nil {
|
||||
return lerr
|
||||
}
|
||||
if err = f.Truncate(off); err != nil {
|
||||
return err
|
||||
}
|
||||
// make sure blocks remain allocated
|
||||
if err = Preallocate(f, lenf, true); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.Seek(off, os.SEEK_SET)
|
||||
return err
|
||||
}
|
||||
|
@ -118,3 +118,42 @@ func TestExist(t *testing.T) {
|
||||
t.Errorf("exist = %v, want false", g)
|
||||
}
|
||||
}
|
||||
|
||||
func TestZeroToEnd(t *testing.T) {
|
||||
f, err := ioutil.TempFile(os.TempDir(), "fileutil")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
b := make([]byte, 1024)
|
||||
for i := range b {
|
||||
b[i] = 12
|
||||
}
|
||||
if _, err = f.Write(b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err = f.Seek(512, os.SEEK_SET); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = ZeroToEnd(f); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
off, serr := f.Seek(0, os.SEEK_CUR)
|
||||
if serr != nil {
|
||||
t.Fatal(serr)
|
||||
}
|
||||
if off != 512 {
|
||||
t.Fatalf("expected offset 512, got %d", off)
|
||||
}
|
||||
|
||||
b = make([]byte, 512)
|
||||
if _, err = f.Read(b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := range b {
|
||||
if b[i] != 0 {
|
||||
t.Errorf("expected b[%d] = 0, got %d", i, b[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
103
pkg/ioutil/pagewriter.go
Normal file
103
pkg/ioutil/pagewriter.go
Normal file
@ -0,0 +1,103 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package ioutil
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
var defaultBufferBytes = 128 * 1024
|
||||
|
||||
// PageWriter implements the io.Writer interface so that writes will
|
||||
// either be in page chunks or from flushing.
|
||||
type PageWriter struct {
|
||||
w io.Writer
|
||||
// pageOffset tracks the page offset of the base of the buffer
|
||||
pageOffset int
|
||||
// pageBytes is the number of bytes per page
|
||||
pageBytes int
|
||||
// bufferedBytes counts the number of bytes pending for write in the buffer
|
||||
bufferedBytes int
|
||||
// buf holds the write buffer
|
||||
buf []byte
|
||||
// bufWatermarkBytes is the number of bytes the buffer can hold before it needs
|
||||
// to be flushed. It is less than len(buf) so there is space for slack writes
|
||||
// to bring the writer to page alignment.
|
||||
bufWatermarkBytes int
|
||||
}
|
||||
|
||||
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
|
||||
return &PageWriter{
|
||||
w: w,
|
||||
pageBytes: pageBytes,
|
||||
buf: make([]byte, defaultBufferBytes+pageBytes),
|
||||
bufWatermarkBytes: defaultBufferBytes,
|
||||
}
|
||||
}
|
||||
|
||||
func (pw *PageWriter) Write(p []byte) (n int, err error) {
|
||||
if len(p)+pw.bufferedBytes <= pw.bufWatermarkBytes {
|
||||
// no overflow
|
||||
copy(pw.buf[pw.bufferedBytes:], p)
|
||||
pw.bufferedBytes += len(p)
|
||||
return len(p), nil
|
||||
}
|
||||
// complete the slack page in the buffer if unaligned
|
||||
slack := pw.pageBytes - ((pw.pageOffset + pw.bufferedBytes) % pw.pageBytes)
|
||||
if slack != pw.pageBytes {
|
||||
partial := slack > len(p)
|
||||
if partial {
|
||||
// not enough data to complete the slack page
|
||||
slack = len(p)
|
||||
}
|
||||
// special case: writing to slack page in buffer
|
||||
copy(pw.buf[pw.bufferedBytes:], p[:slack])
|
||||
pw.bufferedBytes += slack
|
||||
n = slack
|
||||
p = p[slack:]
|
||||
if partial {
|
||||
// avoid forcing an unaligned flush
|
||||
return n, nil
|
||||
}
|
||||
}
|
||||
// buffer contents are now page-aligned; clear out
|
||||
if err = pw.Flush(); err != nil {
|
||||
return n, err
|
||||
}
|
||||
// directly write all complete pages without copying
|
||||
if len(p) > pw.pageBytes {
|
||||
pages := len(p) / pw.pageBytes
|
||||
c, werr := pw.w.Write(p[:pages*pw.pageBytes])
|
||||
n += c
|
||||
if werr != nil {
|
||||
return n, werr
|
||||
}
|
||||
p = p[pages*pw.pageBytes:]
|
||||
}
|
||||
// write remaining tail to buffer
|
||||
c, werr := pw.Write(p)
|
||||
n += c
|
||||
return n, werr
|
||||
}
|
||||
|
||||
func (pw *PageWriter) Flush() error {
|
||||
if pw.bufferedBytes == 0 {
|
||||
return nil
|
||||
}
|
||||
_, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
|
||||
pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
|
||||
pw.bufferedBytes = 0
|
||||
return err
|
||||
}
|
100
pkg/ioutil/pagewriter_test.go
Normal file
100
pkg/ioutil/pagewriter_test.go
Normal file
@ -0,0 +1,100 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package ioutil
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPageWriterRandom(t *testing.T) {
|
||||
// smaller buffer for stress testing
|
||||
defaultBufferBytes = 8 * 1024
|
||||
pageBytes := 128
|
||||
buf := make([]byte, 4*defaultBufferBytes)
|
||||
cw := &checkPageWriter{pageBytes: pageBytes, t: t}
|
||||
w := NewPageWriter(cw, pageBytes)
|
||||
n := 0
|
||||
for i := 0; i < 4096; i++ {
|
||||
c, err := w.Write(buf[:rand.Intn(len(buf))])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
n += c
|
||||
}
|
||||
if cw.writeBytes > n {
|
||||
t.Fatalf("wrote %d bytes to io.Writer, but only wrote %d bytes", cw.writeBytes, n)
|
||||
}
|
||||
if cw.writeBytes-n > pageBytes {
|
||||
t.Fatalf("got %d bytes pending, expected less than %d bytes", cw.writeBytes-n, pageBytes)
|
||||
}
|
||||
t.Logf("total writes: %d", cw.writes)
|
||||
t.Logf("total write bytes: %d (of %d)", cw.writeBytes, n)
|
||||
}
|
||||
|
||||
// TestPageWriterPariallack tests the case where a write overflows the buffer
|
||||
// but there is not enough data to complete the slack write.
|
||||
func TestPageWriterPartialSlack(t *testing.T) {
|
||||
defaultBufferBytes = 1024
|
||||
pageBytes := 128
|
||||
buf := make([]byte, defaultBufferBytes)
|
||||
cw := &checkPageWriter{pageBytes: 64, t: t}
|
||||
w := NewPageWriter(cw, pageBytes)
|
||||
// put writer in non-zero page offset
|
||||
if _, err := w.Write(buf[:64]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := w.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cw.writes != 1 {
|
||||
t.Fatalf("got %d writes, expected 1", cw.writes)
|
||||
}
|
||||
// nearly fill buffer
|
||||
if _, err := w.Write(buf[:1022]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// overflow buffer, but without enough to write as aligned
|
||||
if _, err := w.Write(buf[:8]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cw.writes != 1 {
|
||||
t.Fatalf("got %d writes, expected 1", cw.writes)
|
||||
}
|
||||
// finish writing slack space
|
||||
if _, err := w.Write(buf[:128]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cw.writes != 2 {
|
||||
t.Fatalf("got %d writes, expected 2", cw.writes)
|
||||
}
|
||||
}
|
||||
|
||||
// checkPageWriter implements an io.Writer that fails a test on unaligned writes.
|
||||
type checkPageWriter struct {
|
||||
pageBytes int
|
||||
writes int
|
||||
writeBytes int
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (cw *checkPageWriter) Write(p []byte) (int, error) {
|
||||
if len(p)%cw.pageBytes != 0 {
|
||||
cw.t.Fatalf("got write len(p) = %d, expected len(p) == k*cw.pageBytes", len(p))
|
||||
}
|
||||
cw.writes++
|
||||
cw.writeBytes += len(p)
|
||||
return len(p), nil
|
||||
}
|
@ -49,6 +49,7 @@ var (
|
||||
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "2.3.0"
|
||||
Version = "3.0.6"
|
||||
Version = "3.0.10"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
|
@ -15,19 +15,24 @@
|
||||
package wal
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/pkg/crc"
|
||||
"github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
)
|
||||
|
||||
// walPageBytes is the alignment for flushing records to the backing Writer.
|
||||
// It should be a multiple of the minimum sector size so that WAL repair can
|
||||
// safely between torn writes and ordinary data corruption.
|
||||
const walPageBytes = 8 * minSectorSize
|
||||
|
||||
type encoder struct {
|
||||
mu sync.Mutex
|
||||
bw *bufio.Writer
|
||||
bw *ioutil.PageWriter
|
||||
|
||||
crc hash.Hash32
|
||||
buf []byte
|
||||
@ -36,7 +41,7 @@ type encoder struct {
|
||||
|
||||
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
|
||||
return &encoder{
|
||||
bw: bufio.NewWriter(w),
|
||||
bw: ioutil.NewPageWriter(w, walPageBytes),
|
||||
crc: crc.New(prevCrc, crcTable),
|
||||
// 1MB buffer
|
||||
buf: make([]byte, 1024*1024),
|
||||
|
69
wal/wal.go
69
wal/wal.go
@ -67,7 +67,11 @@ var (
|
||||
// A just opened WAL is in read mode, and ready for reading records.
|
||||
// The WAL will be ready for appending after reading out all the previous records.
|
||||
type WAL struct {
|
||||
dir string // the living directory of the underlay files
|
||||
dir string // the living directory of the underlay files
|
||||
|
||||
// dirFile is a fd for the wal directory for syncing on Rename
|
||||
dirFile *os.File
|
||||
|
||||
metadata []byte // metadata recorded at the head of each WAL
|
||||
state raftpb.HardState // hardstate recorded at the head of WAL
|
||||
|
||||
@ -106,10 +110,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := f.Seek(0, os.SEEK_END); err != nil {
|
||||
if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
||||
if err = fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -119,32 +123,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
encoder: newEncoder(f, 0),
|
||||
}
|
||||
w.locks = append(w.locks, f)
|
||||
if err := w.saveCrc(0); err != nil {
|
||||
if err = w.saveCrc(0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
||||
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// rename of directory with locked files doesn't work on windows; close
|
||||
// the WAL to release the locks so the directory can be renamed
|
||||
w.Close()
|
||||
if err := os.Rename(tmpdirpath, dirpath); err != nil {
|
||||
if w, err = w.renameWal(tmpdirpath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// reopen and relock
|
||||
newWAL, oerr := Open(dirpath, walpb.Snapshot{})
|
||||
if oerr != nil {
|
||||
return nil, oerr
|
||||
|
||||
// directory was renamed; sync parent dir to persist rename
|
||||
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
|
||||
if perr != nil {
|
||||
return nil, perr
|
||||
}
|
||||
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||
newWAL.Close()
|
||||
return nil, err
|
||||
if perr = fileutil.Fsync(pdir); perr != nil {
|
||||
return nil, perr
|
||||
}
|
||||
return newWAL, nil
|
||||
if perr = pdir.Close(); err != nil {
|
||||
return nil, perr
|
||||
}
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Open opens the WAL at the given snap.
|
||||
@ -154,7 +159,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
// the given snap. The WAL cannot be appended to before reading out all of its
|
||||
// previous records.
|
||||
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||
return openAtIndex(dirpath, snap, true)
|
||||
w, err := openAtIndex(dirpath, snap, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// OpenForRead only opens the wal files for read.
|
||||
@ -299,6 +311,18 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
state.Reset()
|
||||
return nil, state, nil, err
|
||||
}
|
||||
// decodeRecord() will return io.EOF if it detects a zero record,
|
||||
// but this zero record may be followed by non-zero records from
|
||||
// a torn write. Overwriting some of these non-zero records, but
|
||||
// not all, will cause CRC errors on WAL open. Since the records
|
||||
// were never fully synced to disk in the first place, it's safe
|
||||
// to zero them out to avoid any CRC errors from new writes.
|
||||
if _, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET); err != nil {
|
||||
return nil, state, nil, err
|
||||
}
|
||||
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
|
||||
return nil, state, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = nil
|
||||
@ -317,7 +341,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
|
||||
if w.tail() != nil {
|
||||
// create encoder (chain crc with the decoder), enable appending
|
||||
_, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET)
|
||||
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
|
||||
}
|
||||
w.decoder = nil
|
||||
@ -375,6 +398,10 @@ func (w *WAL) cut() error {
|
||||
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = fileutil.Fsync(w.dirFile); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newTail.Close()
|
||||
|
||||
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
||||
@ -477,7 +504,7 @@ func (w *WAL) Close() error {
|
||||
plog.Errorf("failed to unlock during closing wal: %s", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return w.dirFile.Close()
|
||||
}
|
||||
|
||||
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
||||
|
@ -636,3 +636,89 @@ func TestRestartCreateWal(t *testing.T) {
|
||||
t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
|
||||
}
|
||||
}
|
||||
|
||||
// TestOpenOnTornWrite ensures that entries past the torn write are truncated.
|
||||
func TestOpenOnTornWrite(t *testing.T) {
|
||||
maxEntries := 40
|
||||
clobberIdx := 20
|
||||
overwriteEntries := 5
|
||||
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
w, err := Create(p, nil)
|
||||
defer w.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// get offset of end of each saved entry
|
||||
offsets := make([]int64, maxEntries)
|
||||
for i := range offsets {
|
||||
es := []raftpb.Entry{{Index: uint64(i)}}
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offsets[i], err = w.tail().Seek(0, os.SEEK_CUR); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
fn := path.Join(p, path.Base(w.tail().Name()))
|
||||
w.Close()
|
||||
|
||||
// clobber some entry with 0's to simulate a torn write
|
||||
f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
|
||||
if ferr != nil {
|
||||
t.Fatal(ferr)
|
||||
}
|
||||
defer f.Close()
|
||||
_, err = f.Seek(offsets[clobberIdx], os.SEEK_SET)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
|
||||
_, err = f.Write(zeros)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
w, err = Open(p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// seek up to clobbered entry
|
||||
_, _, _, err = w.ReadAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// write a few entries past the clobbered entry
|
||||
for i := 0; i < overwriteEntries; i++ {
|
||||
// Index is different from old, truncated entries
|
||||
es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
w.Close()
|
||||
|
||||
// read back the entries, confirm number of entries matches expectation
|
||||
w, err = OpenForRead(p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, _, ents, rerr := w.ReadAll()
|
||||
if rerr != nil {
|
||||
// CRC error? the old entries were likely never truncated away
|
||||
t.Fatal(rerr)
|
||||
}
|
||||
wEntries := (clobberIdx - 1) + overwriteEntries
|
||||
if len(ents) != wEntries {
|
||||
t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
|
||||
}
|
||||
}
|
||||
|
44
wal/wal_unix.go
Normal file
44
wal/wal_unix.go
Normal file
@ -0,0 +1,44 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !windows
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
)
|
||||
|
||||
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||
// On non-Windows platforms, hold the lock while renaming. Releasing
|
||||
// the lock and trying to reacquire it quickly can be flaky because
|
||||
// it's possible the process will fork to spawn a process while this is
|
||||
// happening. The fds are set up as close-on-exec by the Go runtime,
|
||||
// but there is a window between the fork and the exec where another
|
||||
// process holds the lock.
|
||||
|
||||
if err := os.RemoveAll(w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
|
||||
df, err := fileutil.OpenDir(w.dir)
|
||||
w.dirFile = df
|
||||
return w, err
|
||||
}
|
41
wal/wal_windows.go
Normal file
41
wal/wal_windows.go
Normal file
@ -0,0 +1,41 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
)
|
||||
|
||||
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
||||
// rename of directory with locked files doesn't work on
|
||||
// windows; close the WAL to release the locks so the directory
|
||||
// can be renamed
|
||||
w.Close()
|
||||
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// reopen and relock
|
||||
newWAL, oerr := Open(w.dir, walpb.Snapshot{})
|
||||
if oerr != nil {
|
||||
return nil, oerr
|
||||
}
|
||||
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||
newWAL.Close()
|
||||
return nil, err
|
||||
}
|
||||
return newWAL, nil
|
||||
}
|
Reference in New Issue
Block a user