*: create ID type
This creates a simple ID type (wrapped around uint64) to provide for standard serialization/deserialization to a string (i.e. base 16 encoded). This replaces strutil so now that package is removed.
This commit is contained in:
@ -27,7 +27,6 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -36,7 +35,7 @@ import (
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/stats"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/strutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
@ -132,7 +131,7 @@ type Stats interface {
|
||||
// StoreStats returns statistics of the store backing this EtcdServer
|
||||
StoreStats() []byte
|
||||
// UpdateRecvApp updates the underlying statistics in response to a receiving an Append request
|
||||
UpdateRecvApp(from uint64, length int64)
|
||||
UpdateRecvApp(from types.ID, length int64)
|
||||
}
|
||||
|
||||
type RaftTimer interface {
|
||||
@ -145,7 +144,7 @@ type EtcdServer struct {
|
||||
w wait.Wait
|
||||
done chan struct{}
|
||||
stopped chan struct{}
|
||||
id uint64
|
||||
id types.ID
|
||||
attributes Attributes
|
||||
|
||||
Cluster *Cluster
|
||||
@ -184,7 +183,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
|
||||
st := store.New()
|
||||
var w *wal.WAL
|
||||
var n raft.Node
|
||||
var id uint64
|
||||
var id types.ID
|
||||
haveWAL := wal.Exist(cfg.WALDir())
|
||||
switch {
|
||||
case !haveWAL && cfg.ClusterState == ClusterStateValueExisting:
|
||||
@ -240,9 +239,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
|
||||
|
||||
sstats := &stats.ServerStats{
|
||||
Name: cfg.Name,
|
||||
ID: strutil.IDAsHex(id),
|
||||
ID: id.String(),
|
||||
}
|
||||
lstats := stats.NewLeaderStats(strutil.IDAsHex(id))
|
||||
lstats := stats.NewLeaderStats(id.String())
|
||||
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
@ -290,7 +289,7 @@ func (s *EtcdServer) start() {
|
||||
}
|
||||
|
||||
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
||||
if s.Cluster.IsIDRemoved(m.From) {
|
||||
if s.Cluster.IsIDRemoved(types.ID(m.From)) {
|
||||
return ErrRemoved
|
||||
}
|
||||
return s.node.Step(ctx, m)
|
||||
@ -423,8 +422,8 @@ func (s *EtcdServer) StoreStats() []byte {
|
||||
return s.store.JsonStats()
|
||||
}
|
||||
|
||||
func (s *EtcdServer) UpdateRecvApp(from uint64, length int64) {
|
||||
s.stats.RecvAppendReq(strutil.IDAsHex(from), int(length))
|
||||
func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) {
|
||||
s.stats.RecvAppendReq(from.String(), int(length))
|
||||
}
|
||||
|
||||
func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
|
||||
@ -436,7 +435,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
|
||||
cc := raftpb.ConfChange{
|
||||
ID: GenID(),
|
||||
Type: raftpb.ConfChangeAddNode,
|
||||
NodeID: memb.ID,
|
||||
NodeID: uint64(memb.ID),
|
||||
Context: b,
|
||||
}
|
||||
return s.configure(ctx, cc)
|
||||
@ -528,7 +527,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
|
||||
cancel()
|
||||
switch err {
|
||||
case nil:
|
||||
log.Printf("etcdserver: published %+v to cluster %x", s.attributes, s.Cluster.ID())
|
||||
log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.Cluster.ID())
|
||||
return
|
||||
case ErrStopped:
|
||||
log.Printf("etcdserver: aborting publish because server is stopped")
|
||||
@ -595,7 +594,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
||||
id := mustParseMemberIDFromKey(path.Dir(r.Path))
|
||||
m := s.Cluster.Member(id)
|
||||
if m == nil {
|
||||
log.Fatalf("fetch member %x should never fail", id)
|
||||
log.Fatalf("fetch member %s should never fail", id)
|
||||
}
|
||||
if err := json.Unmarshal([]byte(r.Val), &m.Attributes); err != nil {
|
||||
log.Fatalf("unmarshal %s should never fail: %v", r.Val, err)
|
||||
@ -634,18 +633,18 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
|
||||
if err := json.Unmarshal(cc.Context, m); err != nil {
|
||||
panic("unexpected unmarshal error")
|
||||
}
|
||||
if cc.NodeID != m.ID {
|
||||
if cc.NodeID != uint64(m.ID) {
|
||||
panic("unexpected nodeID mismatch")
|
||||
}
|
||||
s.Cluster.AddMember(m)
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
s.Cluster.RemoveMember(cc.NodeID)
|
||||
s.Cluster.RemoveMember(types.ID(cc.NodeID))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
|
||||
if s.Cluster.IsIDRemoved(cc.NodeID) {
|
||||
if s.Cluster.IsIDRemoved(types.ID(cc.NodeID)) {
|
||||
return ErrIDRemoved
|
||||
}
|
||||
switch cc.Type {
|
||||
@ -692,7 +691,7 @@ func GetClusterFromPeers(urls []string) (*Cluster, error) {
|
||||
log.Printf("etcdserver: unmarshal body error: %v", err)
|
||||
continue
|
||||
}
|
||||
id, err := strconv.ParseUint(resp.Header.Get("X-Etcd-Cluster-ID"), 16, 64)
|
||||
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
|
||||
if err != nil {
|
||||
log.Printf("etcdserver: parse uint error: %v", err)
|
||||
continue
|
||||
@ -702,12 +701,17 @@ func GetClusterFromPeers(urls []string) (*Cluster, error) {
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
||||
func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.WAL) {
|
||||
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, w *wal.WAL) {
|
||||
var err error
|
||||
// TODO: remove the discoveryURL when it becomes part of the source for
|
||||
// generating nodeID.
|
||||
member := cfg.Cluster.MemberByName(cfg.Name)
|
||||
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: member.ID, ClusterID: cfg.Cluster.ID()})
|
||||
metadata := pbutil.MustMarshal(
|
||||
&pb.Metadata{
|
||||
NodeID: uint64(member.ID),
|
||||
ClusterID: uint64(cfg.Cluster.ID()),
|
||||
},
|
||||
)
|
||||
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@ -717,15 +721,15 @@ func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
peers[i] = raft.Peer{ID: id, Context: ctx}
|
||||
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
||||
}
|
||||
id = member.ID
|
||||
log.Printf("etcdserver: start node %x in cluster %x", id, cfg.Cluster.ID())
|
||||
n = raft.StartNode(id, peers, 10, 1)
|
||||
log.Printf("etcdserver: start node %x in cluster %x", id.String(), cfg.Cluster.ID().String())
|
||||
n = raft.StartNode(uint64(id), peers, 10, 1)
|
||||
return
|
||||
}
|
||||
|
||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id uint64, n raft.Node, w *wal.WAL) {
|
||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) {
|
||||
var err error
|
||||
// restart a node from previous wal
|
||||
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
|
||||
@ -738,10 +742,10 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id
|
||||
|
||||
var metadata pb.Metadata
|
||||
pbutil.MustUnmarshal(&metadata, wmetadata)
|
||||
id = metadata.NodeID
|
||||
cfg.Cluster.SetID(metadata.ClusterID)
|
||||
log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
||||
n = raft.RestartNode(id, 10, 1, snapshot, st, ents)
|
||||
id = types.ID(metadata.NodeID)
|
||||
cfg.Cluster.SetID(types.ID(metadata.ClusterID))
|
||||
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
||||
n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
|
||||
return
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user