Compare commits

..

26 Commits

Author SHA1 Message Date
4d728cc8c4 *: bump to v2.0.3 2015-02-13 15:27:24 -08:00
f7998bb2db Merge pull request #2304 from xiang90/fix_discovery_validation
etcdserver: validate discovery cluster
2015-02-13 14:41:09 -08:00
cfa7ab6074 etcdserver: validate discovery cluster 2015-02-13 14:32:24 -08:00
b59390c9c3 Merge pull request #2293 from barakmich/etcd_underscore
migrate: stop deleting _etcd
2015-02-13 17:10:14 -05:00
fdebf2b109 fix parent references 2015-02-13 16:54:15 -05:00
e9f4be498d migrate: decrease memory usage (only duplicate machines) 2015-02-13 15:26:54 -05:00
6d9d7b4497 Merge pull request #2302 from xiang90/fix_travis
integration: wait for slow travis
2015-02-13 11:49:37 -08:00
163ea3f5c5 integration: wait for slow travis 2015-02-13 11:41:03 -08:00
ea1e54b2a1 Merge pull request #2291 from ArtfulCoder/master
Added go build flag '-installsuffix cgo' to create a static library for etcd and etcdctl
2015-02-13 11:23:03 -08:00
b31109cfd7 Merge pull request #2290 from xiang90/fix_transport
etcdserver: recover transport when recovering from a snapshot
2015-02-13 10:23:29 -08:00
7a909c3950 Merge pull request #2282 from matishsiao/patch-1
add etcd-console tool to tools list
2015-02-13 10:20:31 -08:00
c16cc3a6a3 etcdserver: recover transport when recovering from a snapshot 2015-02-13 10:16:28 -08:00
d7840b75c3 Merge pull request #2301 from xiang90/fix_snap
integration: fix test
2015-02-13 10:03:45 -08:00
aed2c82e44 integration: fix test 2015-02-13 10:02:42 -08:00
39ee85470f Merge pull request #2300 from xiang90/fix_snap
etcdserver: fix snapshot
2015-02-13 09:56:19 -08:00
fbc4c8efb5 etcdserver: fix snapshot 2015-02-13 09:54:25 -08:00
12999ba083 Merge pull request #2298 from barakmich/issue2295
etcdserver: Unmask the snapshotter. Fixes #2295
2015-02-13 09:38:58 -08:00
a0e3bc9cbd etcdserver: Unmask the snapshotter. Fixes #2295 2015-02-13 11:56:00 -05:00
b06e43b803 Merge pull request #2289 from fabxc/feature/graceful_shutdown
main: shutdown gracefully.
2015-02-13 07:34:07 -08:00
8bf795dc3c etcdmain/osutil: shutdown gracefully, interrupt handling
The functionality in pkg/osutil ensures that all interrupt handlers finish
and the process kills itself with the proper signal.
Test for interrupt handling added.
The server shutsdown gracefully by stopping on interrupt (Issue #2277.)
2015-02-13 10:28:53 +01:00
02c52f175f migrate: stop deleting etcd 2015-02-12 19:35:33 -05:00
daf1a913bb Merge pull request #2287 from Amit-PivotalLabs/master
rafthttp/transport.go: Fix nil pointer dereference in RemovePeer
2015-02-12 14:49:12 -08:00
317e57a8a8 rafthttp: Panic informatively when removing unknown peer ID 2015-02-12 14:43:44 -08:00
5c0d3889f8 Added go build flag '-installsuffix cgo' to create a static library. This is needed when go 1.4 is used to build. 2015-02-12 14:08:02 -08:00
a71184424a *: bump to v2.0.2+git 2015-02-12 11:41:48 -08:00
fd90ec6c26 add etcd-console tool to tools list
i add etcd-console tool to tools list for reference
2015-02-11 10:43:21 +08:00
14 changed files with 282 additions and 41 deletions

View File

@ -8,6 +8,7 @@
- [etcd-fs](https://github.com/xetorthio/etcd-fs) - FUSE filesystem for etcd
- [etcd-browser](https://github.com/henszey/etcd-browser) - A web-based key/value editor for etcd using AngularJS
- [etcd-lock](https://github.com/datawisesystems/etcd-lock) - A lock implementation for etcd
- [etcd-console](https://github.com/matishsiao/etcd-console) - A web-base key/value editor for etcd using PHP
**Go libraries**

4
build
View File

@ -12,7 +12,7 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
eval $(go env)
# Static compilation is useful when etcd is run in a container
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
go build -o bin/etcd-migrate ${REPO_PATH}/tools/etcd-migrate
go build -o bin/etcd-dump-logs ${REPO_PATH}/tools/etcd-dump-logs

View File

@ -31,6 +31,7 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdhttp"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/osutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/proxy"
@ -73,7 +74,10 @@ func Main() {
}
}
osutil.HandleInterrupts()
<-stopped
osutil.Exit(0)
}
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
@ -160,6 +164,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
return nil, err
}
s.Start()
osutil.RegisterInterruptHandler(s.Stop)
if cfg.corsInfo.String() != "" {
log.Printf("etcd: cors = %s", cfg.corsInfo)

View File

@ -346,6 +346,20 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.members[id].RaftAttributes = raftAttr
}
// Validate ensures that there is no identical urls in the cluster peer list
func (c *Cluster) Validate() error {
urlMap := make(map[string]bool)
for _, m := range c.Members() {
for _, url := range m.PeerURLs {
if urlMap[url] {
return fmt.Errorf("duplicate url %v in cluster config", url)
}
urlMap[url] = true
}
}
return nil
}
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
members := make(map[types.ID]*Member)
removed := make(map[types.ID]bool)

View File

@ -62,15 +62,8 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
}
// No identical IPs in the cluster peer list
urlMap := make(map[string]bool)
for _, m := range c.Cluster.Members() {
for _, url := range m.PeerURLs {
if urlMap[url] {
return fmt.Errorf("duplicate url %v in cluster config", url)
}
urlMap[url] = true
}
if err := c.Cluster.Validate(); err != nil {
return err
}
// Advertised peer URLs must match those in the cluster peer list

View File

@ -144,7 +144,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
var n raft.Node
var s *raft.MemoryStorage
var id types.ID
var ss *snap.Snapshotter
walVersion, err := wal.DetectVersion(cfg.DataDir)
if err != nil {
@ -154,6 +153,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
}
haveWAL := walVersion != wal.WALNotExist
ss := snap.New(cfg.SnapDir())
switch {
case !haveWAL && !cfg.NewCluster:
@ -185,6 +185,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil {
return nil, err
}
if cfg.Cluster.Validate() != nil {
return nil, fmt.Errorf("bad discovery cluster: %v", err)
}
}
cfg.Cluster.SetStore(st)
cfg.PrintWithInitial()
@ -206,7 +209,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if cfg.ShouldDiscover() {
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
ss := snap.New(cfg.SnapDir())
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
@ -393,6 +395,16 @@ func (s *EtcdServer) run() {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
appliedi = rd.Snapshot.Metadata.Index
confState = rd.Snapshot.Metadata.ConfState
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)

View File

@ -1393,6 +1393,7 @@ func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) Pause() {}

View File

@ -547,6 +547,24 @@ func (m *member) Launch() error {
return nil
}
func (m *member) WaitOK(t *testing.T) {
cc := mustNewHTTPClient(t, []string{m.URL()})
kapi := client.NewKeysAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := kapi.Get(ctx, "/")
if err != nil {
time.Sleep(tickDuration)
continue
}
cancel()
break
}
for m.s.Leader() == 0 {
time.Sleep(tickDuration)
}
}
func (m *member) URL() string { return m.ClientURLs[0].String() }
func (m *member) Pause() {

View File

@ -15,9 +15,14 @@
package integration
import (
"fmt"
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/client"
)
func TestPauseMember(t *testing.T) {
@ -74,3 +79,44 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
t.Errorf("unexpect successful launch")
}
}
func TestSnapshotAndRestartMember(t *testing.T) {
defer afterTest(t)
m := mustNewMember(t, "snapAndRestartTest")
m.SnapCount = 100
m.Launch()
defer m.Terminate(t)
m.WaitOK(t)
resps := make([]*client.Response, 120)
var err error
for i := 0; i < 120; i++ {
cc := mustNewHTTPClient(t, []string{m.URL()})
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", i)
resps[i], err = kapi.Create(ctx, "/"+key, "bar", -1)
if err != nil {
t.Fatalf("#%d: create on %s error: %v", i, m.URL(), err)
}
cancel()
}
m.Stop(t)
m.Restart(t)
for i := 0; i < 120; i++ {
cc := mustNewHTTPClient(t, []string{m.URL()})
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", i)
resp, err := kapi.Get(ctx, "/"+key)
if err != nil {
t.Fatalf("#%d: get on %s error: %v", i, m.URL(), err)
}
cancel()
if !reflect.DeepEqual(resp.Node, resps[i].Node) {
t.Errorf("#%d: node = %v, want %v", i, resp.Node, resps[i].Node)
}
}
}

View File

@ -63,6 +63,24 @@ type node struct {
Children map[string]*node // for directory
}
func deepCopyNode(n *node, parent *node) *node {
out := &node{
Path: n.Path,
CreatedIndex: n.CreatedIndex,
ModifiedIndex: n.ModifiedIndex,
Parent: parent,
ExpireTime: n.ExpireTime,
ACL: n.ACL,
Value: n.Value,
Children: make(map[string]*node),
}
for k, v := range n.Children {
out.Children[k] = deepCopyNode(v, out)
}
return out
}
func replacePathNames(n *node, s1, s2 string) {
n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
for _, c := range n.Children {
@ -87,9 +105,23 @@ func pullNodesFromEtcd(n *node) map[string]uint64 {
return out
}
func fixEtcd(n *node) {
n.Path = "/0"
machines := n.Children["machines"]
func fixEtcd(etcdref *node) *node {
n := &node{
Path: "/0",
CreatedIndex: etcdref.CreatedIndex,
ModifiedIndex: etcdref.ModifiedIndex,
ExpireTime: etcdref.ExpireTime,
ACL: etcdref.ACL,
Children: make(map[string]*node),
}
var machines *node
if machineOrig, ok := etcdref.Children["machines"]; ok {
machines = deepCopyNode(machineOrig, n)
}
if machines == nil {
return n
}
n.Children["members"] = &node{
Path: "/0/members",
CreatedIndex: machines.CreatedIndex,
@ -97,6 +129,7 @@ func fixEtcd(n *node) {
ExpireTime: machines.ExpireTime,
ACL: machines.ACL,
Children: make(map[string]*node),
Parent: n,
}
for name, c := range machines.Children {
q, err := url.ParseQuery(c.Value)
@ -121,29 +154,32 @@ func fixEtcd(n *node) {
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Children: map[string]*node{
"attributes": &node{
Path: path.Join("/0/members", m.ID.String(), "attributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(attrBytes),
},
"raftAttributes": &node{
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(raftBytes),
},
},
Children: make(map[string]*node),
Parent: n.Children["members"],
}
attrs := &node{
Path: path.Join("/0/members", m.ID.String(), "attributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(attrBytes),
Parent: newNode,
}
newNode.Children["attributes"] = attrs
raftAttrs := &node{
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(raftBytes),
Parent: newNode,
}
newNode.Children["raftAttributes"] = raftAttrs
n.Children["members"].Children[m.ID.String()] = newNode
}
delete(n.Children, "machines")
return n
}
func mangleRoot(n *node) *node {
@ -157,10 +193,10 @@ func mangleRoot(n *node) *node {
}
newRoot.Children["1"] = n
etcd := n.Children["_etcd"]
delete(n.Children, "_etcd")
replacePathNames(n, "/", "/1/")
fixEtcd(etcd)
newRoot.Children["0"] = etcd
newZero := fixEtcd(etcd)
newZero.Parent = newRoot
newRoot.Children["0"] = newZero
return newRoot
}

View File

@ -15,8 +15,12 @@
package osutil
import (
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
)
func Unsetenv(key string) error {
@ -33,3 +37,53 @@ func Unsetenv(key string) error {
}
return nil
}
// InterruptHandler is a function that is called on receiving a
// SIGTERM or SIGINT signal.
type InterruptHandler func()
var (
interruptRegisterMu, interruptExitMu sync.Mutex
// interruptHandlers holds all registered InterruptHandlers in order
// they will be executed.
interruptHandlers = []InterruptHandler{}
)
// RegisterInterruptHandler registers a new InterruptHandler. Handlers registered
// after interrupt handing was initiated will not be executed.
func RegisterInterruptHandler(h InterruptHandler) {
interruptRegisterMu.Lock()
defer interruptRegisterMu.Unlock()
interruptHandlers = append(interruptHandlers, h)
}
// HandleInterrupts calls the handler functions on receiving a SIGINT or SIGTERM.
func HandleInterrupts() {
notifier := make(chan os.Signal, 1)
signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-notifier
interruptRegisterMu.Lock()
ihs := make([]InterruptHandler, len(interruptHandlers))
copy(ihs, interruptHandlers)
interruptRegisterMu.Unlock()
interruptExitMu.Lock()
log.Printf("received %v signal, shutting down", sig)
for _, h := range ihs {
h()
}
signal.Stop(notifier)
syscall.Kill(syscall.Getpid(), sig.(syscall.Signal))
}()
}
// Exit relays to os.Exit if no interrupt handlers are running, blocks otherwise.
func Exit(code int) {
interruptExitMu.Lock()
os.Exit(code)
}

View File

@ -16,8 +16,11 @@ package osutil
import (
"os"
"os/signal"
"reflect"
"syscall"
"testing"
"time"
)
func TestUnsetenv(t *testing.T) {
@ -43,3 +46,43 @@ func TestUnsetenv(t *testing.T) {
}
}
}
func waitSig(t *testing.T, c <-chan os.Signal, sig os.Signal) {
select {
case s := <-c:
if s != sig {
t.Fatalf("signal was %v, want %v", s, sig)
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for %v", sig)
}
}
func TestHandleInterrupts(t *testing.T) {
for _, sig := range []syscall.Signal{syscall.SIGINT, syscall.SIGTERM} {
n := 1
RegisterInterruptHandler(func() { n++ })
RegisterInterruptHandler(func() { n *= 2 })
c := make(chan os.Signal, 2)
signal.Notify(c, sig)
HandleInterrupts()
syscall.Kill(syscall.Getpid(), sig)
// we should receive the signal once from our own kill and
// a second time from HandleInterrupts
waitSig(t, c, sig)
waitSig(t, c, sig)
if n == 3 {
t.Fatalf("interrupt handlers were called in wrong order")
}
if n != 4 {
t.Fatalf("interrupt handlers were not called properly")
}
// reset interrupt handlers
interruptHandlers = interruptHandlers[:0]
interruptExitMu.Unlock()
}
}

View File

@ -37,6 +37,7 @@ type Transporter interface {
Send(m []raftpb.Message)
AddPeer(id types.ID, urls []string)
RemovePeer(id types.ID)
RemoveAllPeers()
UpdatePeer(id types.ID, urls []string)
Stop()
}
@ -132,7 +133,24 @@ func (t *transport) AddPeer(id types.ID, urls []string) {
func (t *transport) RemovePeer(id types.ID) {
t.mu.Lock()
defer t.mu.Unlock()
t.peers[id].Stop()
t.removePeer(id)
}
func (t *transport) RemoveAllPeers() {
t.mu.Lock()
defer t.mu.Unlock()
for id, _ := range t.peers {
t.removePeer(id)
}
}
// the caller of this function must have the peers mutex.
func (t *transport) removePeer(id types.ID) {
if peer, ok := t.peers[id]; ok {
peer.Stop()
} else {
log.Panicf("rafthttp: unexpected removal of unknown peer '%d'", id)
}
delete(t.peers, id)
delete(t.leaderStats.Followers, id.String())
}

View File

@ -15,6 +15,6 @@
package version
var (
Version = "2.0.2"
Version = "2.0.3"
InternalVersion = "2"
)