Compare commits
36 Commits
Author | SHA1 | Date | |
---|---|---|---|
4d728cc8c4 | |||
f7998bb2db | |||
cfa7ab6074 | |||
b59390c9c3 | |||
fdebf2b109 | |||
e9f4be498d | |||
6d9d7b4497 | |||
163ea3f5c5 | |||
ea1e54b2a1 | |||
b31109cfd7 | |||
7a909c3950 | |||
c16cc3a6a3 | |||
d7840b75c3 | |||
aed2c82e44 | |||
39ee85470f | |||
fbc4c8efb5 | |||
12999ba083 | |||
a0e3bc9cbd | |||
b06e43b803 | |||
8bf795dc3c | |||
02c52f175f | |||
daf1a913bb | |||
317e57a8a8 | |||
5c0d3889f8 | |||
a71184424a | |||
409daceb73 | |||
c6cc276ef0 | |||
cd50f0e058 | |||
fade9b6065 | |||
590205b8c0 | |||
163f0f09f6 | |||
20497f1f85 | |||
4a0887ef7a | |||
161b1d2e2e | |||
71bed48916 | |||
fd90ec6c26 |
@ -8,6 +8,7 @@
|
||||
- [etcd-fs](https://github.com/xetorthio/etcd-fs) - FUSE filesystem for etcd
|
||||
- [etcd-browser](https://github.com/henszey/etcd-browser) - A web-based key/value editor for etcd using AngularJS
|
||||
- [etcd-lock](https://github.com/datawisesystems/etcd-lock) - A lock implementation for etcd
|
||||
- [etcd-console](https://github.com/matishsiao/etcd-console) - A web-based key/value editor for etcd using PHP
|
||||
|
||||
**Go libraries**
|
||||
|
||||
|
4
build
4
build
@ -12,7 +12,7 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
|
||||
eval $(go env)
|
||||
|
||||
# Static compilation is useful when etcd is run in a container
|
||||
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
||||
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||
go build -o bin/etcd-migrate ${REPO_PATH}/tools/etcd-migrate
|
||||
go build -o bin/etcd-dump-logs ${REPO_PATH}/tools/etcd-dump-logs
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/osutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/proxy"
|
||||
@ -74,7 +74,10 @@ func Main() {
|
||||
}
|
||||
}
|
||||
|
||||
osutil.HandleInterrupts()
|
||||
|
||||
<-stopped
|
||||
osutil.Exit(0)
|
||||
}
|
||||
|
||||
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
|
||||
@ -88,13 +91,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
|
||||
log.Printf("no data-dir provided, using default data-dir ./%s", cfg.dir)
|
||||
}
|
||||
if err := makeMemberDir(cfg.dir); err != nil {
|
||||
return nil, fmt.Errorf("cannot use /member sub-directory: %v", err)
|
||||
}
|
||||
membdir := path.Join(cfg.dir, "member")
|
||||
if err := fileutil.IsDirWriteable(membdir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
@ -149,7 +145,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
Name: cfg.name,
|
||||
ClientURLs: cfg.acurls,
|
||||
PeerURLs: cfg.apurls,
|
||||
DataDir: membdir,
|
||||
DataDir: cfg.dir,
|
||||
SnapCount: cfg.snapCount,
|
||||
MaxSnapFiles: cfg.maxSnapFiles,
|
||||
MaxWALFiles: cfg.maxWalFiles,
|
||||
@ -168,6 +164,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
return nil, err
|
||||
}
|
||||
s.Start()
|
||||
osutil.RegisterInterruptHandler(s.Stop)
|
||||
|
||||
if cfg.corsInfo.String() != "" {
|
||||
log.Printf("etcd: cors = %s", cfg.corsInfo)
|
||||
@ -336,42 +333,6 @@ func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
|
||||
return cls, err
|
||||
}
|
||||
|
||||
func makeMemberDir(dir string) error {
|
||||
membdir := path.Join(dir, "member")
|
||||
_, err := os.Stat(membdir)
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case !os.IsNotExist(err):
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(membdir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
v1Files := types.NewUnsafeSet("conf", "log", "snapshot")
|
||||
v2Files := types.NewUnsafeSet("wal", "snap")
|
||||
names, err := fileutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, name := range names {
|
||||
switch {
|
||||
case v1Files.Contains(name):
|
||||
// Link it to the subdir and keep the v1 file at the original
|
||||
// location, so v0.4 etcd can still bootstrap if the upgrade
|
||||
// failed.
|
||||
if err := os.Symlink(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
case v2Files.Contains(name):
|
||||
if err := os.Rename(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func genClusterString(name string, urls types.URLs) string {
|
||||
addrs := make([]string, 0)
|
||||
for _, u := range urls {
|
||||
|
@ -346,6 +346,20 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
|
||||
c.members[id].RaftAttributes = raftAttr
|
||||
}
|
||||
|
||||
// Validate ensures that there is no identical urls in the cluster peer list
|
||||
func (c *Cluster) Validate() error {
|
||||
urlMap := make(map[string]bool)
|
||||
for _, m := range c.Members() {
|
||||
for _, url := range m.PeerURLs {
|
||||
if urlMap[url] {
|
||||
return fmt.Errorf("duplicate url %v in cluster config", url)
|
||||
}
|
||||
urlMap[url] = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
|
||||
members := make(map[types.ID]*Member)
|
||||
removed := make(map[types.ID]bool)
|
||||
|
109
etcdserver/cluster_util.go
Normal file
109
etcdserver/cluster_util.go
Normal file
@ -0,0 +1,109 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
|
||||
// isMemberBootstrapped tries to check if the given member has been bootstrapped
|
||||
// in the given cluster.
|
||||
func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool {
|
||||
us := getOtherPeerURLs(cl, member)
|
||||
rcl, err := getClusterFromPeers(us, false, tr)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
id := cl.MemberByName(member).ID
|
||||
m := rcl.Member(id)
|
||||
if m == nil {
|
||||
return false
|
||||
}
|
||||
if len(m.ClientURLs) > 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetClusterFromPeers takes a set of URLs representing etcd peers, and
|
||||
// attempts to construct a Cluster by accessing the members endpoint on one of
|
||||
// these URLs. The first URL to provide a response is used. If no URLs provide
|
||||
// a response, or a Cluster cannot be successfully created from a received
|
||||
// response, an error is returned.
|
||||
func GetClusterFromPeers(urls []string, tr *http.Transport) (*Cluster, error) {
|
||||
return getClusterFromPeers(urls, true, tr)
|
||||
}
|
||||
|
||||
// If logerr is true, it prints out more error messages.
|
||||
func getClusterFromPeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) {
|
||||
cc := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: time.Second,
|
||||
}
|
||||
for _, u := range urls {
|
||||
resp, err := cc.Get(u + "/members")
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not get cluster response from %s: %v", u, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not read the body of cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
var membs []*Member
|
||||
if err := json.Unmarshal(b, &membs); err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not unmarshal cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return NewClusterFromMembers("", id, membs), nil
|
||||
}
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
||||
// getOtherPeerURLs returns peer urls of other members in the cluster. The
|
||||
// returned list is sorted in ascending lexicographical order.
|
||||
func getOtherPeerURLs(cl ClusterInfo, self string) []string {
|
||||
us := make([]string, 0)
|
||||
for _, m := range cl.Members() {
|
||||
if m.Name == self {
|
||||
continue
|
||||
}
|
||||
us = append(us, m.PeerURLs...)
|
||||
}
|
||||
sort.Strings(us)
|
||||
return us
|
||||
}
|
@ -62,15 +62,8 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
|
||||
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
|
||||
}
|
||||
|
||||
// No identical IPs in the cluster peer list
|
||||
urlMap := make(map[string]bool)
|
||||
for _, m := range c.Cluster.Members() {
|
||||
for _, url := range m.PeerURLs {
|
||||
if urlMap[url] {
|
||||
return fmt.Errorf("duplicate url %v in cluster config", url)
|
||||
}
|
||||
urlMap[url] = true
|
||||
}
|
||||
if err := c.Cluster.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Advertised peer URLs must match those in the cluster peer list
|
||||
@ -83,9 +76,11 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ServerConfig) WALDir() string { return path.Join(c.DataDir, "wal") }
|
||||
func (c *ServerConfig) MemberDir() string { return path.Join(c.DataDir, "member") }
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.DataDir, "snap") }
|
||||
func (c *ServerConfig) WALDir() string { return path.Join(c.MemberDir(), "wal") }
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
|
||||
|
||||
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
|
||||
|
||||
@ -99,6 +94,7 @@ func (c *ServerConfig) print(initial bool) {
|
||||
log.Println("etcdserver: force new cluster")
|
||||
}
|
||||
log.Printf("etcdserver: data dir = %s", c.DataDir)
|
||||
log.Printf("etcdserver: member dir = %s", c.MemberDir())
|
||||
log.Printf("etcdserver: heartbeat = %dms", c.TickMs)
|
||||
log.Printf("etcdserver: election = %dms", c.ElectionTicks*int(c.TickMs))
|
||||
log.Printf("etcdserver: snapshot count = %d", c.SnapCount)
|
||||
|
@ -129,8 +129,8 @@ func TestBootstrapConfigVerify(t *testing.T) {
|
||||
|
||||
func TestSnapDir(t *testing.T) {
|
||||
tests := map[string]string{
|
||||
"/": "/snap",
|
||||
"/var/lib/etc": "/var/lib/etc/snap",
|
||||
"/": "/member/snap",
|
||||
"/var/lib/etc": "/var/lib/etc/member/snap",
|
||||
}
|
||||
for dd, w := range tests {
|
||||
cfg := ServerConfig{
|
||||
@ -144,8 +144,8 @@ func TestSnapDir(t *testing.T) {
|
||||
|
||||
func TestWALDir(t *testing.T) {
|
||||
tests := map[string]string{
|
||||
"/": "/wal",
|
||||
"/var/lib/etc": "/var/lib/etc/wal",
|
||||
"/": "/member/wal",
|
||||
"/var/lib/etc": "/var/lib/etc/member/wal",
|
||||
}
|
||||
for dd, w := range tests {
|
||||
cfg := ServerConfig{
|
||||
|
@ -18,13 +18,11 @@ import (
|
||||
"encoding/json"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -146,6 +144,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
var n raft.Node
|
||||
var s *raft.MemoryStorage
|
||||
var id types.ID
|
||||
|
||||
walVersion, err := wal.DetectVersion(cfg.DataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -154,8 +153,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
|
||||
}
|
||||
haveWAL := walVersion != wal.WALNotExist
|
||||
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
|
||||
switch {
|
||||
case !haveWAL && !cfg.NewCluster:
|
||||
us := getOtherPeerURLs(cfg.Cluster, cfg.Name)
|
||||
@ -175,7 +174,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
return nil, err
|
||||
}
|
||||
m := cfg.Cluster.MemberByName(cfg.Name)
|
||||
if isBootstrapped(cfg) {
|
||||
if isMemberBootstrapped(cfg.Cluster, cfg.Name, cfg.Transport) {
|
||||
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
|
||||
}
|
||||
if cfg.ShouldDiscover() {
|
||||
@ -186,15 +185,25 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Capture the validation error itself. The previous form discarded it and
// formatted the enclosing err, which is always nil here because the
// NewClusterFromString check just above returns on a non-nil err.
if err := cfg.Cluster.Validate(); err != nil {
	return nil, fmt.Errorf("bad discovery cluster: %v", err)
}
|
||||
}
|
||||
cfg.Cluster.SetStore(st)
|
||||
cfg.PrintWithInitial()
|
||||
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
||||
case haveWAL:
|
||||
if walVersion != wal.WALv0_5 {
|
||||
if err := upgradeWAL(cfg, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Run the migrations.
|
||||
if err := upgradeWAL(cfg.DataDir, cfg.Name, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
||||
}
|
||||
|
||||
if cfg.ShouldDiscover() {
|
||||
@ -386,6 +395,16 @@ func (s *EtcdServer) run() {
|
||||
log.Panicf("recovery store error: %v", err)
|
||||
}
|
||||
s.Cluster.Recover()
|
||||
|
||||
// recover raft transport
|
||||
s.r.transport.RemoveAllPeers()
|
||||
for _, m := range s.Cluster.Members() {
|
||||
if m.ID == s.ID() {
|
||||
continue
|
||||
}
|
||||
s.r.transport.AddPeer(m.ID, m.PeerURLs)
|
||||
}
|
||||
|
||||
appliedi = rd.Snapshot.Metadata.Index
|
||||
confState = rd.Snapshot.Metadata.ConfState
|
||||
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
|
||||
@ -821,88 +840,3 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
|
||||
func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
|
||||
|
||||
func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
|
||||
|
||||
// isBootstrapped tries to check if the given member has been bootstrapped
|
||||
// in the given cluster.
|
||||
func isBootstrapped(cfg *ServerConfig) bool {
|
||||
cl := cfg.Cluster
|
||||
member := cfg.Name
|
||||
|
||||
us := getOtherPeerURLs(cl, member)
|
||||
rcl, err := getClusterFromPeers(us, false, cfg.Transport)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
id := cl.MemberByName(member).ID
|
||||
m := rcl.Member(id)
|
||||
if m == nil {
|
||||
return false
|
||||
}
|
||||
if len(m.ClientURLs) > 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetClusterFromPeers takes a set of URLs representing etcd peers, and
|
||||
// attempts to construct a Cluster by accessing the members endpoint on one of
|
||||
// these URLs. The first URL to provide a response is used. If no URLs provide
|
||||
// a response, or a Cluster cannot be successfully created from a received
|
||||
// response, an error is returned.
|
||||
func GetClusterFromPeers(urls []string, tr *http.Transport) (*Cluster, error) {
|
||||
return getClusterFromPeers(urls, true, tr)
|
||||
}
|
||||
|
||||
// If logerr is true, it prints out more error messages.
|
||||
func getClusterFromPeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) {
|
||||
cc := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: time.Second,
|
||||
}
|
||||
for _, u := range urls {
|
||||
resp, err := cc.Get(u + "/members")
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not get cluster response from %s: %v", u, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not read the body of cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
var membs []*Member
|
||||
if err := json.Unmarshal(b, &membs); err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not unmarshal cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return NewClusterFromMembers("", id, membs), nil
|
||||
}
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
||||
// getOtherPeerURLs returns peer urls of other members in the cluster. The
|
||||
// returned list is sorted in ascending lexicographical order.
|
||||
func getOtherPeerURLs(cl ClusterInfo, self string) []string {
|
||||
us := make([]string, 0)
|
||||
for _, m := range cl.Members() {
|
||||
if m.Name == self {
|
||||
continue
|
||||
}
|
||||
us = append(us, m.PeerURLs...)
|
||||
}
|
||||
sort.Strings(us)
|
||||
return us
|
||||
}
|
||||
|
@ -1393,6 +1393,7 @@ func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) Stop() {}
|
||||
func (s *nopTransporter) Pause() {}
|
||||
|
@ -16,6 +16,8 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/migrate"
|
||||
@ -91,14 +93,47 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
|
||||
|
||||
// upgradeWAL converts an older version of the etcdServer data to the newest version.
|
||||
// It must ensure that, after upgrading, the most recent version is present.
|
||||
func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
|
||||
if ver == wal.WALv0_4 {
|
||||
func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
|
||||
switch ver {
|
||||
case wal.WALv0_4:
|
||||
log.Print("etcdserver: converting v0.4 log to v2.0")
|
||||
err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
|
||||
err := migrate.Migrate4To2(baseDataDir, name)
|
||||
if err != nil {
|
||||
log.Fatalf("etcdserver: failed migrating data-dir: %v", err)
|
||||
return err
|
||||
}
|
||||
fallthrough
|
||||
case wal.WALv2_0:
|
||||
err := makeMemberDir(baseDataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fallthrough
|
||||
case wal.WALv2_0_1:
|
||||
fallthrough
|
||||
default:
|
||||
log.Printf("datadir is valid for the 2.0.1 format")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// makeMemberDir creates the "member" sub-directory of dir and moves the
// existing "snap" and "wal" entries of dir into it.
//
// If dir/member already exists the function does nothing and returns nil.
// Any other error from inspecting dir/member is returned as is.
//
// If a move fails, the entries already moved are put back and the newly
// created member directory is removed before the error is returned. Without
// that rollback a half-finished migration would leave dir/member in place,
// and the next call would take the early return above and never move the
// remaining entries.
func makeMemberDir(dir string) error {
	membdir := path.Join(dir, "member")
	_, err := os.Stat(membdir)
	switch {
	case err == nil:
		return nil
	case !os.IsNotExist(err):
		return err
	}
	if err := os.MkdirAll(membdir, 0700); err != nil {
		return err
	}
	names := []string{"snap", "wal"}
	for i, name := range names {
		if err := os.Rename(path.Join(dir, name), path.Join(membdir, name)); err != nil {
			// Best-effort rollback; the rename error is the one worth
			// reporting. If a rollback rename fails, the Remove below fails
			// too (directory not empty), so no data is discarded.
			for _, moved := range names[:i] {
				_ = os.Rename(path.Join(membdir, moved), path.Join(dir, moved))
			}
			_ = os.Remove(membdir)
			return err
		}
	}
	return nil
}
|
||||
|
@ -547,6 +547,24 @@ func (m *member) Launch() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *member) WaitOK(t *testing.T) {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
_, err := kapi.Get(ctx, "/")
|
||||
if err != nil {
|
||||
time.Sleep(tickDuration)
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
for m.s.Leader() == 0 {
|
||||
time.Sleep(tickDuration)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *member) URL() string { return m.ClientURLs[0].String() }
|
||||
|
||||
func (m *member) Pause() {
|
||||
|
@ -15,9 +15,14 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
"github.com/coreos/etcd/client"
|
||||
)
|
||||
|
||||
func TestPauseMember(t *testing.T) {
|
||||
@ -74,3 +79,44 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
||||
t.Errorf("unexpect successful launch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotAndRestartMember(t *testing.T) {
|
||||
defer afterTest(t)
|
||||
m := mustNewMember(t, "snapAndRestartTest")
|
||||
m.SnapCount = 100
|
||||
m.Launch()
|
||||
defer m.Terminate(t)
|
||||
m.WaitOK(t)
|
||||
|
||||
resps := make([]*client.Response, 120)
|
||||
var err error
|
||||
for i := 0; i < 120; i++ {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
key := fmt.Sprintf("foo%d", i)
|
||||
resps[i], err = kapi.Create(ctx, "/"+key, "bar", -1)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: create on %s error: %v", i, m.URL(), err)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
m.Stop(t)
|
||||
m.Restart(t)
|
||||
|
||||
for i := 0; i < 120; i++ {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
key := fmt.Sprintf("foo%d", i)
|
||||
resp, err := kapi.Get(ctx, "/"+key)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: get on %s error: %v", i, m.URL(), err)
|
||||
}
|
||||
cancel()
|
||||
|
||||
if !reflect.DeepEqual(resp.Node, resps[i].Node) {
|
||||
t.Errorf("#%d: node = %v, want %v", i, resp.Node, resps[i].Node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -63,6 +63,24 @@ type node struct {
|
||||
Children map[string]*node // for directory
|
||||
}
|
||||
|
||||
func deepCopyNode(n *node, parent *node) *node {
|
||||
out := &node{
|
||||
Path: n.Path,
|
||||
CreatedIndex: n.CreatedIndex,
|
||||
ModifiedIndex: n.ModifiedIndex,
|
||||
Parent: parent,
|
||||
ExpireTime: n.ExpireTime,
|
||||
ACL: n.ACL,
|
||||
Value: n.Value,
|
||||
Children: make(map[string]*node),
|
||||
}
|
||||
for k, v := range n.Children {
|
||||
out.Children[k] = deepCopyNode(v, out)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func replacePathNames(n *node, s1, s2 string) {
|
||||
n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
|
||||
for _, c := range n.Children {
|
||||
@ -87,9 +105,23 @@ func pullNodesFromEtcd(n *node) map[string]uint64 {
|
||||
return out
|
||||
}
|
||||
|
||||
func fixEtcd(n *node) {
|
||||
n.Path = "/0"
|
||||
machines := n.Children["machines"]
|
||||
func fixEtcd(etcdref *node) *node {
|
||||
n := &node{
|
||||
Path: "/0",
|
||||
CreatedIndex: etcdref.CreatedIndex,
|
||||
ModifiedIndex: etcdref.ModifiedIndex,
|
||||
ExpireTime: etcdref.ExpireTime,
|
||||
ACL: etcdref.ACL,
|
||||
Children: make(map[string]*node),
|
||||
}
|
||||
|
||||
var machines *node
|
||||
if machineOrig, ok := etcdref.Children["machines"]; ok {
|
||||
machines = deepCopyNode(machineOrig, n)
|
||||
}
|
||||
if machines == nil {
|
||||
return n
|
||||
}
|
||||
n.Children["members"] = &node{
|
||||
Path: "/0/members",
|
||||
CreatedIndex: machines.CreatedIndex,
|
||||
@ -97,6 +129,7 @@ func fixEtcd(n *node) {
|
||||
ExpireTime: machines.ExpireTime,
|
||||
ACL: machines.ACL,
|
||||
Children: make(map[string]*node),
|
||||
Parent: n,
|
||||
}
|
||||
for name, c := range machines.Children {
|
||||
q, err := url.ParseQuery(c.Value)
|
||||
@ -121,29 +154,32 @@ func fixEtcd(n *node) {
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Children: map[string]*node{
|
||||
"attributes": &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "attributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(attrBytes),
|
||||
},
|
||||
"raftAttributes": &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(raftBytes),
|
||||
},
|
||||
},
|
||||
Children: make(map[string]*node),
|
||||
Parent: n.Children["members"],
|
||||
}
|
||||
attrs := &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "attributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(attrBytes),
|
||||
Parent: newNode,
|
||||
}
|
||||
newNode.Children["attributes"] = attrs
|
||||
raftAttrs := &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(raftBytes),
|
||||
Parent: newNode,
|
||||
}
|
||||
newNode.Children["raftAttributes"] = raftAttrs
|
||||
n.Children["members"].Children[m.ID.String()] = newNode
|
||||
}
|
||||
delete(n.Children, "machines")
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func mangleRoot(n *node) *node {
|
||||
@ -157,10 +193,10 @@ func mangleRoot(n *node) *node {
|
||||
}
|
||||
newRoot.Children["1"] = n
|
||||
etcd := n.Children["_etcd"]
|
||||
delete(n.Children, "_etcd")
|
||||
replacePathNames(n, "/", "/1/")
|
||||
fixEtcd(etcd)
|
||||
newRoot.Children["0"] = etcd
|
||||
newZero := fixEtcd(etcd)
|
||||
newZero.Parent = newRoot
|
||||
newRoot.Children["0"] = newZero
|
||||
return newRoot
|
||||
}
|
||||
|
||||
|
@ -15,8 +15,12 @@
|
||||
package osutil
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func Unsetenv(key string) error {
|
||||
@ -33,3 +37,53 @@ func Unsetenv(key string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// InterruptHandler is a function that is called on receiving a
|
||||
// SIGTERM or SIGINT signal.
|
||||
type InterruptHandler func()
|
||||
|
||||
var (
|
||||
interruptRegisterMu, interruptExitMu sync.Mutex
|
||||
// interruptHandlers holds all registered InterruptHandlers in order
|
||||
// they will be executed.
|
||||
interruptHandlers = []InterruptHandler{}
|
||||
)
|
||||
|
||||
// RegisterInterruptHandler registers a new InterruptHandler. Handlers registered
|
||||
// after interrupt handing was initiated will not be executed.
|
||||
func RegisterInterruptHandler(h InterruptHandler) {
|
||||
interruptRegisterMu.Lock()
|
||||
defer interruptRegisterMu.Unlock()
|
||||
interruptHandlers = append(interruptHandlers, h)
|
||||
}
|
||||
|
||||
// HandleInterrupts calls the handler functions on receiving a SIGINT or SIGTERM.
|
||||
func HandleInterrupts() {
|
||||
notifier := make(chan os.Signal, 1)
|
||||
signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
sig := <-notifier
|
||||
|
||||
interruptRegisterMu.Lock()
|
||||
ihs := make([]InterruptHandler, len(interruptHandlers))
|
||||
copy(ihs, interruptHandlers)
|
||||
interruptRegisterMu.Unlock()
|
||||
|
||||
interruptExitMu.Lock()
|
||||
|
||||
log.Printf("received %v signal, shutting down", sig)
|
||||
|
||||
for _, h := range ihs {
|
||||
h()
|
||||
}
|
||||
signal.Stop(notifier)
|
||||
syscall.Kill(syscall.Getpid(), sig.(syscall.Signal))
|
||||
}()
|
||||
}
|
||||
|
||||
// Exit relays to os.Exit if no interrupt handlers are running, blocks otherwise.
//
// It locks interruptExitMu and calls os.Exit without unlocking. The goroutine
// started by HandleInterrupts locks the same mutex before it runs the
// handlers and does not unlock it, so a call to Exit made after that point
// waits on the mutex instead of terminating the process.
func Exit(code int) {
	interruptExitMu.Lock()
	os.Exit(code)
}
|
||||
|
@ -16,8 +16,11 @@ package osutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestUnsetenv(t *testing.T) {
|
||||
@ -43,3 +46,43 @@ func TestUnsetenv(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// waitSig fails the test unless sig is the next signal received on c within
// one second.
func waitSig(t *testing.T, c <-chan os.Signal, sig os.Signal) {
	timeout := time.After(1 * time.Second)
	select {
	case <-timeout:
		t.Fatalf("timeout waiting for %v", sig)
	case got := <-c:
		if got == sig {
			return
		}
		t.Fatalf("signal was %v, want %v", got, sig)
	}
}
|
||||
|
||||
func TestHandleInterrupts(t *testing.T) {
|
||||
for _, sig := range []syscall.Signal{syscall.SIGINT, syscall.SIGTERM} {
|
||||
n := 1
|
||||
RegisterInterruptHandler(func() { n++ })
|
||||
RegisterInterruptHandler(func() { n *= 2 })
|
||||
|
||||
c := make(chan os.Signal, 2)
|
||||
signal.Notify(c, sig)
|
||||
|
||||
HandleInterrupts()
|
||||
syscall.Kill(syscall.Getpid(), sig)
|
||||
|
||||
// we should receive the signal once from our own kill and
|
||||
// a second time from HandleInterrupts
|
||||
waitSig(t, c, sig)
|
||||
waitSig(t, c, sig)
|
||||
|
||||
if n == 3 {
|
||||
t.Fatalf("interrupt handlers were called in wrong order")
|
||||
}
|
||||
if n != 4 {
|
||||
t.Fatalf("interrupt handlers were not called properly")
|
||||
}
|
||||
// reset interrupt handlers
|
||||
interruptHandlers = interruptHandlers[:0]
|
||||
interruptExitMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ type Transporter interface {
|
||||
Send(m []raftpb.Message)
|
||||
AddPeer(id types.ID, urls []string)
|
||||
RemovePeer(id types.ID)
|
||||
RemoveAllPeers()
|
||||
UpdatePeer(id types.ID, urls []string)
|
||||
Stop()
|
||||
}
|
||||
@ -132,7 +133,24 @@ func (t *transport) AddPeer(id types.ID, urls []string) {
|
||||
func (t *transport) RemovePeer(id types.ID) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.peers[id].Stop()
|
||||
t.removePeer(id)
|
||||
}
|
||||
|
||||
func (t *transport) RemoveAllPeers() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
for id, _ := range t.peers {
|
||||
t.removePeer(id)
|
||||
}
|
||||
}
|
||||
|
||||
// the caller of this function must have the peers mutex.
|
||||
func (t *transport) removePeer(id types.ID) {
|
||||
if peer, ok := t.peers[id]; ok {
|
||||
peer.Stop()
|
||||
} else {
|
||||
log.Panicf("rafthttp: unexpected removal of unknown peer '%d'", id)
|
||||
}
|
||||
delete(t.peers, id)
|
||||
delete(t.leaderStats.Followers, id.String())
|
||||
}
|
||||
|
@ -86,25 +86,25 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
}
|
||||
|
||||
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
||||
var err error
|
||||
var b []byte
|
||||
|
||||
fpath := path.Join(dir, name)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
renameBroken(fpath)
|
||||
}
|
||||
}()
|
||||
|
||||
b, err = ioutil.ReadFile(fpath)
|
||||
snap, err := Read(fpath)
|
||||
if err != nil {
|
||||
log.Printf("snap: snapshotter cannot read file %v: %v", name, err)
|
||||
renameBroken(fpath)
|
||||
}
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Read reads the snapshot named by snapname and returns the snapshot.
|
||||
func Read(snapname string) (*raftpb.Snapshot, error) {
|
||||
b, err := ioutil.ReadFile(snapname)
|
||||
if err != nil {
|
||||
log.Printf("snap: snapshotter cannot read file %v: %v", snapname, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var serializedSnap snappb.Snapshot
|
||||
if err = serializedSnap.Unmarshal(b); err != nil {
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", name, err)
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", snapname, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -115,13 +115,13 @@ func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
||||
|
||||
crc := crc32.Update(0, crcTable, serializedSnap.Data)
|
||||
if crc != serializedSnap.Crc {
|
||||
log.Printf("snap: corrupted snapshot file %v: crc mismatch", name)
|
||||
log.Printf("snap: corrupted snapshot file %v: crc mismatch", snapname)
|
||||
return nil, ErrCRCMismatch
|
||||
}
|
||||
|
||||
var snap raftpb.Snapshot
|
||||
if err = snap.Unmarshal(serializedSnap.Data); err != nil {
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", name, err)
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", snapname, err)
|
||||
return nil, err
|
||||
}
|
||||
return &snap, nil
|
||||
|
@ -32,13 +32,24 @@ import (
|
||||
|
||||
func main() {
|
||||
from := flag.String("data-dir", "", "")
|
||||
snapfile := flag.String("snap-file", "", "The base name of snapshot file to read")
|
||||
flag.Parse()
|
||||
if *from == "" {
|
||||
log.Fatal("Must provide -data-dir flag")
|
||||
}
|
||||
|
||||
ss := snap.New(snapDir(*from))
|
||||
snapshot, err := ss.Load()
|
||||
var (
|
||||
snapshot *raftpb.Snapshot
|
||||
err error
|
||||
)
|
||||
|
||||
if *snapfile == "" {
|
||||
ss := snap.New(snapDir(*from))
|
||||
snapshot, err = ss.Load()
|
||||
} else {
|
||||
snapshot, err = snap.Read(path.Join(snapDir(*from), *snapfile))
|
||||
}
|
||||
|
||||
var walsnap walpb.Snapshot
|
||||
switch err {
|
||||
case nil:
|
||||
@ -102,9 +113,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// walDir returns the WAL directory for the given data directory, which is
// <dataDir>/member/wal.
func walDir(dataDir string) string { return path.Join(dataDir, "member", "wal") }
|
||||
|
||||
// snapDir returns the snapshot directory for the given data directory, which
// is <dataDir>/member/snap.
func snapDir(dataDir string) string { return path.Join(dataDir, "member", "snap") }
|
||||
|
||||
func parseWALMetadata(b []byte) (id, cid types.ID) {
|
||||
var metadata etcdserverpb.Metadata
|
||||
|
@ -15,6 +15,6 @@
|
||||
package version
|
||||
|
||||
// Version strings exposed by this package.
var (
	// Version is the release version string.
	Version = "2.0.3"
	// InternalVersion is a separate, coarser version string.
	// NOTE(review): its exact meaning is not visible here; confirm with callers.
	InternalVersion = "2"
)
|
||||
|
15
wal/util.go
15
wal/util.go
@ -31,7 +31,8 @@ const (
|
||||
WALUnknown WalVersion = "Unknown WAL"
|
||||
WALNotExist WalVersion = "No WAL"
|
||||
WALv0_4 WalVersion = "0.4.x"
|
||||
WALv0_5 WalVersion = "0.5.x"
|
||||
WALv2_0 WalVersion = "2.0.0"
|
||||
WALv2_0_1 WalVersion = "2.0.1"
|
||||
)
|
||||
|
||||
func DetectVersion(dirpath string) (WalVersion, error) {
|
||||
@ -48,10 +49,20 @@ func DetectVersion(dirpath string) (WalVersion, error) {
|
||||
return WALNotExist, nil
|
||||
}
|
||||
nameSet := types.NewUnsafeSet(names...)
|
||||
if nameSet.Contains("member") {
|
||||
ver, err := DetectVersion(path.Join(dirpath, "member"))
|
||||
if ver == WALv2_0 {
|
||||
return WALv2_0_1, nil
|
||||
} else if ver == WALv0_4 {
|
||||
// How in the blazes did it get there?
|
||||
return WALUnknown, nil
|
||||
}
|
||||
return ver, err
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snap", "wal"}) {
|
||||
// .../wal cannot be empty to exist.
|
||||
if Exist(path.Join(dirpath, "wal")) {
|
||||
return WALv0_5, nil
|
||||
return WALv2_0, nil
|
||||
}
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
|
||||
|
@ -28,7 +28,8 @@ func TestDetectVersion(t *testing.T) {
|
||||
wver WalVersion
|
||||
}{
|
||||
{[]string{}, WALNotExist},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, WALv0_5},
|
||||
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, WALv2_0_1},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, WALv2_0},
|
||||
{[]string{"snapshot/", "conf", "log"}, WALv0_4},
|
||||
{[]string{"weird"}, WALUnknown},
|
||||
{[]string{"snap/", "wal/"}, WALUnknown},
|
||||
|
Reference in New Issue
Block a user