Compare commits

...

37 Commits

Author SHA1 Message Date
546c0f7ed6 version: bump to v3.0.10 2016-09-23 10:49:03 -07:00
adbad1c9b5 ctlv3: close snapshot file before rename (Windows) 2016-09-23 09:11:02 -07:00
273b986751 clientv3: process closed watcherStreams in watcherGrpcStream run loop
Was racing with Watch() when closing the grpc stream on no watchers.

Fixes #6476
2016-09-21 15:52:20 -07:00
5b205729b9 rafthttp: add v3.0.0 to supported streams 2016-09-16 21:54:55 +09:00
fe900b09dd version: bump to v3.0.9+git 2016-09-15 15:10:23 -07:00
494c012659 version: bump to v3.0.9 2016-09-15 12:56:33 -07:00
4abc381ebe clientv3: drain buffered WatchResponses before resuming
Otherwise, the watcherStream can receive WatchResponses in the
middle of a resume, corrupting the stream.

Fixes #6364
2016-09-15 12:38:15 -07:00
73c8fdac53 integration: fix compilation for backported Election test 2016-09-15 11:45:37 -07:00
ee2717493a ctlv3: fix line parsing for Windows 2016-09-15 11:25:53 -07:00
2435eb9ecd clientv3: balancer panics when call up after close
Fix the issue by adding a simple guard variable.
2016-09-15 18:46:26 +09:00
8fb533dabe embed: warn on domain name in listener 2016-09-15 18:46:19 +09:00
2f0f5ac504 Revert "Merge pull request #6365 from heyitsanthony/fix-dns-bind"
This reverts commit af5ab7b351, reversing
changes made to da6a0f0594.
2016-09-15 18:43:46 +09:00
9ab811d478 auth: fix range handling bugs.
Test 15, counting from zero, in TestGetMergedPerms
in etcd/auth/range_perm_cache_test.go, was incorrectly
trying to assert that [a, b) merged with [b, "")
should be [a, b). Added a test specifically for
this. This patch fixes the incorrect larger test
and the bugs in the code that it was hiding.

Fixes #6359
2016-09-15 18:41:56 +09:00
e0a99fb4ba version: bump to v3.0.8+git 2016-09-09 15:56:31 -07:00
d40982fc91 version: bump to v3.0.8 2016-09-09 13:14:44 -07:00
fe3a1cc31b wal: fix error type 2016-09-09 09:11:25 +09:00
70713706a1 wal: fix err shadowing (go vet) 2016-09-09 09:07:48 +09:00
0054e7e89b etcdctl: restore should create a snapshot
Restore should create a snapshot so that the new db file
can be sent to a newly joined member.
2016-09-09 09:03:51 +09:00
97f718b504 fileutil: windows OpenDir
Windows needs to open a directory with write access to fsync but the go
runtime won't open directories that way.
2016-09-09 09:01:56 +09:00
202da9270e wal: fsync directory after wal file rename
Fixes #6368
2016-09-09 09:01:49 +09:00
6e83ec0ed7 etcdmain: reject binding listeners to domain names
Fixes #6336
2016-09-07 08:08:35 +09:00
5c44cdfdaa etcdctl/ctlv3: don't crash when we should prompt for pw.
when 'etcdctl --user name get blah' is invoked to
 prompt for password, don't panic.

 addresses the segfault part of #6343
2016-09-04 09:02:50 +09:00
09a239f040 e2e: add quoted key/value to txn test 2016-09-04 09:02:47 +09:00
3faff8b2e2 etcdctl: fix quoted string handling in txn and watch
Fixes #6315
2016-09-04 09:02:28 +09:00
2345fda18e version: bump to v3.0.7+git 2016-08-31 16:41:06 -07:00
5695120efc version: bump to v3.0.7 2016-08-31 09:49:24 -07:00
183293e061 wal: lowercase segmentSizeBytes 2016-08-31 09:48:30 -07:00
4b48876f0e clientv3/concurrency: allow election on prefixes of keys.
After winning an election or obtaining a lock, we
auto-append a slash after the provided key prefix.
This avoids the previous deadlock due to waiting
on the wrong key.

Fixes #6278

Conflicts:
	clientv3/concurrency/election.go
	clientv3/concurrency/mutex.go
2016-08-31 09:46:05 -07:00
5089bf58fb wal: hold file lock while renaming WAL directory on non-Windows
Windows requires this lock to be released before the directory is
renamed. But on unix-like operating systems, releasing the lock and
trying to reacquire it immediately can be flaky if a process is forked
around the same time. The file descriptors are marked as close-on-exec
by the Go runtime, but there is a window between the fork and exec where
another process will be holding the lock.
2016-08-31 09:39:57 -07:00
480a347179 wal: use page buffered writer for writing records
Forces torn writes to only happen on sector boundaries.

Fixes #6271
2016-08-30 21:06:36 -07:00
59e560c7a7 ioutil: add page buffered writer
A buffered writer that only writes full pages or when explicitly flushed.
2016-08-30 21:06:33 -07:00
0bd9bea2e9 etcdserver: allow zero kv index for cluster upgrade
If a user upgrades etcd from 2.3.x to 3.0 and shuts down the
cluster immediately without triggering any new backend writes,
then the consistent index in backend would be zero.

The user cannot restart etcdserver due to today's strict index
match checking. We now have to loosen this a bit for this case.
2016-08-30 21:05:20 -07:00
bd7581ac59 wal: zero out wal tail past its first zero record
Whenever the WAL is opened for writes, it should write zeroes to its tail
starting from the first zero record. Otherwise, if there are entries past
the first zero record due to a torn write, any new writes that overlap the
old entries will lead to a garbage record on the tail and cause a CRC
mismatch.
2016-08-26 14:27:53 -07:00
db378c3d26 wal: test for truncation on torn writes 2016-08-26 14:27:51 -07:00
23740162dc fileutil: add ZeroToEnd for zeroing files 2016-08-26 14:27:49 -07:00
96422a955f discovery: reject IP address records in SRVGetCluster
Was incorrectly trimming the trailing '.' from the target; this in turn
caused the etcd server to accept any SRV record with an IP target
instead of only targets with A records.
2016-08-24 09:14:47 -07:00
6fd996fdac version: bump to v3.0.6+git 2016-08-19 12:38:13 -07:00
30 changed files with 884 additions and 175 deletions

View File

@ -22,7 +22,10 @@ import (
"github.com/coreos/etcd/mvcc/backend"
)
// isSubset returns true if a is a subset of b
// isSubset returns true if a is a subset of b.
// If a is a prefix of b, then a is a subset of b.
// Given intervals [a1,a2) and [b1,b2), is
// the a interval a subset of b?
func isSubset(a, b *rangePerm) bool {
switch {
case len(a.end) == 0 && len(b.end) == 0:
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
// b is a key, a is a range
return false
case len(a.end) == 0:
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
// a is a key, b is a range. need b1 <= a1 and a1 < b2
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
default:
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
// both are ranges. need b1 <= a1 and a2 <= b2
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
}
}
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
i := 0
for i < len(perms) {
begin, next := i, i
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
next++
}
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
if next != begin && len(perms[next].end) > 0 {
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
} else {
merged = append(merged, perms[begin])
if next != begin {
merged = append(merged, perms[next])
}
}
i = next + 1
}

View File

@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
[]*rangePerm{{[]byte("a"), []byte("b")}},
[]*rangePerm{{[]byte("a"), []byte("b")}},
},
{
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
},
{
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
[]*rangePerm{{[]byte("a"), []byte("c")}},
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
},
{
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
},
// duplicate ranges
{

View File

@ -45,6 +45,8 @@ type simpleBalancer struct {
// pinAddr is the currently pinned address; set to the empty string on
// intialization and shutdown.
pinAddr string
closed bool
}
func newSimpleBalancer(eps []string) *simpleBalancer {
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
// gRPC might call Up after it called Close. We add this check
// to "fix" it up at application layer. Or our simplerBalancer
// might panic since b.upc is closed.
if b.closed {
return func(err error) {}
}
if len(b.upEps) == 0 {
// notify waiting Get()s and pin first connected address
close(b.upc)
b.pinAddr = addr.Addr
}
b.upEps[addr.Addr] = struct{}{}
b.mu.Unlock()
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
delete(b.upEps, addr.Addr)
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
func (b *simpleBalancer) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
// In case gRPC calls close twice. TODO: remove the checking
// when we are sure that gRPC wont call close twice.
if b.closed {
return nil
}
b.closed = true
close(b.notifyCh)
// terminate all waiting Get()s
b.pinAddr = ""
if len(b.upEps) == 0 {
close(b.upc)
}
b.mu.Unlock()
return nil
}

View File

@ -40,7 +40,7 @@ type Election struct {
// NewElection returns a new election on a given key prefix.
func NewElection(client *v3.Client, pfx string) *Election {
return &Election{client: client, keyPrefix: pfx}
return &Election{client: client, keyPrefix: pfx + "/"}
}
// Campaign puts a value as eligible for the election. It blocks until
@ -59,7 +59,6 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]

View File

@ -32,7 +32,7 @@ type Mutex struct {
}
func NewMutex(client *v3.Client, pfx string) *Mutex {
return &Mutex{client, pfx, "", -1}
return &Mutex{client, pfx + "/", "", -1}
}
// Lock locks the mutex with a cancellable context. If the context is cancelled
@ -43,7 +43,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
return serr
}
m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))

View File

@ -127,6 +127,8 @@ type watchGrpcStream struct {
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// the error that closed the watch stream
closeErr error
@ -189,11 +191,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
streams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
}
go wgs.run()
return wgs
@ -242,7 +245,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
@ -352,15 +354,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
go w.serveStream(ws)
}
// closeStream closes the watcher resources and removes it
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
close(ws.outc)
delete(w.streams, ws.id)
empty := len(w.streams) == 0
if empty && w.stopc != nil {
w.stopc = nil
}
w.mu.Unlock()
return empty
}
// run is the root of the goroutines for managing a watcher client
@ -464,6 +470,10 @@ func (w *watchGrpcStream) run() {
cancelSet = make(map[int64]struct{})
case <-stopc:
return
case ws := <-w.closingc:
if w.closeStream(ws) {
return
}
}
// send failed; queue for retry
@ -522,6 +532,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
defer func() {
// signal that this watcherStream is finished
select {
case w.closingc <- ws:
case <-w.donec:
w.closeStream(ws)
}
}()
var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
@ -602,20 +621,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
}
}
w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}
func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
@ -669,6 +677,10 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RUnlock()
for _, ws := range streams {
// drain recvc so no old WatchResponses (e.g., Created messages)
// are processed while resuming
ws.drain()
// pause serveStream
ws.resumec <- -1
@ -701,6 +713,17 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
return nil
}
// drain discards every WatchResponse that can be received from the stream's
// receive channel right now. It never blocks: as soon as a receive would have
// to wait, the method returns.
func (ws *watcherStream) drain() {
	for {
		select {
		case <-ws.recvc:
			// dropped one pending response; check for another
			continue
		default:
		}
		// nothing left to receive without blocking
		return
	}
}
// toPB converts an internal watch request structure to its protobuf message
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{

View File

@ -53,8 +53,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
return err
}
for _, srv := range addrs {
target := strings.TrimSuffix(srv.Target, ".")
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
port := fmt.Sprintf("%d", srv.Port)
host := net.JoinHostPort(srv.Target, port)
tcpAddr, err := resolveTCPAddr("tcp", host)
if err != nil {
plog.Warningf("couldn't resolve host %s during SRV discovery", host)
@ -70,8 +70,11 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
n = fmt.Sprintf("%d", tempName)
tempName += 1
}
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, prefix, host)
// SRV records have a trailing dot but URL shouldn't.
shortHost := strings.TrimSuffix(srv.Target, ".")
urlHost := net.JoinHostPort(shortHost, port)
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, urlHost))
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, prefix, urlHost)
}
return nil
}

View File

@ -17,6 +17,7 @@ package discovery
import (
"errors"
"net"
"strings"
"testing"
"github.com/coreos/etcd/pkg/testutil"
@ -29,11 +30,22 @@ func TestSRVGetCluster(t *testing.T) {
}()
name := "dnsClusterTest"
dns := map[string]string{
"1.example.com.:2480": "10.0.0.1:2480",
"2.example.com.:2480": "10.0.0.2:2480",
"3.example.com.:2480": "10.0.0.3:2480",
"4.example.com.:2380": "10.0.0.3:2380",
}
srvAll := []*net.SRV{
{Target: "1.example.com.", Port: 2480},
{Target: "2.example.com.", Port: 2480},
{Target: "3.example.com.", Port: 2480},
}
tests := []struct {
withSSL []*net.SRV
withoutSSL []*net.SRV
urls []string
dns map[string]string
expected string
}{
@ -41,61 +53,50 @@ func TestSRVGetCluster(t *testing.T) {
[]*net.SRV{},
[]*net.SRV{},
nil,
nil,
"",
},
{
[]*net.SRV{
{Target: "10.0.0.1", Port: 2480},
{Target: "10.0.0.2", Port: 2480},
{Target: "10.0.0.3", Port: 2480},
},
srvAll,
[]*net.SRV{},
nil,
"0=https://1.example.com:2480,1=https://2.example.com:2480,2=https://3.example.com:2480",
},
{
srvAll,
[]*net.SRV{{Target: "4.example.com.", Port: 2380}},
nil,
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
"0=https://1.example.com:2480,1=https://2.example.com:2480,2=https://3.example.com:2480,3=http://4.example.com:2380",
},
{
[]*net.SRV{
{Target: "10.0.0.1", Port: 2480},
{Target: "10.0.0.2", Port: 2480},
{Target: "10.0.0.3", Port: 2480},
},
[]*net.SRV{
{Target: "10.0.0.1", Port: 2380},
},
nil,
nil,
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:2380",
},
{
[]*net.SRV{
{Target: "10.0.0.1", Port: 2480},
{Target: "10.0.0.2", Port: 2480},
{Target: "10.0.0.3", Port: 2480},
},
[]*net.SRV{
{Target: "10.0.0.1", Port: 2380},
},
srvAll,
[]*net.SRV{{Target: "4.example.com.", Port: 2380}},
[]string{"https://10.0.0.1:2480"},
nil,
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:2380",
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480,2=http://4.example.com:2380",
},
// matching local member with resolved addr and return unresolved hostnames
{
[]*net.SRV{
{Target: "1.example.com.", Port: 2480},
{Target: "2.example.com.", Port: 2480},
{Target: "3.example.com.", Port: 2480},
},
srvAll,
nil,
[]string{"https://10.0.0.1:2480"},
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
},
// invalid
}
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
if strings.Contains(addr, "10.0.0.") {
// accept IP addresses when resolving apurls
return net.ResolveTCPAddr(network, addr)
}
if dns[addr] == "" {
return nil, errors.New("missing dns record")
}
return net.ResolveTCPAddr(network, dns[addr])
}
for i, tt := range tests {
@ -108,12 +109,6 @@ func TestSRVGetCluster(t *testing.T) {
}
return "", nil, errors.New("Unknown service in mock")
}
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
if tt.dns == nil || tt.dns[addr] == "" {
return net.ResolveTCPAddr(network, addr)
}
return net.ResolveTCPAddr(network, tt.dns[addr])
}
urls := testutil.MustNewURLs(t, tt.urls)
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
if err != nil {

View File

@ -39,15 +39,23 @@ func txnTestSuccess(cx ctlCtx) {
if err := ctlV3Put(cx, "key2", "value2", ""); err != nil {
cx.t.Fatalf("txnTestSuccess ctlV3Put error (%v)", err)
}
rqs := txnRequests{
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
ifSucess: []string{"get key1", "get key2"},
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
rqs := []txnRequests{
{
compare: []string{`version("key1") = "1"`, `version("key2") = "1"`},
ifSucess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`},
ifFail: []string{`put key1 "fail"`, `put key2 "fail"`},
results: []string{"SUCCESS", "key1", "value1", "key2", "value2"},
},
{
compare: []string{`version("key \"with\" space") = "1"`},
ifSucess: []string{`get "key \"with\" space"`},
results: []string{"SUCCESS", `key "with" space`, "value \x23"},
},
}
if err := ctlV3Txn(cx, rqs); err != nil {
cx.t.Fatal(err)
for _, rq := range rqs {
if err := ctlV3Txn(cx, rq); err != nil {
cx.t.Fatal(err)
}
}
}

View File

@ -243,7 +243,7 @@ func authCfgFromCmd(cmd *cobra.Command) *authCfg {
var cfg authCfg
splitted := strings.SplitN(userFlag, ":", 2)
if len(splitted) == 0 {
if len(splitted) < 2 {
cfg.username = userFlag
cfg.password, err = speakeasy.Ask("Password: ")
if err != nil {

View File

@ -36,7 +36,10 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/spf13/cobra"
"golang.org/x/net/context"
)
@ -112,7 +115,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
partpath := path + ".part"
f, err := os.Create(partpath)
defer f.Close()
if err != nil {
exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
ExitWithError(ExitBadArgs, exiterr)
@ -131,6 +134,8 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
fileutil.Fsync(f)
f.Close()
if rerr := os.Rename(partpath, path); rerr != nil {
exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
ExitWithError(ExitIO, exiterr)
@ -186,8 +191,8 @@ func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
}
makeDB(snapdir, args[0])
makeWAL(waldir, cl)
makeDB(snapdir, args[0], len(cl.Members()))
makeWALAndSnap(waldir, snapdir, cl)
}
func initialClusterFromName(name string) string {
@ -199,11 +204,18 @@ func initialClusterFromName(name string) string {
}
// makeWAL creates a WAL for the initial cluster
func makeWAL(waldir string, cl *membership.RaftCluster) {
func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
if err := fileutil.CreateDirAll(waldir); err != nil {
ExitWithError(ExitIO, err)
}
// add members again to persist them to the store we create.
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
cl.SetStore(st)
for _, m := range cl.Members() {
cl.AddMember(m)
}
m := cl.MemberByName(restoreName)
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
metadata, merr := md.Marshal()
@ -227,7 +239,9 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
}
ents := make([]raftpb.Entry, len(peers))
nodeIDs := make([]uint64, len(peers))
for i, p := range peers {
nodeIDs[i] = p.ID
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: p.ID,
@ -245,20 +259,48 @@ func makeWAL(waldir string, cl *membership.RaftCluster) {
ents[i] = e
}
w.Save(raftpb.HardState{
Term: 1,
commit, term := uint64(len(ents)), uint64(1)
if err := w.Save(raftpb.HardState{
Term: term,
Vote: peers[0].ID,
Commit: uint64(len(ents))}, ents)
Commit: commit}, ents); err != nil {
ExitWithError(ExitIO, err)
}
b, berr := st.Save()
if berr != nil {
ExitWithError(ExitError, berr)
}
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: raftpb.ConfState{
Nodes: nodeIDs,
},
},
}
snapshotter := snap.New(snapdir)
if err := snapshotter.SaveSnap(raftSnap); err != nil {
panic(err)
}
if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
ExitWithError(ExitIO, err)
}
}
// initIndex implements ConsistentIndexGetter so the snapshot won't block
// the new raft instance by waiting for a future raft index.
type initIndex struct{}
type initIndex int
func (*initIndex) ConsistentIndex() uint64 { return 1 }
func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
// makeDB copies the database snapshot to the snapshot directory
func makeDB(snapdir, dbfile string) {
func makeDB(snapdir, dbfile string, commit int) {
f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
if ferr != nil {
ExitWithError(ExitInvalidInput, ferr)
@ -329,7 +371,7 @@ func makeDB(snapdir, dbfile string) {
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
s := mvcc.NewStore(be, nil, &initIndex{})
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
id := s.TxnBegin()
btx := be.BatchTx()
del := func(k, v []byte) error {
@ -339,6 +381,7 @@ func makeDB(snapdir, dbfile string) {
// delete stored members from old cluster since using new members
btx.UnsafeForEach([]byte("members"), del)
// todo: add back new members when we start to deprecate old snap file.
btx.UnsafeForEach([]byte("members_removed"), del)
// trigger write-out of new consistent index
s.TxnEnd(id)

View File

@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
if len(line) == 1 {
// remove space from the line
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
// remove trialling \n
line = line[:len(line)-1]
cmp, err := parseCompare(line)
if err != nil {
ExitWithError(ExitInvalidInput, err)
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
if len(line) == 1 {
// remove space from the line
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
// remove trialling \n
line = line[:len(line)-1]
op, err := parseRequestUnion(line)
if err != nil {
ExitWithError(ExitInvalidInput, err)

View File

@ -46,8 +46,23 @@ func addHexPrefix(s string) string {
}
func argify(s string) []string {
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
return r.FindAllString(s, -1)
r := regexp.MustCompile(`"(?:[^"\\]|\\.)*"|'[^']*'|[^'"\s]\S*[^'"\s]?`)
args := r.FindAllString(s, -1)
for i := range args {
if len(args[i]) == 0 {
continue
}
if args[i][0] == '\'' {
// 'single-quoted string'
args[i] = args[i][1 : len(args)-1]
} else if args[i][0] == '"' {
// "double quoted string"
if _, err := fmt.Sscanf(args[i], "%q", &args[i]); err != nil {
ExitWithError(ExitInvalidInput, err)
}
}
}
return args
}
func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {

View File

@ -52,30 +52,18 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
watchInteractiveFunc(cmd, args)
return
}
if len(args) < 1 || len(args) > 2 {
ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires one or two arguments as key or prefix, with range end"))
}
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
key := args[0]
if len(args) == 2 {
if watchPrefix {
ExitWithError(ExitBadArgs, fmt.Errorf("`range_end` and `--prefix` cannot be set at the same time, choose one"))
}
opts = append(opts, clientv3.WithRange(args[1]))
}
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
c := mustClientFromCmd(cmd)
wc := c.Watch(context.TODO(), key, opts...)
printWatchCh(wc)
err := c.Close()
if err == nil {
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
wc, err := getWatchChan(c, args)
if err != nil {
ExitWithError(ExitBadArgs, err)
}
ExitWithError(ExitBadConnection, err)
printWatchCh(wc)
if err = c.Close(); err != nil {
ExitWithError(ExitBadConnection, err)
}
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
}
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
@ -107,32 +95,33 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
moreargs := flagset.Args()
if len(moreargs) < 1 || len(moreargs) > 2 {
fmt.Fprintf(os.Stderr, "Invalid command %s (Too few or many arguments)\n", l)
ch, err := getWatchChan(c, flagset.Args())
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
var key string
_, err = fmt.Sscanf(moreargs[0], "%q", &key)
if err != nil {
key = moreargs[0]
}
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
if len(moreargs) == 2 {
if watchPrefix {
fmt.Fprintf(os.Stderr, "`range_end` and `--prefix` cannot be set at the same time, choose one\n")
continue
}
opts = append(opts, clientv3.WithRange(moreargs[1]))
}
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
ch := c.Watch(context.TODO(), key, opts...)
go printWatchCh(ch)
}
}
// getWatchChan validates the positional watch arguments and starts the watch.
// args[0] is the key to watch and the optional args[1] is the range end. A
// range end cannot be combined with the --prefix flag. The watch always starts
// from the revision held in watchRev.
func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
	hasRangeEnd := len(args) == 2
	switch {
	case len(args) < 1 || len(args) > 2:
		return nil, fmt.Errorf("bad number of arguments")
	case hasRangeEnd && watchPrefix:
		return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
	}

	watchOpts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
	// after validation at most one of the two cases below can apply
	switch {
	case hasRangeEnd:
		watchOpts = append(watchOpts, clientv3.WithRange(args[1]))
	case watchPrefix:
		watchOpts = append(watchOpts, clientv3.WithPrefix())
	}
	return c.Watch(context.TODO(), args[0], watchOpts...), nil
}
func printWatchCh(ch clientv3.WatchChan) {
for resp := range ch {
display.Watch(resp)

View File

@ -20,6 +20,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"runtime"
@ -410,6 +411,13 @@ func (cfg *config) configFromFile() error {
}
func (cfg *config) validateConfig(isSet func(field string) bool) error {
if err := checkBindURLs(cfg.lpurls); err != nil {
return err
}
if err := checkBindURLs(cfg.lcurls); err != nil {
return err
}
// when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set.
// TODO(yichengq): check this for joining through discovery service case
mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy
@ -456,3 +464,27 @@ func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == pr
func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
// checkBindURLs inspects the host of every listen URL. URLs with the "unix"
// or "unixs" scheme are skipped, and the host "localhost" is accepted. A host
// that does not parse as an IP address is currently only logged as a warning;
// the only error returned is a failure to split the host from the port.
// TODO: return error in 3.2.0
func checkBindURLs(urls []url.URL) error {
	for i := range urls {
		u := urls[i]
		switch u.Scheme {
		case "unix", "unixs":
			continue
		}
		host, _, err := net.SplitHostPort(u.Host)
		if err != nil {
			return err
		}
		// special case for local address
		// TODO: support /etc/hosts ?
		if host == "localhost" || net.ParseIP(host) != nil {
			continue
		}
		plog.Warning(fmt.Errorf("expected IP in URL for binding (%s)", u.String()))
	}
	return nil
}

View File

@ -412,8 +412,13 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
if snapshot != nil && kvindex < snapshot.Metadata.Index {
return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
if kvindex != 0 {
return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
}
plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
}
}
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())

View File

@ -174,3 +174,28 @@ func TestElectionSessionRecampaign(t *testing.T) {
t.Fatalf("expected value=%q, got response %v", "def", resp)
}
}
// TestElectionOnPrefixOfExistingKey is a regression test for bug #6278
// (https://github.com/coreos/etcd/issues/6278): a single candidate must be
// able to win an election on a new key that is a prefix of an existing key.
func TestElectionOnPrefixOfExistingKey(t *testing.T) {
	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	cli := clus.RandClient()
	// "testa" shares the "test" prefix used for the election below
	_, putErr := cli.Put(context.TODO(), "testa", "value")
	if putErr != nil {
		t.Fatal(putErr)
	}

	e := concurrency.NewElection(cli, "test")
	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	campaignErr := e.Campaign(ctx, "abc")
	cancel()
	// after 5 seconds, deadlock results in
	// 'context deadline exceeded' here.
	if campaignErr != nil {
		t.Fatal(campaignErr)
	}
}

22
pkg/fileutil/dir_unix.go Normal file
View File

@ -0,0 +1,22 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows
package fileutil
import "os"
// OpenDir opens the directory at path read-only and returns its file handle,
// which callers can use to sync the directory.
func OpenDir(path string) (*os.File, error) {
	dir, err := os.OpenFile(path, os.O_RDONLY, 0)
	return dir, err
}

View File

@ -0,0 +1,46 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build windows
package fileutil
import (
"os"
"syscall"
)
// OpenDir opens the directory at path on Windows with write access so it can
// be synced. The raw handle obtained from openDir is wrapped in an *os.File
// named after path.
func OpenDir(path string) (*os.File, error) {
	handle, err := openDir(path)
	if err == nil {
		return os.NewFile(uintptr(handle), path), nil
	}
	return nil, err
}
// openDir returns a Windows handle for the existing directory at path, opened
// for reading and writing and shareable with other readers and writers.
// An empty path yields ERROR_FILE_NOT_FOUND.
func openDir(path string) (fd syscall.Handle, err error) {
	if path == "" {
		return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND
	}
	namep, err := syscall.UTF16PtrFromString(path)
	if err != nil {
		return syscall.InvalidHandle, err
	}
	return syscall.CreateFile(
		namep,
		uint32(syscall.GENERIC_READ|syscall.GENERIC_WRITE),
		uint32(syscall.FILE_SHARE_READ|syscall.FILE_SHARE_WRITE),
		nil,
		uint32(syscall.OPEN_EXISTING),
		// FILE_FLAG_BACKUP_SEMANTICS is passed as the flags/attributes argument.
		uint32(syscall.FILE_FLAG_BACKUP_SEMANTICS),
		0,
	)
}

View File

@ -96,3 +96,26 @@ func Exist(name string) bool {
_, err := os.Stat(name)
return err == nil
}
// ZeroToEnd zeros f from its current offset (SEEK_CUR) through its end
// (SEEK_END). It does so by truncating the file at the current offset and
// then calling Preallocate with the original length, so the file may be
// temporarily shorter while this runs. On success the file offset is restored
// to where it was on entry.
func ZeroToEnd(f *os.File) error {
	// TODO: support FALLOC_FL_ZERO_RANGE
	cur, err := f.Seek(0, os.SEEK_CUR)
	if err != nil {
		return err
	}
	end, err := f.Seek(0, os.SEEK_END)
	if err != nil {
		return err
	}
	// drop everything past the starting offset
	if err = f.Truncate(cur); err != nil {
		return err
	}
	// make sure blocks remain allocated
	if err = Preallocate(f, end, true); err != nil {
		return err
	}
	// put the offset back where the caller left it
	_, err = f.Seek(cur, os.SEEK_SET)
	return err
}

View File

@ -118,3 +118,42 @@ func TestExist(t *testing.T) {
t.Errorf("exist = %v, want false", g)
}
}
// TestZeroToEnd writes 1024 non-zero bytes to a temp file, seeks to offset
// 512, calls ZeroToEnd, and checks that the offset is unchanged and that the
// remaining 512 bytes read back as zeros.
func TestZeroToEnd(t *testing.T) {
	f, err := ioutil.TempFile(os.TempDir(), "fileutil")
	if err != nil {
		t.Fatal(err)
	}
	// Deferred calls run last-in-first-out: the file is closed first and then
	// removed, so the test does not leave a temp file behind.
	defer os.Remove(f.Name())
	defer f.Close()

	b := make([]byte, 1024)
	for i := range b {
		b[i] = 12
	}
	if _, err = f.Write(b); err != nil {
		t.Fatal(err)
	}
	if _, err = f.Seek(512, os.SEEK_SET); err != nil {
		t.Fatal(err)
	}
	if err = ZeroToEnd(f); err != nil {
		t.Fatal(err)
	}
	off, serr := f.Seek(0, os.SEEK_CUR)
	if serr != nil {
		t.Fatal(serr)
	}
	if off != 512 {
		t.Fatalf("expected offset 512, got %d", off)
	}

	b = make([]byte, 512)
	n, rerr := f.Read(b)
	if rerr != nil {
		t.Fatal(rerr)
	}
	// A short read would leave the tail of b at its initial zero value and
	// make the check below pass without inspecting the file contents.
	if n != len(b) {
		t.Fatalf("expected to read %d bytes, got %d", len(b), n)
	}
	for i := range b {
		if b[i] != 0 {
			t.Errorf("expected b[%d] = 0, got %d", i, b[i])
		}
	}
}

103
pkg/ioutil/pagewriter.go Normal file
View File

@ -0,0 +1,103 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ioutil
import (
"io"
)
// defaultBufferBytes is the buffer watermark given to new PageWriters. It is
// a variable so tests can shrink it.
var defaultBufferBytes = 128 * 1024

// PageWriter implements the io.Writer interface so that writes will
// either be in page chunks or from flushing.
type PageWriter struct {
	// w is the destination writer.
	w io.Writer
	// pageOffset is the offset, within a page, of the first byte of the buffer.
	pageOffset int
	// pageBytes is the page size in bytes.
	pageBytes int
	// bufferedBytes is how many bytes in buf are waiting to be written to w.
	bufferedBytes int
	// buf is the write buffer.
	buf []byte
	// bufWatermarkBytes is the fill level past which a Write no longer just
	// appends to buf. It is smaller than len(buf), leaving room for the slack
	// bytes needed to bring the buffered data up to a page boundary.
	bufWatermarkBytes int
}

// NewPageWriter returns a PageWriter that writes to w using pages of
// pageBytes bytes. The buffer is sized to defaultBufferBytes plus one page.
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
	pw := new(PageWriter)
	pw.w = w
	pw.pageBytes = pageBytes
	pw.buf = make([]byte, defaultBufferBytes+pageBytes)
	pw.bufWatermarkBytes = defaultBufferBytes
	return pw
}

// Write buffers p while it fits under the watermark. Otherwise it tops up
// the buffer to a page boundary, flushes it, writes all whole pages of the
// remaining data straight to the underlying writer, and buffers the tail.
// If p is too short to reach the page boundary it is buffered without
// flushing, so no unaligned flush is forced.
func (pw *PageWriter) Write(p []byte) (n int, err error) {
	if pw.bufferedBytes+len(p) <= pw.bufWatermarkBytes {
		// fits below the watermark; just append
		copy(pw.buf[pw.bufferedBytes:], p)
		pw.bufferedBytes += len(p)
		return len(p), nil
	}

	// If the buffered data ends mid-page, fill the rest of that page first.
	if used := (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes; used != 0 {
		fill := pw.pageBytes - used
		short := len(p) < fill
		if short {
			// p cannot reach the page boundary; take all of it
			fill = len(p)
		}
		copy(pw.buf[pw.bufferedBytes:], p[:fill])
		pw.bufferedBytes += fill
		n = fill
		p = p[fill:]
		if short {
			// keep the bytes buffered rather than flushing unaligned
			return n, nil
		}
	}

	// The buffer now ends on a page boundary; push it out.
	if err = pw.Flush(); err != nil {
		return n, err
	}

	// Whole pages bypass the buffer entirely.
	if len(p) > pw.pageBytes {
		whole := (len(p) / pw.pageBytes) * pw.pageBytes
		written, werr := pw.w.Write(p[:whole])
		n += written
		if werr != nil {
			return n, werr
		}
		p = p[whole:]
	}

	// Whatever is left goes through Write again to land in the buffer.
	rest, rerr := pw.Write(p)
	n += rest
	return n, rerr
}

// Flush writes any buffered bytes to the underlying writer, advances the
// page offset by the number of bytes that were buffered, and empties the
// buffer. It is a no-op when nothing is buffered.
func (pw *PageWriter) Flush() error {
	pending := pw.bufferedBytes
	if pending == 0 {
		return nil
	}
	_, err := pw.w.Write(pw.buf[:pending])
	pw.pageOffset = (pw.pageOffset + pending) % pw.pageBytes
	pw.bufferedBytes = 0
	return err
}

View File

@ -0,0 +1,100 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ioutil
import (
"math/rand"
"testing"
)
// TestPageWriterRandom issues many randomly sized writes through a PageWriter
// backed by a checkPageWriter, which fails the test on any write that is not
// a multiple of the page size. Afterwards it checks that the backing writer
// never received more bytes than were submitted, and that the number of bytes
// still pending in the PageWriter is bounded by its buffer capacity.
func TestPageWriterRandom(t *testing.T) {
	// smaller buffer for stress testing; restore the package default on exit
	// so other tests are not affected
	defer func(old int) { defaultBufferBytes = old }(defaultBufferBytes)
	defaultBufferBytes = 8 * 1024
	pageBytes := 128
	buf := make([]byte, 4*defaultBufferBytes)
	cw := &checkPageWriter{pageBytes: pageBytes, t: t}
	w := NewPageWriter(cw, pageBytes)
	n := 0
	for i := 0; i < 4096; i++ {
		c, err := w.Write(buf[:rand.Intn(len(buf))])
		if err != nil {
			t.Fatal(err)
		}
		n += c
	}
	if cw.writeBytes > n {
		t.Fatalf("wrote %d bytes to io.Writer, but only wrote %d bytes", cw.writeBytes, n)
	}
	// Pending bytes are those accepted by the PageWriter but not yet passed to
	// the backing writer. The buffer holds at most the watermark plus one page
	// of slack, so that is the upper bound on what may be outstanding.
	maxPendingBytes := pageBytes + defaultBufferBytes
	if pending := n - cw.writeBytes; pending > maxPendingBytes {
		t.Fatalf("got %d bytes pending, expected less than %d bytes", pending, maxPendingBytes)
	}
	t.Logf("total writes: %d", cw.writes)
	t.Logf("total write bytes: %d (of %d)", cw.writeBytes, n)
}
// TestPageWriterPartialSlack tests the case where a write overflows the buffer
// but there is not enough data to complete the slack write.
func TestPageWriterPartialSlack(t *testing.T) {
	// shrink the buffer for this test only; restore the package default on
	// exit so other tests are not affected
	defer func(old int) { defaultBufferBytes = old }(defaultBufferBytes)
	defaultBufferBytes = 1024
	pageBytes := 128
	buf := make([]byte, defaultBufferBytes)
	// The checker accepts multiples of 64 bytes because the first flush below
	// deliberately writes a single 64-byte chunk.
	cw := &checkPageWriter{pageBytes: 64, t: t}
	w := NewPageWriter(cw, pageBytes)
	// put writer in non-zero page offset
	if _, err := w.Write(buf[:64]); err != nil {
		t.Fatal(err)
	}
	if err := w.Flush(); err != nil {
		t.Fatal(err)
	}
	if cw.writes != 1 {
		t.Fatalf("got %d writes, expected 1", cw.writes)
	}
	// nearly fill buffer
	if _, err := w.Write(buf[:1022]); err != nil {
		t.Fatal(err)
	}
	// overflow buffer, but without enough to write as aligned
	if _, err := w.Write(buf[:8]); err != nil {
		t.Fatal(err)
	}
	if cw.writes != 1 {
		t.Fatalf("got %d writes, expected 1", cw.writes)
	}
	// finish writing slack space
	if _, err := w.Write(buf[:128]); err != nil {
		t.Fatal(err)
	}
	if cw.writes != 2 {
		t.Fatalf("got %d writes, expected 2", cw.writes)
	}
}
// checkPageWriter is an io.Writer for tests. It fails the test whenever it
// receives a write whose length is not a multiple of pageBytes, and keeps a
// running count of writes and bytes received.
type checkPageWriter struct {
	// pageBytes is the required granularity of every write.
	pageBytes int
	// writes is the number of Write calls accepted so far.
	writes int
	// writeBytes is the total number of bytes accepted so far.
	writeBytes int
	// t is the test to fail on an unaligned write.
	t *testing.T
}

// Write checks that len(p) is a multiple of cw.pageBytes, records the write,
// and reports all of p as written.
func (cw *checkPageWriter) Write(p []byte) (int, error) {
	size := len(p)
	if rem := size % cw.pageBytes; rem != 0 {
		cw.t.Fatalf("got write len(p) = %d, expected len(p) == k*cw.pageBytes", size)
	}
	cw.writeBytes += size
	cw.writes++
	return size, nil
}

View File

@ -49,6 +49,7 @@ var (
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
}
)

View File

@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.6"
Version = "3.0.10"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"

View File

@ -15,19 +15,24 @@
package wal
import (
"bufio"
"encoding/binary"
"hash"
"io"
"sync"
"github.com/coreos/etcd/pkg/crc"
"github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/wal/walpb"
)
// walPageBytes is the alignment for flushing records to the backing Writer.
// It should be a multiple of the minimum sector size so that WAL repair can
// safely distinguish between torn writes and ordinary data corruption.
const walPageBytes = 8 * minSectorSize
type encoder struct {
mu sync.Mutex
bw *bufio.Writer
bw *ioutil.PageWriter
crc hash.Hash32
buf []byte
@ -36,7 +41,7 @@ type encoder struct {
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
return &encoder{
bw: bufio.NewWriter(w),
bw: ioutil.NewPageWriter(w, walPageBytes),
crc: crc.New(prevCrc, crcTable),
// 1MB buffer
buf: make([]byte, 1024*1024),

View File

@ -67,7 +67,11 @@ var (
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
dir string // the living directory of the underlay files
dir string // the living directory of the underlay files
// dirFile is a fd for the wal directory for syncing on Rename
dirFile *os.File
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL
@ -106,10 +110,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
if err != nil {
return nil, err
}
if _, err := f.Seek(0, os.SEEK_END); err != nil {
if _, err = f.Seek(0, os.SEEK_END); err != nil {
return nil, err
}
if err := fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
if err = fileutil.Preallocate(f.File, segmentSizeBytes, true); err != nil {
return nil, err
}
@ -119,32 +123,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
encoder: newEncoder(f, 0),
}
w.locks = append(w.locks, f)
if err := w.saveCrc(0); err != nil {
if err = w.saveCrc(0); err != nil {
return nil, err
}
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
return nil, err
}
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
return nil, err
}
// rename of directory with locked files doesn't work on windows; close
// the WAL to release the locks so the directory can be renamed
w.Close()
if err := os.Rename(tmpdirpath, dirpath); err != nil {
if w, err = w.renameWal(tmpdirpath); err != nil {
return nil, err
}
// reopen and relock
newWAL, oerr := Open(dirpath, walpb.Snapshot{})
if oerr != nil {
return nil, oerr
// directory was renamed; sync parent dir to persist rename
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
if perr != nil {
return nil, perr
}
if _, _, _, err := newWAL.ReadAll(); err != nil {
newWAL.Close()
return nil, err
if perr = fileutil.Fsync(pdir); perr != nil {
return nil, perr
}
return newWAL, nil
if perr = pdir.Close(); err != nil {
return nil, perr
}
return w, nil
}
// Open opens the WAL at the given snap.
@ -154,7 +159,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
// the given snap. The WAL cannot be appended to before reading out all of its
// previous records.
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, true)
w, err := openAtIndex(dirpath, snap, true)
if err != nil {
return nil, err
}
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
return nil, err
}
return w, nil
}
// OpenForRead only opens the wal files for read.
@ -299,6 +311,18 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
state.Reset()
return nil, state, nil, err
}
// decodeRecord() will return io.EOF if it detects a zero record,
// but this zero record may be followed by non-zero records from
// a torn write. Overwriting some of these non-zero records, but
// not all, will cause CRC errors on WAL open. Since the records
// were never fully synced to disk in the first place, it's safe
// to zero them out to avoid any CRC errors from new writes.
if _, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET); err != nil {
return nil, state, nil, err
}
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
return nil, state, nil, err
}
}
err = nil
@ -317,7 +341,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
if w.tail() != nil {
// create encoder (chain crc with the decoder), enable appending
_, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET)
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
}
w.decoder = nil
@ -375,6 +398,10 @@ func (w *WAL) cut() error {
if err = os.Rename(newTail.Name(), fpath); err != nil {
return err
}
if err = fileutil.Fsync(w.dirFile); err != nil {
return err
}
newTail.Close()
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
@ -477,7 +504,7 @@ func (w *WAL) Close() error {
plog.Errorf("failed to unlock during closing wal: %s", err)
}
}
return nil
return w.dirFile.Close()
}
func (w *WAL) saveEntry(e *raftpb.Entry) error {

View File

@ -636,3 +636,89 @@ func TestRestartCreateWal(t *testing.T) {
t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
}
}
// TestOpenOnTornWrite ensures that entries past the torn write are truncated.
//
// It saves maxEntries entries while recording the file offset after each one,
// zeros out one entry on disk to simulate a torn write, reopens the WAL,
// appends a few new entries, and finally re-reads the WAL to confirm the
// entry count matches expectation and no CRC error is reported.
func TestOpenOnTornWrite(t *testing.T) {
	maxEntries := 40
	clobberIdx := 20
	overwriteEntries := 5
	p, err := ioutil.TempDir(os.TempDir(), "waltest")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(p)
	w, err := Create(p, nil)
	// Check the error before registering the deferred Close: Create returns a
	// nil WAL on failure, and deferring Close on it would obscure the real error.
	if err != nil {
		t.Fatal(err)
	}
	defer w.Close()
	// get offset of end of each saved entry
	offsets := make([]int64, maxEntries)
	for i := range offsets {
		es := []raftpb.Entry{{Index: uint64(i)}}
		if err = w.Save(raftpb.HardState{}, es); err != nil {
			t.Fatal(err)
		}
		if offsets[i], err = w.tail().Seek(0, os.SEEK_CUR); err != nil {
			t.Fatal(err)
		}
	}
	fn := path.Join(p, path.Base(w.tail().Name()))
	w.Close()
	// clobber some entry with 0's to simulate a torn write
	f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
	if ferr != nil {
		t.Fatal(ferr)
	}
	defer f.Close()
	_, err = f.Seek(offsets[clobberIdx], os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
	_, err = f.Write(zeros)
	if err != nil {
		t.Fatal(err)
	}
	f.Close()
	w, err = Open(p, walpb.Snapshot{})
	if err != nil {
		t.Fatal(err)
	}
	// seek up to clobbered entry
	_, _, _, err = w.ReadAll()
	if err != nil {
		t.Fatal(err)
	}
	// write a few entries past the clobbered entry
	for i := 0; i < overwriteEntries; i++ {
		// Index is different from old, truncated entries
		es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
		if err = w.Save(raftpb.HardState{}, es); err != nil {
			t.Fatal(err)
		}
	}
	w.Close()
	// read back the entries, confirm number of entries matches expectation
	w, err = OpenForRead(p, walpb.Snapshot{})
	if err != nil {
		t.Fatal(err)
	}
	_, _, ents, rerr := w.ReadAll()
	if rerr != nil {
		// CRC error? the old entries were likely never truncated away
		t.Fatal(rerr)
	}
	wEntries := (clobberIdx - 1) + overwriteEntries
	if len(ents) != wEntries {
		t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
	}
}

44
wal/wal_unix.go Normal file
View File

@ -0,0 +1,44 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows
package wal
import (
"os"
"github.com/coreos/etcd/pkg/fileutil"
)
// renameWal moves the WAL from tmpdirpath to w.dir, removing anything already
// at w.dir first. It then installs a new file pipeline for w.dir and opens
// w.dir as the directory handle stored in w.dirFile. The receiver is returned
// together with any error from opening the directory.
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
	// On non-Windows platforms, hold the lock while renaming. Releasing
	// the lock and trying to reacquire it quickly can be flaky because
	// it's possible the process will fork to spawn a process while this is
	// happening. The fds are set up as close-on-exec by the Go runtime,
	// but there is a window between the fork and the exec where another
	// process holds the lock.
	var err error
	if err = os.RemoveAll(w.dir); err != nil {
		return nil, err
	}
	if err = os.Rename(tmpdirpath, w.dir); err != nil {
		return nil, err
	}
	w.fp = newFilePipeline(w.dir, segmentSizeBytes)
	w.dirFile, err = fileutil.OpenDir(w.dir)
	return w, err
}

41
wal/wal_windows.go Normal file
View File

@ -0,0 +1,41 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"os"
"github.com/coreos/etcd/wal/walpb"
)
// renameWal moves the WAL from tmpdirpath to w.dir on Windows. The receiver
// is closed before the rename, and a freshly opened WAL (already read through
// with ReadAll) is returned in its place.
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
	// rename of directory with locked files doesn't work on
	// windows; close the WAL to release the locks so the directory
	// can be renamed
	w.Close()
	renameErr := os.Rename(tmpdirpath, w.dir)
	if renameErr != nil {
		return nil, renameErr
	}
	// reopen and relock
	reopened, openErr := Open(w.dir, walpb.Snapshot{})
	if openErr != nil {
		return nil, openErr
	}
	_, _, _, readErr := reopened.ReadAll()
	if readErr != nil {
		// do not hand back a WAL that could not be read
		reopened.Close()
		return nil, readErr
	}
	return reopened, nil
}