refactor command.go: commands do not rely on the etcdStore singleton. So we can seprate command into a package in the furture.

This commit is contained in:
Xiang Li
2013-10-09 20:34:00 -07:00
parent 4bf57537b5
commit 61899d62c5
2 changed files with 38 additions and 19 deletions

View File

@ -40,7 +40,9 @@ func (c *CreateCommand) CommandName() string {
// Create node // Create node
func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdStore.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) s, _ := server.StateMachine().(*store.Store)
e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil { if err != nil {
debug(err) debug(err)
@ -64,7 +66,9 @@ func (c *UpdateCommand) CommandName() string {
// Update node // Update node
func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdStore.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) s, _ := server.StateMachine().(*store.Store)
e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil { if err != nil {
debug(err) debug(err)
@ -90,7 +94,9 @@ func (c *TestAndSetCommand) CommandName() string {
// Set the key-value pair if the current value of the key equals to the given prevValue // Set the key-value pair if the current value of the key equals to the given prevValue
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdStore.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, s, _ := server.StateMachine().(*store.Store)
e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
if err != nil { if err != nil {
@ -115,7 +121,9 @@ func (c *GetCommand) CommandName() string {
// Get the value of key // Get the value of key
func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdStore.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term()) s, _ := server.StateMachine().(*store.Store)
e, err := s.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term())
if err != nil { if err != nil {
debug(err) debug(err)
@ -138,7 +146,9 @@ func (c *DeleteCommand) CommandName() string {
// Delete the key // Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
e, err := etcdStore.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) s, _ := server.StateMachine().(*store.Store)
e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
if err != nil { if err != nil {
debug(err) debug(err)
@ -161,7 +171,9 @@ func (c *WatchCommand) CommandName() string {
} }
func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) { func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
eventChan, err := etcdStore.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term()) s, _ := server.StateMachine().(*store.Store)
eventChan, err := s.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term())
if err != nil { if err != nil {
return nil, err return nil, err
@ -195,13 +207,14 @@ func (c *JoinCommand) CommandName() string {
} }
// Join a server to the cluster // Join a server to the cluster
func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*store.Store)
// check if the join command is from a previous machine, who lost all its previous log. // check if the join command is from a previous machine, who lost all its previous log.
e, _ := etcdStore.Get(path.Join("/_etcd/machines", c.Name), false, false, raftServer.CommitIndex(), raftServer.Term()) e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term())
b := make([]byte, 8) b := make([]byte, 8)
binary.PutUvarint(b, raftServer.CommitIndex()) binary.PutUvarint(b, server.CommitIndex())
if e != nil { if e != nil {
return b, nil return b, nil
@ -211,18 +224,18 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
num := machineNum() num := machineNum()
if num == maxClusterSize { if num == maxClusterSize {
debug("Reject join request from ", c.Name) debug("Reject join request from ", c.Name)
return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", raftServer.CommitIndex(), raftServer.Term()) return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
} }
addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL) addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
// add peer in raft // add peer in raft
err := raftServer.AddPeer(c.Name, "") err := server.AddPeer(c.Name, "")
// add machine in etcd storage // add machine in etcd storage
key := path.Join("_etcd/machines", c.Name) key := path.Join("_etcd/machines", c.Name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
etcdStore.Create(key, value, false, false, store.Permanent, raftServer.CommitIndex(), raftServer.Term()) s.Create(key, value, false, false, store.Permanent, server.CommitIndex(), server.Term())
// add peer stats // add peer stats
if c.Name != r.Name() { if c.Name != r.Name() {
@ -248,12 +261,13 @@ func (c *RemoveCommand) CommandName() string {
} }
// Remove a server from the cluster // Remove a server from the cluster
func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*store.Store)
// remove machine in etcd storage // remove machine in etcd storage
key := path.Join("_etcd/machines", c.Name) key := path.Join("_etcd/machines", c.Name)
_, err := etcdStore.Delete(key, false, raftServer.CommitIndex(), raftServer.Term()) _, err := s.Delete(key, false, server.CommitIndex(), server.Term())
// delete from stats // delete from stats
delete(r.followersStats.Followers, c.Name) delete(r.followersStats.Followers, c.Name)
@ -262,21 +276,21 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
} }
// remove peer in raft // remove peer in raft
err = raftServer.RemovePeer(c.Name) err = server.RemovePeer(c.Name)
if err != nil { if err != nil {
return []byte{0}, err return []byte{0}, err
} }
if c.Name == raftServer.Name() { if c.Name == server.Name() {
// the removed node is this node // the removed node is this node
// if the node is not replaying the previous logs // if the node is not replaying the previous logs
// and the node has sent out a join request in this // and the node has sent out a join request in this
// start. It is sure that this node received a new remove // start. It is sure that this node received a new remove
// command and need to be removed // command and need to be removed
if raftServer.CommitIndex() > r.joinIndex && r.joinIndex != 0 { if server.CommitIndex() > r.joinIndex && r.joinIndex != 0 {
debugf("server [%s] is removed", raftServer.Name()) debugf("server [%s] is removed", server.Name())
os.Exit(0) os.Exit(0)
} else { } else {
// else ignore remove // else ignore remove
@ -285,7 +299,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
} }
b := make([]byte, 8) b := make([]byte, 8)
binary.PutUvarint(b, raftServer.CommitIndex()) binary.PutUvarint(b, server.CommitIndex())
return b, err return b, err
} }

View File

@ -186,6 +186,11 @@ func (s *Server) Context() interface{} {
return s.context return s.context
} }
// Retrieves the state machine passed into the constructor.
func (s *Server) StateMachine() StateMachine {
return s.stateMachine
}
// Retrieves the log path for the server. // Retrieves the log path for the server.
func (s *Server) LogPath() string { func (s *Server) LogPath() string {
return path.Join(s.path, "log") return path.Join(s.path, "log")