*: fix point-in-time backup

The backup process should be able to read all WAL files until io.EOF
to generate a point-in-time backup.

Our WAL files are append-only, and the backup process locks all
files before it starts reading, which prevents the gc routine from
removing any files in the middle.
This commit is contained in:
Xiang Li
2015-06-12 16:06:02 -07:00
parent 219d304291
commit f59da0e453
3 changed files with 64 additions and 43 deletions

View File

@ -66,7 +66,7 @@ func handleBackup(c *cli.Context) {
} }
} }
w, err := wal.OpenNotInUse(srcWAL, walsnap) w, err := wal.OpenForRead(srcWAL, walsnap)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -137,13 +137,13 @@ func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, true) return openAtIndex(dirpath, snap, true)
} }
// OpenNotInUse only opens the wal files that are not in use. // OpenForRead only opens the wal files for read.
// Other than that, it is similar to Open. // Write on a read only wal panics.
func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) { func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, false) return openAtIndex(dirpath, snap, false)
} }
func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) { func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
names, err := fileutil.ReadDir(dirpath) names, err := fileutil.ReadDir(dirpath)
if err != nil { if err != nil {
return nil, err return nil, err
@ -172,11 +172,8 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
} }
err = l.TryLock() err = l.TryLock()
if err != nil { if err != nil {
if all { if write {
return nil, err return nil, err
} else {
plog.Warningf("opened all the files until %s, since it is still in use by an etcd server", name)
break
} }
} }
rcs = append(rcs, f) rcs = append(rcs, f)
@ -184,33 +181,40 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
} }
rc := MultiReadCloser(rcs...) rc := MultiReadCloser(rcs...)
// open the lastest wal file for appending
seq, _, err := parseWalName(names[len(names)-1])
if err != nil {
rc.Close()
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])
f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
if err != nil {
rc.Close()
return nil, err
}
// create a WAL ready for reading // create a WAL ready for reading
w := &WAL{ w := &WAL{
dir: dirpath, dir: dirpath,
start: snap, start: snap,
decoder: newDecoder(rc), decoder: newDecoder(rc),
locks: ls,
f: f,
seq: seq,
locks: ls,
} }
if write {
// open the lastest wal file for appending
seq, _, err := parseWalName(names[len(names)-1])
if err != nil {
rc.Close()
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])
f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
if err != nil {
rc.Close()
return nil, err
}
w.f = f
w.seq = seq
}
return w, nil return w, nil
} }
// ReadAll reads out all records of the current WAL. // ReadAll reads out records of the current WAL.
// If opened in write mode, it must read out all records until EOF; otherwise
// an error will be returned.
// If opened in read mode, it will try to read all records if possible.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. // If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If loaded snap doesn't match with the expected one, it will return // If loaded snap doesn't match with the expected one, it will return
// all the records and error ErrSnapshotMismatch. // all the records and error ErrSnapshotMismatch.
@ -265,10 +269,24 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
} }
} }
if err != io.EOF {
state.Reset() switch w.f {
return nil, state, nil, err case nil:
// We do not have to read out all entries in read mode.
// The last record may be a partially written one, so
// io.ErrUnexpectedEOF might be returned.
if err != io.EOF && err != io.ErrUnexpectedEOF {
state.Reset()
return nil, state, nil, err
}
default:
// We must read all of the entries if WAL is opened in write mode.
if err != io.EOF {
state.Reset()
return nil, state, nil, err
}
} }
err = nil err = nil
if !match { if !match {
err = ErrSnapshotNotFound err = ErrSnapshotNotFound
@ -279,10 +297,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
w.start = walpb.Snapshot{} w.start = walpb.Snapshot{}
w.metadata = metadata w.metadata = metadata
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.f, w.decoder.lastCRC()) if w.f != nil {
w.decoder = nil // create encoder (chain crc with the decoder), enable appending
lastIndexSaved.Set(float64(w.enti)) w.encoder = newEncoder(w.f, w.decoder.lastCRC())
w.decoder = nil
lastIndexSaved.Set(float64(w.enti))
}
return metadata, state, ents, err return metadata, state, ents, err
} }

View File

@ -404,12 +404,11 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
w.Close() w.Close()
} }
// TestOpenNotInUse tests that OpenNotInUse can load all files that are // TestOpenForRead tests that OpenForRead can load all files.
// not in use at that point.
// The tests creates WAL directory, and cut out multiple WAL files. Then // The tests creates WAL directory, and cut out multiple WAL files. Then
// it releases the lock of part of data, and excepts that OpenNotInUse // it releases the lock of part of data, and excepts that OpenForRead
// can read out all unlocked data. // can read out all files even if some are locked for write.
func TestOpenNotInUse(t *testing.T) { func TestOpenForRead(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest") p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -435,8 +434,8 @@ func TestOpenNotInUse(t *testing.T) {
unlockIndex := uint64(5) unlockIndex := uint64(5)
w.ReleaseLockTo(unlockIndex) w.ReleaseLockTo(unlockIndex)
// 1,2,3 are avaliable. // All are avaliable for read
w2, err := OpenNotInUse(p, walpb.Snapshot{}) w2, err := OpenForRead(p, walpb.Snapshot{})
defer w2.Close() defer w2.Close()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -445,8 +444,8 @@ func TestOpenNotInUse(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err = %v, want nil", err) t.Fatalf("err = %v, want nil", err)
} }
if g := ents[len(ents)-1].Index; g != unlockIndex-2 { if g := ents[len(ents)-1].Index; g != 9 {
t.Errorf("last index read = %d, want %d", g, unlockIndex-2) t.Errorf("last index read = %d, want %d", g, 9)
} }
} }