server: Move bootstraping backend from snapshot to bootstrapBackend
This commit is contained in:
@ -73,14 +73,15 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
|||||||
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
||||||
}
|
}
|
||||||
|
|
||||||
storage, err := bootstrapStorage(cfg, ss, prt)
|
haveWAL := wal.Exist(cfg.WALDir())
|
||||||
|
storage, err := bootstrapStorage(cfg, haveWAL, ss, prt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster, err := bootstrapCluster(cfg, storage, prt, ss)
|
cluster, err := bootstrapCluster(cfg, haveWAL, storage, prt, ss)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
storage.be.Close()
|
storage.backend.be.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &bootstrappedServer{
|
return &bootstrappedServer{
|
||||||
@ -99,11 +100,16 @@ type bootstrappedServer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type bootstrappedStorage struct {
|
type bootstrappedStorage struct {
|
||||||
beHooks *serverstorage.BackendHooks
|
backend *bootstrappedBackend
|
||||||
st v2store.Store
|
st v2store.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
type bootstrappedBackend struct {
|
||||||
|
beHooks *serverstorage.BackendHooks
|
||||||
be backend.Backend
|
be backend.Backend
|
||||||
ci cindex.ConsistentIndexer
|
ci cindex.ConsistentIndexer
|
||||||
beExist bool
|
beExist bool
|
||||||
|
snapshot *raftpb.Snapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
type bootstrapedCluster struct {
|
type bootstrapedCluster struct {
|
||||||
@ -123,19 +129,17 @@ type bootstrappedRaft struct {
|
|||||||
storage *raft.MemoryStorage
|
storage *raft.MemoryStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
|
func bootstrapStorage(cfg config.ServerConfig, haveWAL bool, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
|
||||||
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
|
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
|
||||||
|
|
||||||
be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
|
backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &bootstrappedStorage{
|
return &bootstrappedStorage{
|
||||||
beHooks: beHooks,
|
backend: backend,
|
||||||
st: st,
|
st: st,
|
||||||
be: be,
|
|
||||||
ci: ci,
|
|
||||||
beExist: beExist,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,11 +164,11 @@ func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
|
|||||||
return snap.New(cfg.Logger, cfg.SnapDir())
|
return snap.New(cfg.Logger, cfg.SnapDir())
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *serverstorage.BackendHooks, err error) {
|
func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) {
|
||||||
beExist = fileutil.Exist(cfg.BackendPath())
|
beExist := fileutil.Exist(cfg.BackendPath())
|
||||||
ci = cindex.NewConsistentIndex(nil)
|
ci := cindex.NewConsistentIndex(nil)
|
||||||
beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
|
beHooks := serverstorage.NewBackendHooks(cfg.Logger, ci)
|
||||||
be = serverstorage.OpenBackend(cfg, beHooks)
|
be := serverstorage.OpenBackend(cfg, beHooks)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil && be != nil {
|
if err != nil && be != nil {
|
||||||
be.Close()
|
be.Close()
|
||||||
@ -175,20 +179,35 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co
|
|||||||
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
|
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
|
||||||
err = maybeDefragBackend(cfg, be)
|
err = maybeDefragBackend(cfg, be)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, false, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))
|
cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))
|
||||||
|
|
||||||
// TODO(serathius): Implement schema setup in fresh storage
|
// TODO(serathius): Implement schema setup in fresh storage
|
||||||
|
var (
|
||||||
|
snapshot *raftpb.Snapshot
|
||||||
|
)
|
||||||
|
if haveWAL {
|
||||||
|
snapshot, be, err = recoverSnapshot(cfg, st, be, beExist, beHooks, ci, ss)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
if beExist {
|
if beExist {
|
||||||
err = schema.Validate(cfg.Logger, be.BatchTx())
|
err = schema.Validate(cfg.Logger, be.BatchTx())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
|
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
|
||||||
return nil, nil, false, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return be, ci, beExist, beHooks, nil
|
return &bootstrappedBackend{
|
||||||
|
beHooks: beHooks,
|
||||||
|
be: be,
|
||||||
|
ci: ci,
|
||||||
|
beExist: beExist,
|
||||||
|
snapshot: snapshot,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
|
func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
|
||||||
@ -210,15 +229,14 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
|
|||||||
return be.Defrag()
|
return be.Defrag()
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapCluster(cfg config.ServerConfig, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
|
func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
|
||||||
haveWAL := wal.Exist(cfg.WALDir())
|
|
||||||
switch {
|
switch {
|
||||||
case !haveWAL && !cfg.NewCluster:
|
case !haveWAL && !cfg.NewCluster:
|
||||||
c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.be)
|
c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
|
||||||
case !haveWAL && cfg.NewCluster:
|
case !haveWAL && cfg.NewCluster:
|
||||||
c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.be)
|
c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
|
||||||
case haveWAL:
|
case haveWAL:
|
||||||
c, err = bootstrapClusterWithWAL(cfg, storage, ss)
|
c, err = bootstrapClusterWithWAL(cfg, storage)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||||
}
|
}
|
||||||
@ -309,7 +327,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*bootstrapedCluster, error) {
|
func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage) (*bootstrapedCluster, error) {
|
||||||
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
||||||
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
||||||
}
|
}
|
||||||
@ -324,11 +342,7 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora
|
|||||||
zap.String("bwal-dir", cfg.WALDir()),
|
zap.String("bwal-dir", cfg.WALDir()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
snapshot, err := recoverSnapshot(cfg, storage, ss)
|
bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
|
|
||||||
|
|
||||||
b := &bootstrapedCluster{
|
b := &bootstrapedCluster{
|
||||||
wal: bwal,
|
wal: bwal,
|
||||||
@ -358,9 +372,9 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora
|
|||||||
b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
|
b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
|
||||||
|
|
||||||
b.raft.cl.SetStore(storage.st)
|
b.raft.cl.SetStore(storage.st)
|
||||||
b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.be))
|
b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be))
|
||||||
b.raft.cl.Recover(api.UpdateCapability)
|
b.raft.cl.Recover(api.UpdateCapability)
|
||||||
if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.beExist {
|
if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist {
|
||||||
bepath := cfg.BackendPath()
|
bepath := cfg.BackendPath()
|
||||||
os.RemoveAll(bepath)
|
os.RemoveAll(bepath)
|
||||||
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
||||||
@ -368,27 +382,27 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora
|
|||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*raftpb.Snapshot, error) {
|
func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
|
||||||
// Find a snapshot to start/restart a raft node
|
// Find a snapshot to start/restart a raft node
|
||||||
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
|
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, be, err
|
||||||
}
|
}
|
||||||
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
|
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
|
||||||
// bwal log entries
|
// bwal log entries
|
||||||
snapshot, err = ss.LoadNewestAvailable(walSnaps)
|
snapshot, err := ss.LoadNewestAvailable(walSnaps)
|
||||||
if err != nil && err != snap.ErrNoSnapshot {
|
if err != nil && err != snap.ErrNoSnapshot {
|
||||||
return nil, err
|
return nil, be, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
if err = storage.st.Recovery(snapshot.Data); err != nil {
|
if err = st.Recovery(snapshot.Data); err != nil {
|
||||||
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
|
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, storage.st, cfg.V2Deprecation); err != nil {
|
if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
|
||||||
cfg.Logger.Error("illegal v2store content", zap.Error(err))
|
cfg.Logger.Error("illegal v2store content", zap.Error(err))
|
||||||
return nil, err
|
return nil, be, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.Logger.Info(
|
cfg.Logger.Info(
|
||||||
@ -397,7 +411,7 @@ func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss *
|
|||||||
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
|
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
|
||||||
)
|
)
|
||||||
|
|
||||||
if storage.be, err = serverstorage.RecoverSnapshotBackend(cfg, storage.be, *snapshot, storage.beExist, storage.beHooks); err != nil {
|
if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
|
||||||
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
|
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
|
||||||
}
|
}
|
||||||
s1, s2 := be.Size(), be.SizeInUse()
|
s1, s2 := be.Size(), be.SizeInUse()
|
||||||
@ -408,13 +422,13 @@ func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss *
|
|||||||
zap.Int64("backend-size-in-use-bytes", s2),
|
zap.Int64("backend-size-in-use-bytes", s2),
|
||||||
zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
|
zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
|
||||||
)
|
)
|
||||||
if storage.beExist {
|
if beExist {
|
||||||
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||||
// etcd from pre-3.0 release.
|
// etcd from pre-3.0 release.
|
||||||
kvindex := storage.ci.ConsistentIndex()
|
kvindex := ci.ConsistentIndex()
|
||||||
if kvindex < snapshot.Metadata.Index {
|
if kvindex < snapshot.Metadata.Index {
|
||||||
if kvindex != 0 {
|
if kvindex != 0 {
|
||||||
return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
|
return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
|
||||||
}
|
}
|
||||||
cfg.Logger.Warn(
|
cfg.Logger.Warn(
|
||||||
"consistent index was never saved",
|
"consistent index was never saved",
|
||||||
@ -425,7 +439,7 @@ func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss *
|
|||||||
} else {
|
} else {
|
||||||
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
|
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
|
||||||
}
|
}
|
||||||
return snapshot, nil
|
return snapshot, be, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
|
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
|
||||||
|
@ -304,7 +304,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.storage.be.Close()
|
b.storage.backend.be.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -330,7 +330,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
peerRt: b.prt,
|
peerRt: b.prt,
|
||||||
reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
|
reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
|
||||||
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
|
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
|
||||||
consistIndex: b.storage.ci,
|
consistIndex: b.storage.backend.ci,
|
||||||
firstCommitInTerm: notify.NewNotifier(),
|
firstCommitInTerm: notify.NewNotifier(),
|
||||||
clusterVersionChanged: notify.NewNotifier(),
|
clusterVersionChanged: notify.NewNotifier(),
|
||||||
}
|
}
|
||||||
@ -338,8 +338,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
|
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
|
||||||
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
||||||
|
|
||||||
srv.be = b.storage.be
|
srv.be = b.storage.backend.be
|
||||||
srv.beHooks = b.storage.beHooks
|
srv.beHooks = b.storage.backend.beHooks
|
||||||
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
|
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
|
||||||
|
|
||||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
|
Reference in New Issue
Block a user