wal: record and check snapshot
This commit is contained in:
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/pbutil"
|
"github.com/coreos/etcd/pkg/pbutil"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/snap"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewBackupCommand() cli.Command {
|
func NewBackupCommand() cli.Command {
|
||||||
@ -57,16 +58,16 @@ func handleBackup(c *cli.Context) {
|
|||||||
if err != nil && err != snap.ErrNoSnapshot {
|
if err != nil && err != snap.ErrNoSnapshot {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
var index uint64
|
var walsnap walpb.Snapshot
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
index = snapshot.Metadata.Index
|
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
newss := snap.New(destSnap)
|
newss := snap.New(destSnap)
|
||||||
if err := newss.SaveSnap(*snapshot); err != nil {
|
if err := newss.SaveSnap(*snapshot); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := wal.OpenNotInUse(srcWAL, index)
|
w, err := wal.OpenNotInUse(srcWAL, walsnap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -26,10 +26,15 @@ import (
|
|||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
|
var walsnap walpb.Snapshot
|
||||||
|
if snapshot != nil {
|
||||||
|
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
|
}
|
||||||
|
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
|
||||||
cfg.Cluster.SetID(cid)
|
cfg.Cluster.SetID(cid)
|
||||||
|
|
||||||
// discard the previously uncommitted entries
|
// discard the previously uncommitted entries
|
||||||
|
@ -46,6 +46,7 @@ import (
|
|||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/snap"
|
||||||
"github.com/coreos/etcd/store"
|
"github.com/coreos/etcd/store"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
@ -219,7 +220,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
if cfg.ShouldDiscover() {
|
if cfg.ShouldDiscover() {
|
||||||
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
||||||
}
|
}
|
||||||
var index uint64
|
|
||||||
snapshot, err := ss.Load()
|
snapshot, err := ss.Load()
|
||||||
if err != nil && err != snap.ErrNoSnapshot {
|
if err != nil && err != snap.ErrNoSnapshot {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -229,7 +229,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
log.Panicf("etcdserver: recovered store from snapshot error: %v", err)
|
log.Panicf("etcdserver: recovered store from snapshot error: %v", err)
|
||||||
}
|
}
|
||||||
log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index)
|
log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index)
|
||||||
index = snapshot.Metadata.Index
|
|
||||||
}
|
}
|
||||||
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
|
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
|
||||||
cfg.Print()
|
cfg.Print()
|
||||||
@ -237,9 +236,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
log.Printf("etcdserver: loaded cluster information from store: %s", cfg.Cluster)
|
log.Printf("etcdserver: loaded cluster information from store: %s", cfg.Cluster)
|
||||||
}
|
}
|
||||||
if !cfg.ForceNewCluster {
|
if !cfg.ForceNewCluster {
|
||||||
id, n, s, w = restartNode(cfg, index+1, snapshot)
|
id, n, s, w = restartNode(cfg, snapshot)
|
||||||
} else {
|
} else {
|
||||||
id, n, s, w = restartAsStandaloneNode(cfg, index+1, snapshot)
|
id, n, s, w = restartAsStandaloneNode(cfg, snapshot)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||||
@ -860,6 +859,9 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
|
|||||||
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
|
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
|
||||||
log.Fatalf("etcdserver: create wal error: %v", err)
|
log.Fatalf("etcdserver: create wal error: %v", err)
|
||||||
}
|
}
|
||||||
|
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||||
|
log.Fatalf("etcdserver: save empty snapshot error: %v", err)
|
||||||
|
}
|
||||||
peers := make([]raft.Peer, len(ids))
|
peers := make([]raft.Peer, len(ids))
|
||||||
for i, id := range ids {
|
for i, id := range ids {
|
||||||
ctx, err := json.Marshal((*cfg.Cluster).Member(id))
|
ctx, err := json.Marshal((*cfg.Cluster).Member(id))
|
||||||
@ -875,8 +877,12 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
|
var walsnap walpb.Snapshot
|
||||||
|
if snapshot != nil {
|
||||||
|
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
|
}
|
||||||
|
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
|
||||||
cfg.Cluster.SetID(cid)
|
cfg.Cluster.SetID(cid)
|
||||||
|
|
||||||
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/snap"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Storage interface {
|
type Storage interface {
|
||||||
@ -43,6 +44,14 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
walsnap := walpb.Snapshot{
|
||||||
|
Index: snap.Metadata.Index,
|
||||||
|
Term: snap.Metadata.Term,
|
||||||
|
}
|
||||||
|
err = st.WAL.SaveSnapshot(walsnap)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
err = st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
err = st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -50,9 +59,9 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||||
var err error
|
var err error
|
||||||
if w, err = wal.Open(waldir, index); err != nil {
|
if w, err = wal.Open(waldir, snap); err != nil {
|
||||||
log.Fatalf("etcdserver: open wal error: %v", err)
|
log.Fatalf("etcdserver: open wal error: %v", err)
|
||||||
}
|
}
|
||||||
var wmetadata []byte
|
var wmetadata []byte
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
raftpb "github.com/coreos/etcd/raft/raftpb"
|
raftpb "github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/snap"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func snapDir4(dataDir string) string {
|
func snapDir4(dataDir string) string {
|
||||||
@ -106,13 +107,18 @@ func Migrate4To2(dataDir string, name string) error {
|
|||||||
log.Printf("Log migration successful")
|
log.Printf("Log migration successful")
|
||||||
|
|
||||||
// migrate snapshot (if necessary) and logs
|
// migrate snapshot (if necessary) and logs
|
||||||
|
var walsnap walpb.Snapshot
|
||||||
if snap2 != nil {
|
if snap2 != nil {
|
||||||
|
walsnap.Index, walsnap.Term = snap2.Metadata.Index, snap2.Metadata.Term
|
||||||
ss := snap.New(sd2)
|
ss := snap.New(sd2)
|
||||||
if err := ss.SaveSnap(*snap2); err != nil {
|
if err := ss.SaveSnap(*snap2); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Printf("Snapshot migration successful")
|
log.Printf("Snapshot migration successful")
|
||||||
}
|
}
|
||||||
|
if err = w.SaveSnapshot(walsnap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/snap"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -24,20 +25,20 @@ func main() {
|
|||||||
|
|
||||||
ss := snap.New(snapDir(*from))
|
ss := snap.New(snapDir(*from))
|
||||||
snapshot, err := ss.Load()
|
snapshot, err := ss.Load()
|
||||||
var index uint64
|
var walsnap walpb.Snapshot
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
index = snapshot.Metadata.Index
|
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes)
|
nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes)
|
||||||
fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
|
fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
|
||||||
snapshot.Metadata.Term, index, nodes)
|
walsnap.Term, walsnap.Index, nodes)
|
||||||
case snap.ErrNoSnapshot:
|
case snap.ErrNoSnapshot:
|
||||||
fmt.Printf("Snapshot:\nempty\n")
|
fmt.Printf("Snapshot:\nempty\n")
|
||||||
default:
|
default:
|
||||||
log.Fatalf("Failed loading snapshot: %v", err)
|
log.Fatalf("Failed loading snapshot: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := wal.Open(walDir(*from), index+1)
|
w, err := wal.Open(walDir(*from), walsnap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed opening WAL: %v", err)
|
log.Fatalf("Failed opening WAL: %v", err)
|
||||||
}
|
}
|
||||||
|
69
wal/wal.go
69
wal/wal.go
@ -38,6 +38,7 @@ const (
|
|||||||
entryType
|
entryType
|
||||||
stateType
|
stateType
|
||||||
crcType
|
crcType
|
||||||
|
snapshotType
|
||||||
|
|
||||||
// the owner can make/remove files inside the directory
|
// the owner can make/remove files inside the directory
|
||||||
privateDirMode = 0700
|
privateDirMode = 0700
|
||||||
@ -47,6 +48,8 @@ var (
|
|||||||
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
||||||
ErrFileNotFound = errors.New("wal: file not found")
|
ErrFileNotFound = errors.New("wal: file not found")
|
||||||
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
||||||
|
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
||||||
|
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
||||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -60,8 +63,8 @@ type WAL struct {
|
|||||||
metadata []byte // metadata recorded at the head of each WAL
|
metadata []byte // metadata recorded at the head of each WAL
|
||||||
state raftpb.HardState // hardstate recorded at the head of WAL
|
state raftpb.HardState // hardstate recorded at the head of WAL
|
||||||
|
|
||||||
ri uint64 // index of entry to start reading
|
start walpb.Snapshot // snapshot to start reading
|
||||||
decoder *decoder // decoder to decode records
|
decoder *decoder // decoder to decode records
|
||||||
|
|
||||||
f *os.File // underlay file opened for appending, sync
|
f *os.File // underlay file opened for appending, sync
|
||||||
seq uint64 // sequence of the wal file currently used for writes
|
seq uint64 // sequence of the wal file currently used for writes
|
||||||
@ -116,23 +119,23 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the WAL at the given index.
|
// Open opens the WAL at the given snap.
|
||||||
// The index SHOULD have been previously committed to the WAL, or the following
|
// The snap SHOULD have been previously saved to the WAL, or the following
|
||||||
// ReadAll will fail.
|
// ReadAll will fail.
|
||||||
// The returned WAL is ready to read and the first record will be the given
|
// The returned WAL is ready to read and the first record will be the one after
|
||||||
// index. The WAL cannot be appended to before reading out all of its
|
// the given snap. The WAL cannot be appended to before reading out all of its
|
||||||
// previous records.
|
// previous records.
|
||||||
func Open(dirpath string, index uint64) (*WAL, error) {
|
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||||
return openAtIndex(dirpath, index, true)
|
return openAtIndex(dirpath, snap, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenNotInUse only opens the wal files that are not in use.
|
// OpenNotInUse only opens the wal files that are not in use.
|
||||||
// Other than that, it is similar to Open.
|
// Other than that, it is similar to Open.
|
||||||
func OpenNotInUse(dirpath string, index uint64) (*WAL, error) {
|
func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||||
return openAtIndex(dirpath, index, false)
|
return openAtIndex(dirpath, snap, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
|
func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
|
||||||
names, err := fileutil.ReadDir(dirpath)
|
names, err := fileutil.ReadDir(dirpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -142,7 +145,7 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
|
|||||||
return nil, ErrFileNotFound
|
return nil, ErrFileNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
nameIndex, ok := searchIndex(names, index)
|
nameIndex, ok := searchIndex(names, snap.Index)
|
||||||
if !ok || !isValidSeq(names[nameIndex:]) {
|
if !ok || !isValidSeq(names[nameIndex:]) {
|
||||||
return nil, ErrFileNotFound
|
return nil, ErrFileNotFound
|
||||||
}
|
}
|
||||||
@ -189,7 +192,7 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
|
|||||||
// create a WAL ready for reading
|
// create a WAL ready for reading
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
dir: dirpath,
|
dir: dirpath,
|
||||||
ri: index,
|
start: snap,
|
||||||
decoder: newDecoder(rc),
|
decoder: newDecoder(rc),
|
||||||
|
|
||||||
f: f,
|
f: f,
|
||||||
@ -200,18 +203,23 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ReadAll reads out all records of the current WAL.
|
// ReadAll reads out all records of the current WAL.
|
||||||
// If it cannot read out the expected entry, it will return ErrIndexNotFound.
|
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
|
||||||
|
// If loaded snap doesn't match with the expected one, it will return
|
||||||
|
// ErrSnapshotMismatch.
|
||||||
|
// TODO: detect not-last-snap error.
|
||||||
|
// TODO: maybe loose the checking of match.
|
||||||
// After ReadAll, the WAL will be ready for appending new records.
|
// After ReadAll, the WAL will be ready for appending new records.
|
||||||
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
|
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
|
||||||
rec := &walpb.Record{}
|
rec := &walpb.Record{}
|
||||||
decoder := w.decoder
|
decoder := w.decoder
|
||||||
|
|
||||||
|
var match bool
|
||||||
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
||||||
switch rec.Type {
|
switch rec.Type {
|
||||||
case entryType:
|
case entryType:
|
||||||
e := mustUnmarshalEntry(rec.Data)
|
e := mustUnmarshalEntry(rec.Data)
|
||||||
if e.Index >= w.ri {
|
if e.Index > w.start.Index {
|
||||||
ents = append(ents[:e.Index-w.ri], e)
|
ents = append(ents[:e.Index-w.start.Index-1], e)
|
||||||
}
|
}
|
||||||
w.enti = e.Index
|
w.enti = e.Index
|
||||||
case stateType:
|
case stateType:
|
||||||
@ -231,6 +239,16 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
return nil, state, nil, ErrCRCMismatch
|
return nil, state, nil, ErrCRCMismatch
|
||||||
}
|
}
|
||||||
decoder.updateCRC(rec.Crc)
|
decoder.updateCRC(rec.Crc)
|
||||||
|
case snapshotType:
|
||||||
|
var snap walpb.Snapshot
|
||||||
|
pbutil.MustUnmarshal(&snap, rec.Data)
|
||||||
|
if snap.Index == w.start.Index {
|
||||||
|
if snap.Term != w.start.Term {
|
||||||
|
state.Reset()
|
||||||
|
return nil, state, nil, ErrSnapshotMismatch
|
||||||
|
}
|
||||||
|
match = true
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
state.Reset()
|
state.Reset()
|
||||||
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
|
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
|
||||||
@ -240,10 +258,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
state.Reset()
|
state.Reset()
|
||||||
return nil, state, nil, err
|
return nil, state, nil, err
|
||||||
}
|
}
|
||||||
|
if !match {
|
||||||
|
state.Reset()
|
||||||
|
return nil, state, nil, ErrSnapshotNotFound
|
||||||
|
}
|
||||||
|
|
||||||
// close decoder, disable reading
|
// close decoder, disable reading
|
||||||
w.decoder.close()
|
w.decoder.close()
|
||||||
w.ri = 0
|
w.start = walpb.Snapshot{}
|
||||||
|
|
||||||
w.metadata = metadata
|
w.metadata = metadata
|
||||||
// create encoder (chain crc with the decoder), enable appending
|
// create encoder (chain crc with the decoder), enable appending
|
||||||
@ -374,6 +396,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|||||||
return w.sync()
|
return w.sync()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
||||||
|
b := pbutil.MustMarshal(&e)
|
||||||
|
rec := &walpb.Record{Type: snapshotType, Data: b}
|
||||||
|
if err := w.encoder.encode(rec); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// update enti only when snapshot is ahead of last index
|
||||||
|
if w.enti < e.Index {
|
||||||
|
w.enti = e.Index
|
||||||
|
}
|
||||||
|
return w.sync()
|
||||||
|
}
|
||||||
|
|
||||||
func (w *WAL) saveCrc(prevCrc uint32) error {
|
func (w *WAL) saveCrc(prevCrc uint32) error {
|
||||||
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
||||||
}
|
}
|
||||||
|
@ -90,7 +90,7 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
f.Close()
|
f.Close()
|
||||||
|
|
||||||
w, err := Open(dir, 0)
|
w, err := Open(dir, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
@ -109,7 +109,7 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
f.Close()
|
f.Close()
|
||||||
|
|
||||||
w, err = Open(dir, 5)
|
w, err = Open(dir, walpb.Snapshot{Index: 5})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
@ -126,7 +126,7 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(emptydir)
|
defer os.RemoveAll(emptydir)
|
||||||
if _, err = Open(emptydir, 0); err != ErrFileNotFound {
|
if _, err = Open(emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
|
||||||
t.Errorf("err = %v, want %v", err, ErrFileNotFound)
|
t.Errorf("err = %v, want %v", err, ErrFileNotFound)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -168,6 +168,10 @@ func TestCut(t *testing.T) {
|
|||||||
if err := w.Cut(); err != nil {
|
if err := w.Cut(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
snap := walpb.Snapshot{Index: 2, Term: 1}
|
||||||
|
if err := w.SaveSnapshot(snap); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
wname = walName(2, 2)
|
wname = walName(2, 2)
|
||||||
if g := path.Base(w.f.Name()); g != wname {
|
if g := path.Base(w.f.Name()); g != wname {
|
||||||
t.Errorf("name = %s, want %s", g, wname)
|
t.Errorf("name = %s, want %s", g, wname)
|
||||||
@ -183,7 +187,7 @@ func TestCut(t *testing.T) {
|
|||||||
defer f.Close()
|
defer f.Close()
|
||||||
nw := &WAL{
|
nw := &WAL{
|
||||||
decoder: newDecoder(f),
|
decoder: newDecoder(f),
|
||||||
ri: 2,
|
start: snap,
|
||||||
}
|
}
|
||||||
_, gst, _, err := nw.ReadAll()
|
_, gst, _, err := nw.ReadAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -205,7 +209,10 @@ func TestRecover(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
|
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
|
||||||
for _, e := range ents {
|
for _, e := range ents {
|
||||||
if err = w.SaveEntry(&e); err != nil {
|
if err = w.SaveEntry(&e); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -219,7 +226,7 @@ func TestRecover(t *testing.T) {
|
|||||||
}
|
}
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
if w, err = Open(p, 0); err != nil {
|
if w, err = Open(p, walpb.Snapshot{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
metadata, state, entries, err := w.ReadAll()
|
metadata, state, entries, err := w.ReadAll()
|
||||||
@ -319,14 +326,10 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// TODO(unihorn): remove this when cut can operate on an empty file
|
for i := 0; i < 10; i++ {
|
||||||
if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
|
if err = w.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err = w.Cut(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
for i := 1; i < 10; i++ {
|
|
||||||
e := raftpb.Entry{Index: uint64(i)}
|
e := raftpb.Entry{Index: uint64(i)}
|
||||||
if err = w.SaveEntry(&e); err != nil {
|
if err = w.SaveEntry(&e); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -342,7 +345,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
w, err := Open(p, uint64(i))
|
w, err := Open(p, walpb.Snapshot{Index: uint64(i)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if i <= 4 {
|
if i <= 4 {
|
||||||
if err != ErrFileNotFound {
|
if err != ErrFileNotFound {
|
||||||
@ -362,8 +365,8 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||||||
t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
|
t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
|
||||||
}
|
}
|
||||||
for j, e := range entries {
|
for j, e := range entries {
|
||||||
if e.Index != uint64(j+i) {
|
if e.Index != uint64(j+i+1) {
|
||||||
t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
|
t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
w.Close()
|
w.Close()
|
||||||
@ -381,12 +384,15 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
if err := w.SaveEntry(&raftpb.Entry{Index: 0}); err != nil {
|
if err := w.SaveEntry(&raftpb.Entry{Index: 0}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
w, err = Open(p, 1)
|
w, err = Open(p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
|
|
||||||
It has these top-level messages:
|
It has these top-level messages:
|
||||||
Record
|
Record
|
||||||
|
Snapshot
|
||||||
*/
|
*/
|
||||||
package walpb
|
package walpb
|
||||||
|
|
||||||
@ -38,6 +39,16 @@ func (m *Record) Reset() { *m = Record{} }
|
|||||||
func (m *Record) String() string { return proto.CompactTextString(m) }
|
func (m *Record) String() string { return proto.CompactTextString(m) }
|
||||||
func (*Record) ProtoMessage() {}
|
func (*Record) ProtoMessage() {}
|
||||||
|
|
||||||
|
type Snapshot struct {
|
||||||
|
Index uint64 `protobuf:"varint,1,req,name=index" json:"index"`
|
||||||
|
Term uint64 `protobuf:"varint,2,req,name=term" json:"term"`
|
||||||
|
XXX_unrecognized []byte `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Snapshot) Reset() { *m = Snapshot{} }
|
||||||
|
func (m *Snapshot) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*Snapshot) ProtoMessage() {}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
}
|
}
|
||||||
func (m *Record) Unmarshal(data []byte) error {
|
func (m *Record) Unmarshal(data []byte) error {
|
||||||
@ -134,6 +145,78 @@ func (m *Record) Unmarshal(data []byte) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (m *Snapshot) Unmarshal(data []byte) error {
|
||||||
|
l := len(data)
|
||||||
|
index := 0
|
||||||
|
for index < l {
|
||||||
|
var wire uint64
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if index >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := data[index]
|
||||||
|
index++
|
||||||
|
wire |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fieldNum := int32(wire >> 3)
|
||||||
|
wireType := int(wire & 0x7)
|
||||||
|
switch fieldNum {
|
||||||
|
case 1:
|
||||||
|
if wireType != 0 {
|
||||||
|
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
|
||||||
|
}
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if index >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := data[index]
|
||||||
|
index++
|
||||||
|
m.Index |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
if wireType != 0 {
|
||||||
|
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
|
||||||
|
}
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if index >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := data[index]
|
||||||
|
index++
|
||||||
|
m.Term |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
var sizeOfWire int
|
||||||
|
for {
|
||||||
|
sizeOfWire++
|
||||||
|
wire >>= 7
|
||||||
|
if wire == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
index -= sizeOfWire
|
||||||
|
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if (index + skippy) > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
|
||||||
|
index += skippy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func (m *Record) Size() (n int) {
|
func (m *Record) Size() (n int) {
|
||||||
var l int
|
var l int
|
||||||
_ = l
|
_ = l
|
||||||
@ -148,6 +231,16 @@ func (m *Record) Size() (n int) {
|
|||||||
}
|
}
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
func (m *Snapshot) Size() (n int) {
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
n += 1 + sovRecord(uint64(m.Index))
|
||||||
|
n += 1 + sovRecord(uint64(m.Term))
|
||||||
|
if m.XXX_unrecognized != nil {
|
||||||
|
n += len(m.XXX_unrecognized)
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
func sovRecord(x uint64) (n int) {
|
func sovRecord(x uint64) (n int) {
|
||||||
for {
|
for {
|
||||||
@ -194,6 +287,32 @@ func (m *Record) MarshalTo(data []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
return i, nil
|
return i, nil
|
||||||
}
|
}
|
||||||
|
func (m *Snapshot) Marshal() (data []byte, err error) {
|
||||||
|
size := m.Size()
|
||||||
|
data = make([]byte, size)
|
||||||
|
n, err := m.MarshalTo(data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return data[:n], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Snapshot) MarshalTo(data []byte) (n int, err error) {
|
||||||
|
var i int
|
||||||
|
_ = i
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
data[i] = 0x8
|
||||||
|
i++
|
||||||
|
i = encodeVarintRecord(data, i, uint64(m.Index))
|
||||||
|
data[i] = 0x10
|
||||||
|
i++
|
||||||
|
i = encodeVarintRecord(data, i, uint64(m.Term))
|
||||||
|
if m.XXX_unrecognized != nil {
|
||||||
|
i += copy(data[i:], m.XXX_unrecognized)
|
||||||
|
}
|
||||||
|
return i, nil
|
||||||
|
}
|
||||||
func encodeFixed64Record(data []byte, offset int, v uint64) int {
|
func encodeFixed64Record(data []byte, offset int, v uint64) int {
|
||||||
data[offset] = uint8(v)
|
data[offset] = uint8(v)
|
||||||
data[offset+1] = uint8(v >> 8)
|
data[offset+1] = uint8(v >> 8)
|
||||||
|
@ -12,3 +12,8 @@ message Record {
|
|||||||
required uint32 crc = 2 [(gogoproto.nullable) = false];
|
required uint32 crc = 2 [(gogoproto.nullable) = false];
|
||||||
optional bytes data = 3;
|
optional bytes data = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message Snapshot {
|
||||||
|
required uint64 index = 1 [(gogoproto.nullable) = false];
|
||||||
|
required uint64 term = 2 [(gogoproto.nullable) = false];
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user