|
|
|
@ -27,11 +27,14 @@ import (
|
|
|
|
|
"github.com/coreos/etcd/client"
|
|
|
|
|
etcdErr "github.com/coreos/etcd/error"
|
|
|
|
|
"github.com/coreos/etcd/etcdserver"
|
|
|
|
|
"github.com/coreos/etcd/etcdserver/api"
|
|
|
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
|
|
"github.com/coreos/etcd/etcdserver/membership"
|
|
|
|
|
"github.com/coreos/etcd/mvcc"
|
|
|
|
|
"github.com/coreos/etcd/mvcc/backend"
|
|
|
|
|
"github.com/coreos/etcd/mvcc/mvccpb"
|
|
|
|
|
"github.com/coreos/etcd/pkg/pbutil"
|
|
|
|
|
"github.com/coreos/etcd/pkg/types"
|
|
|
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
|
|
|
"github.com/coreos/etcd/snap"
|
|
|
|
|
"github.com/coreos/etcd/store"
|
|
|
|
@ -42,6 +45,7 @@ import (
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
migrateExcludeTTLKey bool
|
|
|
|
|
migrateDatadir string
|
|
|
|
|
migrateWALdir string
|
|
|
|
|
migrateTransformer string
|
|
|
|
@ -55,6 +59,7 @@ func NewMigrateCommand() *cobra.Command {
|
|
|
|
|
Run: migrateCommandFunc,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
|
|
|
|
|
mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
|
|
|
|
|
mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
|
|
|
|
|
mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
|
|
|
|
@ -74,18 +79,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
|
|
|
|
|
writer, reader, errc = defaultTransformer()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
st := rebuildStoreV2()
|
|
|
|
|
st, index := rebuildStoreV2()
|
|
|
|
|
be := prepareBackend()
|
|
|
|
|
defer be.Close()
|
|
|
|
|
|
|
|
|
|
maxIndexc := make(chan uint64, 1)
|
|
|
|
|
go func() {
|
|
|
|
|
maxIndexc <- writeStore(writer, st)
|
|
|
|
|
writeStore(writer, st)
|
|
|
|
|
writer.Close()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
readKeys(reader, be)
|
|
|
|
|
mvcc.UpdateConsistentIndex(be, <-maxIndexc)
|
|
|
|
|
mvcc.UpdateConsistentIndex(be, index)
|
|
|
|
|
err := <-errc
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("failed to transform keys")
|
|
|
|
@ -106,7 +110,10 @@ func prepareBackend() backend.Backend {
|
|
|
|
|
return be
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func rebuildStoreV2() store.Store {
|
|
|
|
|
func rebuildStoreV2() (store.Store, uint64) {
|
|
|
|
|
var index uint64
|
|
|
|
|
cl := membership.NewCluster("")
|
|
|
|
|
|
|
|
|
|
waldir := migrateWALdir
|
|
|
|
|
if len(waldir) == 0 {
|
|
|
|
|
waldir = path.Join(migrateDatadir, "member", "wal")
|
|
|
|
@ -122,6 +129,7 @@ func rebuildStoreV2() store.Store {
|
|
|
|
|
var walsnap walpb.Snapshot
|
|
|
|
|
if snapshot != nil {
|
|
|
|
|
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
|
|
|
|
index = snapshot.Metadata.Index
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w, err := wal.OpenForRead(waldir, walsnap)
|
|
|
|
@ -143,9 +151,15 @@ func rebuildStoreV2() store.Store {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
applier := etcdserver.NewApplierV2(st, nil)
|
|
|
|
|
cl.SetStore(st)
|
|
|
|
|
cl.Recover(api.UpdateCapability)
|
|
|
|
|
|
|
|
|
|
applier := etcdserver.NewApplierV2(st, cl)
|
|
|
|
|
for _, ent := range ents {
|
|
|
|
|
if ent.Type != raftpb.EntryNormal {
|
|
|
|
|
if ent.Type == raftpb.EntryConfChange {
|
|
|
|
|
var cc raftpb.ConfChange
|
|
|
|
|
pbutil.MustUnmarshal(&cc, ent.Data)
|
|
|
|
|
applyConf(cc, cl)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -160,9 +174,34 @@ func rebuildStoreV2() store.Store {
|
|
|
|
|
applyRequest(req, applier)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if ent.Index > index {
|
|
|
|
|
index = ent.Index
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return st
|
|
|
|
|
return st, index
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
|
|
|
|
|
if err := cl.ValidateConfigurationChange(cc); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
switch cc.Type {
|
|
|
|
|
case raftpb.ConfChangeAddNode:
|
|
|
|
|
m := new(membership.Member)
|
|
|
|
|
if err := json.Unmarshal(cc.Context, m); err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
cl.AddMember(m)
|
|
|
|
|
case raftpb.ConfChangeRemoveNode:
|
|
|
|
|
cl.RemoveMember(types.ID(cc.NodeID))
|
|
|
|
|
case raftpb.ConfChangeUpdateNode:
|
|
|
|
|
m := new(membership.Member)
|
|
|
|
|
if err := json.Unmarshal(cc.Context, m); err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
|
|
|
|
@ -216,11 +255,13 @@ func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
|
|
|
|
|
if n.Dir {
|
|
|
|
|
n.Nodes = nil
|
|
|
|
|
}
|
|
|
|
|
if !migrateExcludeTTLKey || n.TTL == 0 {
|
|
|
|
|
b, err := json.Marshal(n)
|
|
|
|
|
if err != nil {
|
|
|
|
|
ExitWithError(ExitError, err)
|
|
|
|
|
}
|
|
|
|
|
fmt.Fprint(w, string(b))
|
|
|
|
|
}
|
|
|
|
|
for _, nn := range nodes {
|
|
|
|
|
max := writeKeys(w, nn)
|
|
|
|
|
if max > maxIndex {
|
|
|
|
|