refactor remove extra function
This commit is contained in:
@ -9,6 +9,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
etcdErr "github.com/coreos/etcd/error"
|
etcdErr "github.com/coreos/etcd/error"
|
||||||
@ -346,44 +347,48 @@ func (s *PeerServer) monitorSnapshot() {
|
|||||||
|
|
||||||
func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
|
func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
|
||||||
if s.State() == raft.Leader {
|
if s.State() == raft.Leader {
|
||||||
if response, err := s.Do(c); err != nil {
|
result, err := s.Do(c)
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
if response == nil {
|
|
||||||
return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
|
|
||||||
}
|
|
||||||
|
|
||||||
event, ok := response.(*store.Event)
|
|
||||||
if ok {
|
|
||||||
bytes, err := json.Marshal(event)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
|
if result == nil {
|
||||||
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
|
return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
|
||||||
w.WriteHeader(http.StatusOK)
|
}
|
||||||
w.Write(bytes)
|
|
||||||
|
|
||||||
|
// response for raft related commands[join/remove]
|
||||||
|
if b, ok := result.([]byte); ok {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write(b)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes, _ := response.([]byte)
|
var b []byte
|
||||||
|
if strings.HasPrefix(req.URL.Path, "/v1") {
|
||||||
|
b, _ = json.Marshal(result.(*store.Event).Response())
|
||||||
|
} else {
|
||||||
|
b, _ = json.Marshal(result.(*store.Event))
|
||||||
|
}
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
w.Write(bytes)
|
w.Write(b)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
leader := s.Leader()
|
leader := s.Leader()
|
||||||
// current no leader
|
|
||||||
|
// No leader available.
|
||||||
if leader == "" {
|
if leader == "" {
|
||||||
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
|
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
|
||||||
}
|
}
|
||||||
url, _ := s.registry.PeerURL(leader)
|
|
||||||
|
|
||||||
log.Debugf("Not leader; Current leader: %s; redirect: %s", leader, url)
|
var url string
|
||||||
|
switch c.(type) {
|
||||||
|
case *JoinCommand, *RemoveCommand:
|
||||||
|
url, _ = s.registry.PeerURL(leader)
|
||||||
|
default:
|
||||||
|
url, _ = s.registry.ClientURL(leader)
|
||||||
|
}
|
||||||
redirect(url, w, req)
|
redirect(url, w, req)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -113,7 +113,7 @@ func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Receive Join Request from %s", command.Name)
|
log.Debugf("Receive Join Request from %s", command.Name)
|
||||||
err = s.dispatchRaftCommand(command, w, req)
|
err = s.dispatch(command, w, req)
|
||||||
|
|
||||||
// Return status.
|
// Return status.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -140,7 +140,7 @@ func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request)
|
|||||||
|
|
||||||
log.Debugf("[recv] Remove Request [%s]", command.Name)
|
log.Debugf("[recv] Remove Request [%s]", command.Name)
|
||||||
|
|
||||||
s.dispatchRaftCommand(command, w, req)
|
s.dispatch(command, w, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Response to the name request
|
// Response to the name request
|
||||||
@ -156,7 +156,3 @@ func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Req
|
|||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
w.Write([]byte(PeerVersion))
|
w.Write([]byte(PeerVersion))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error {
|
|
||||||
return s.dispatch(c, w, req)
|
|
||||||
}
|
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
@ -168,51 +167,7 @@ func (s *Server) ListenAndServe() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
|
func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
|
||||||
if s.peerServer.State() == raft.Leader {
|
return s.peerServer.dispatch(c, w, req)
|
||||||
event, err := s.peerServer.Do(c)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if event == nil {
|
|
||||||
return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
|
|
||||||
}
|
|
||||||
|
|
||||||
if b, ok := event.([]byte); ok {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
w.Write(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
var b []byte
|
|
||||||
if strings.HasPrefix(req.URL.Path, "/v1") {
|
|
||||||
b, _ = json.Marshal(event.(*store.Event).Response())
|
|
||||||
} else {
|
|
||||||
b, _ = json.Marshal(event.(*store.Event))
|
|
||||||
}
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
w.Write(b)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
|
|
||||||
} else {
|
|
||||||
leader := s.peerServer.Leader()
|
|
||||||
|
|
||||||
// No leader available.
|
|
||||||
if leader == "" {
|
|
||||||
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
|
|
||||||
}
|
|
||||||
|
|
||||||
var url string
|
|
||||||
switch c.(type) {
|
|
||||||
case *JoinCommand, *RemoveCommand:
|
|
||||||
url, _ = s.registry.PeerURL(leader)
|
|
||||||
default:
|
|
||||||
url, _ = s.registry.ClientURL(leader)
|
|
||||||
}
|
|
||||||
redirect(url, w, req)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets a comma-delimited list of origins that are allowed.
|
// Sets a comma-delimited list of origins that are allowed.
|
||||||
|
Reference in New Issue
Block a user