Compare commits

...

4 Commits

3 changed files with 59 additions and 18 deletions

View File

@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) {
if len(resp.Kvs) != 1 { if len(resp.Kvs) != 1 {
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs) t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
} }
if resp.Kvs[0].CreateRevision != 4 { if resp.Kvs[0].CreateRevision != 7 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision) t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
} }
} }

View File

@ -27,11 +27,14 @@ import (
"github.com/coreos/etcd/client" "github.com/coreos/etcd/client"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap" "github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
@ -42,9 +45,10 @@ import (
) )
var ( var (
migrateDatadir string migrateExcludeTTLKey bool
migrateWALdir string migrateDatadir string
migrateTransformer string migrateWALdir string
migrateTransformer string
) )
// NewMigrateCommand returns the cobra command for "migrate". // NewMigrateCommand returns the cobra command for "migrate".
@ -55,6 +59,7 @@ func NewMigrateCommand() *cobra.Command {
Run: migrateCommandFunc, Run: migrateCommandFunc,
} }
mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory") mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory") mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program") mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
@ -74,18 +79,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer() writer, reader, errc = defaultTransformer()
} }
st := rebuildStoreV2() st, index := rebuildStoreV2()
be := prepareBackend() be := prepareBackend()
defer be.Close() defer be.Close()
maxIndexc := make(chan uint64, 1)
go func() { go func() {
maxIndexc <- writeStore(writer, st) writeStore(writer, st)
writer.Close() writer.Close()
}() }()
readKeys(reader, be) readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, <-maxIndexc) mvcc.UpdateConsistentIndex(be, index)
err := <-errc err := <-errc
if err != nil { if err != nil {
fmt.Println("failed to transform keys") fmt.Println("failed to transform keys")
@ -106,7 +110,10 @@ func prepareBackend() backend.Backend {
return be return be
} }
func rebuildStoreV2() store.Store { func rebuildStoreV2() (store.Store, uint64) {
var index uint64
cl := membership.NewCluster("")
waldir := migrateWALdir waldir := migrateWALdir
if len(waldir) == 0 { if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal") waldir = path.Join(migrateDatadir, "member", "wal")
@ -122,6 +129,7 @@ func rebuildStoreV2() store.Store {
var walsnap walpb.Snapshot var walsnap walpb.Snapshot
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
} }
w, err := wal.OpenForRead(waldir, walsnap) w, err := wal.OpenForRead(waldir, walsnap)
@ -143,9 +151,15 @@ func rebuildStoreV2() store.Store {
} }
} }
applier := etcdserver.NewApplierV2(st, nil) cl.SetStore(st)
cl.Recover(api.UpdateCapability)
applier := etcdserver.NewApplierV2(st, cl)
for _, ent := range ents { for _, ent := range ents {
if ent.Type != raftpb.EntryNormal { if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, ent.Data)
applyConf(cc, cl)
continue continue
} }
@ -160,9 +174,34 @@ func rebuildStoreV2() store.Store {
applyRequest(req, applier) applyRequest(req, applier)
} }
} }
if ent.Index > index {
index = ent.Index
}
} }
return st return st, index
}
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := cl.ValidateConfigurationChange(cc); err != nil {
return
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID))
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
}
} }
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) { func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
@ -216,11 +255,13 @@ func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
if n.Dir { if n.Dir {
n.Nodes = nil n.Nodes = nil
} }
b, err := json.Marshal(n) if !migrateExcludeTTLKey || n.TTL == 0 {
if err != nil { b, err := json.Marshal(n)
ExitWithError(ExitError, err) if err != nil {
ExitWithError(ExitError, err)
}
fmt.Fprint(w, string(b))
} }
fmt.Fprint(w, string(b))
for _, nn := range nodes { for _, nn := range nodes {
max := writeKeys(w, nn) max := writeKeys(w, nn)
if max > maxIndex { if max > maxIndex {

View File

@ -29,7 +29,7 @@ import (
var ( var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with. // MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0" MinClusterVersion = "2.3.0"
Version = "3.0.13" Version = "3.0.14"
// Git SHA Value will be set during build // Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)" GitSHA = "Not provided (use ./build instead of go build)"