Compare commits

...

15 Commits

Author SHA1 Message Date
d6523fe463 bump to v0.4.7 2015-02-10 15:59:33 -08:00
c25127a699 Merge pull request #2262 from yichengq/047
server: forbid /v2/stats/leader on follower
2015-02-09 22:29:48 -08:00
9f031e6218 server: forbid /v2/stats/leader on follower 2015-02-09 14:50:34 -08:00
e55724e959 Merge pull request #2260 from yichengq/047
server: refresh commit index when someone rejoins
2015-02-09 14:35:00 -08:00
29af192e3d server: refresh commit index when someone rejoins
Update commit index when rejoin happens, because 2.0 doesn't accept
more than one uncommitted config entry.
2015-02-09 14:28:30 -08:00
2fc79912c2 Merge pull request #2194 from yichengq/o3
server: standby exits when detecting v2 is running
2015-01-30 09:32:19 -08:00
ebb8d781b5 server: standby exits when detecting v2 is running 2015-01-29 16:26:47 -08:00
2e30b3c17f Merge pull request #2130 from xiang90/047-version
server: add internal version
2015-01-22 15:23:38 -08:00
9a2d82854e server: add internal version 2015-01-22 15:23:15 -08:00
b077dcf6c4 Merge pull request #2125 from xiang90/047-handler
next-version-handler
2015-01-22 11:40:54 -08:00
2b572cb6e8 Merge pull request #2126 from yichengq/o1
etcd: register usable versions when bootstrap
2015-01-22 11:32:23 -08:00
f36d55f062 next-version-handler 2015-01-22 11:20:52 -08:00
9f70568a02 etcd: register usable versions when bootstrap 2015-01-22 11:08:58 -08:00
1ca7d1e064 Merge pull request #2124 from xiang90/047-version
server: add version monitoring
2015-01-22 10:38:27 -08:00
4f1f003d04 server: add version monitoring 2015-01-22 10:23:15 -08:00
10 changed files with 181 additions and 11 deletions

View File

@ -1,6 +1,6 @@
# etcd # etcd
README version 0.4.6 README version 0.4.7
A highly-available key value store for shared configuration and service discovery. A highly-available key value store for shared configuration and service discovery.
etcd is inspired by [Apache ZooKeeper][zookeeper] and [doozer][doozer], with a focus on being: etcd is inspired by [Apache ZooKeeper][zookeeper] and [doozer][doozer], with a focus on being:

View File

@ -309,6 +309,7 @@ func (e *Etcd) runServer() {
for { for {
if e.mode == PeerMode { if e.mode == PeerMode {
log.Infof("%v starting in peer mode", e.Config.Name) log.Infof("%v starting in peer mode", e.Config.Name)
go registerAvailableInternalVersions(e.Config.Name, e.Config.Addr, e.Config.EtcdTLSInfo())
// Starting peer server should be followed close by listening on its port // Starting peer server should be followed close by listening on its port
// If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
// One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster,

59
etcd/upgrade.go Normal file
View File

@ -0,0 +1,59 @@
package etcd
import (
"fmt"
"os"
"runtime"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)
var defaultEtcdBinaryDir = "/usr/libexec/etcd/internal_versions/"
func registerAvailableInternalVersions(name string, addr string, tls *server.TLSInfo) {
var c *etcd.Client
if tls.Scheme() == "http" {
c = etcd.NewClient([]string{addr})
} else {
var err error
c, err = etcd.NewTLSClient([]string{addr}, tls.CertFile, tls.KeyFile, tls.CAFile)
if err != nil {
log.Fatalf("client TLS error: %v", err)
}
}
vers, err := getInternalVersions()
if err != nil {
log.Infof("failed to get local etcd versions: %v", err)
return
}
for _, v := range vers {
for {
_, err := c.Set("/_etcd/available-internal-versions/"+v+"/"+name, "ok", 0)
if err == nil {
break
}
time.Sleep(time.Second)
}
}
log.Infof("%s: available_internal_versions %s is registered into key space successfully.", name, vers)
}
func getInternalVersions() ([]string, error) {
if runtime.GOOS != "linux" {
return nil, fmt.Errorf("unmatched os version %v", runtime.GOOS)
}
etcdBinaryDir := os.Getenv("ETCD_BINARY_DIR")
if etcdBinaryDir == "" {
etcdBinaryDir = defaultEtcdBinaryDir
}
dir, err := os.Open(etcdBinaryDir)
if err != nil {
return nil, err
}
defer dir.Close()
return dir.Readdirnames(-1)
}

View File

@ -46,6 +46,7 @@ func (c *JoinCommand) NodeName() string {
// applyJoin attempts to join a machine to the cluster. // applyJoin attempts to join a machine to the cluster.
func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
ps, _ := context.Server().Context().(*PeerServer) ps, _ := context.Server().Context().(*PeerServer)
ps.raftServer.FlushCommitIndex()
commitIndex := context.CommitIndex() commitIndex := context.CommitIndex()
// Make sure we're not getting a cached value from the registry. // Make sure we're not getting a cached value from the registry.

View File

@ -6,6 +6,7 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"net/url" "net/url"
"os"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -292,6 +293,7 @@ func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
s.startRoutine(s.monitorTimeoutThreshold) s.startRoutine(s.monitorTimeoutThreshold)
s.startRoutine(s.monitorActiveSize) s.startRoutine(s.monitorActiveSize)
s.startRoutine(s.monitorPeerActivity) s.startRoutine(s.monitorPeerActivity)
s.startRoutine(s.monitorVersion)
// open the snapshot // open the snapshot
if snapshot { if snapshot {
@ -370,6 +372,7 @@ func (s *PeerServer) HTTPHandler() http.Handler {
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET") router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET") router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE") router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE")
router.HandleFunc("/v2/admin/next-internal-version", s.NextInternalVersionHandler).Methods("GET")
return router return router
} }
@ -895,3 +898,30 @@ func (s *PeerServer) monitorPeerActivity() {
} }
} }
} }
func (s *PeerServer) monitorVersion() {
for {
select {
case <-s.closeChan:
return
case <-time.After(time.Second):
}
resp, err := s.store.Get("/_etcd/next-internal-version", false, false)
if err != nil {
continue
}
// only support upgrading to etcd2
if *resp.Node.Value == "2" {
log.Infof("%s: detected next internal version 2, exit after 10 seconds.", s.Config.Name)
} else {
log.Infof("%s: detected invaild next internal version %s", s.Config.Name, *resp.Node.Value)
continue
}
time.Sleep(10 * time.Second)
// be nice to raft. try not to corrupt log file.
go s.raftServer.Stop()
time.Sleep(time.Second)
os.Exit(0)
}
}

View File

@ -3,6 +3,7 @@ package server
import ( import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"path"
"strconv" "strconv"
"time" "time"
@ -309,6 +310,48 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
func (ps *PeerServer) NextInternalVersionHandler(w http.ResponseWriter, req *http.Request) {
for i := 0; i < 50; i++ {
if ps.raftServer.State() != raft.Leader {
l := ps.raftServer.Leader()
if l == "" {
time.Sleep(5 * time.Second)
continue
}
url, _ := ps.registry.PeerURL(l)
uhttp.Redirect(url, w, req)
return
}
resp, err := ps.store.Get("/_etcd/available-internal-versions/2", true, true)
if err != nil {
time.Sleep(5 * time.Second)
continue
}
available := make(map[string]bool)
for _, n := range resp.Node.Nodes {
available[path.Base(n.Key)] = true
}
notfound := false
for _, n := range ps.registry.Names() {
if !available[n] {
notfound = true
break
}
}
if notfound {
time.Sleep(5 * time.Second)
continue
}
c := ps.store.CommandFactory().CreateSetCommand("/_etcd/next-internal-version", false, "2", store.Permanent)
_, err = ps.raftServer.Do(c)
if err == nil {
return
}
}
w.WriteHeader(http.StatusServiceUnavailable)
}
// machineMessage represents information about a peer or standby in the registry. // machineMessage represents information about a peer or standby in the registry.
type machineMessage struct { type machineMessage struct {
Name string `json:"name"` Name string `json:"name"`

View File

@ -1,3 +0,0 @@
package server
const ReleaseVersion = "0.4.6"

View File

@ -285,7 +285,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
// Handler to return the current version of etcd. // Handler to return the current version of etcd.
func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error { func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "etcd %s", ReleaseVersion) fmt.Fprintf(w, `{"releaseVersion":"%s","internalVersion":"%s"}`, ReleaseVersion, InternalVersion)
return nil return nil
} }
@ -324,12 +324,8 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)
return nil return nil
} }
leader := s.peerServer.RaftServer().Leader() w.WriteHeader(http.StatusForbidden)
if leader == "" { w.Write([]byte("not current leader"))
return etcdErr.NewError(300, "", s.Store().Index())
}
hostname, _ := s.registry.ClientURL(leader)
uhttp.Redirect(hostname, w, req)
return nil return nil
} }

View File

@ -187,6 +187,14 @@ func (s *StandbyServer) monitorCluster() {
case <-ticker.C: case <-ticker.C:
} }
ok, err := s.checkMemberInternalVersionIsV2()
if err != nil {
log.Warnf("fail checking internal version(%v): %v", s.ClusterURLs(), err)
} else if ok {
log.Infof("Detect the cluster has been upgraded to v2. Exit now.")
os.Exit(0)
}
if err := s.syncCluster(nil); err != nil { if err := s.syncCluster(nil); err != nil {
log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err) log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
continue continue
@ -216,6 +224,39 @@ func (s *StandbyServer) monitorCluster() {
} }
} }
func (s *StandbyServer) checkMemberInternalVersionIsV2() (bool, error) {
c := &http.Client{Transport: s.client.Client.Transport}
for _, memb := range s.Cluster {
url := memb.ClientURL
resp, err := c.Get(url + "/version")
if err != nil {
log.Debugf("failed to get /version from %s", url)
continue
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Debugf("failed to read body from %s", url)
continue
}
var m map[string]string
err = json.Unmarshal(b, &m)
if err != nil {
log.Debugf("failed to unmarshal body %s from %s", b, url)
continue
}
switch m["internalVersion"] {
case "1":
return false, nil
case "2":
return true, nil
default:
log.Warnf("unrecognized internal version %s from %s", m["internalVersion"], url)
}
}
return false, fmt.Errorf("failed to get version")
}
func (s *StandbyServer) syncCluster(peerURLs []string) error { func (s *StandbyServer) syncCluster(peerURLs []string) error {
peerURLs = append(s.ClusterURLs(), peerURLs...) peerURLs = append(s.ClusterURLs(), peerURLs...)

View File

@ -1,3 +1,5 @@
package server package server
const ReleaseVersion = "0.4.7"
const InternalVersion = "1"
const Version = "v2" const Version = "v2"