Compare commits
26 Commits
Author | SHA1 | Date | |
---|---|---|---|
4d728cc8c4 | |||
f7998bb2db | |||
cfa7ab6074 | |||
b59390c9c3 | |||
fdebf2b109 | |||
e9f4be498d | |||
6d9d7b4497 | |||
163ea3f5c5 | |||
ea1e54b2a1 | |||
b31109cfd7 | |||
7a909c3950 | |||
c16cc3a6a3 | |||
d7840b75c3 | |||
aed2c82e44 | |||
39ee85470f | |||
fbc4c8efb5 | |||
12999ba083 | |||
a0e3bc9cbd | |||
b06e43b803 | |||
8bf795dc3c | |||
02c52f175f | |||
daf1a913bb | |||
317e57a8a8 | |||
5c0d3889f8 | |||
a71184424a | |||
fd90ec6c26 |
@ -8,6 +8,7 @@
|
|||||||
- [etcd-fs](https://github.com/xetorthio/etcd-fs) - FUSE filesystem for etcd
|
- [etcd-fs](https://github.com/xetorthio/etcd-fs) - FUSE filesystem for etcd
|
||||||
- [etcd-browser](https://github.com/henszey/etcd-browser) - A web-based key/value editor for etcd using AngularJS
|
- [etcd-browser](https://github.com/henszey/etcd-browser) - A web-based key/value editor for etcd using AngularJS
|
||||||
- [etcd-lock](https://github.com/datawisesystems/etcd-lock) - A lock implementation for etcd
|
- [etcd-lock](https://github.com/datawisesystems/etcd-lock) - A lock implementation for etcd
|
||||||
|
- [etcd-console](https://github.com/matishsiao/etcd-console) - A web-base key/value editor for etcd using PHP
|
||||||
|
|
||||||
**Go libraries**
|
**Go libraries**
|
||||||
|
|
||||||
|
4
build
4
build
@ -12,7 +12,7 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
|
|||||||
eval $(go env)
|
eval $(go env)
|
||||||
|
|
||||||
# Static compilation is useful when etcd is run in a container
|
# Static compilation is useful when etcd is run in a container
|
||||||
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
||||||
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||||
go build -o bin/etcd-migrate ${REPO_PATH}/tools/etcd-migrate
|
go build -o bin/etcd-migrate ${REPO_PATH}/tools/etcd-migrate
|
||||||
go build -o bin/etcd-dump-logs ${REPO_PATH}/tools/etcd-dump-logs
|
go build -o bin/etcd-dump-logs ${REPO_PATH}/tools/etcd-dump-logs
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||||
"github.com/coreos/etcd/pkg/cors"
|
"github.com/coreos/etcd/pkg/cors"
|
||||||
|
"github.com/coreos/etcd/pkg/osutil"
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/proxy"
|
"github.com/coreos/etcd/proxy"
|
||||||
@ -73,7 +74,10 @@ func Main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
osutil.HandleInterrupts()
|
||||||
|
|
||||||
<-stopped
|
<-stopped
|
||||||
|
osutil.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
|
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
|
||||||
@ -160,6 +164,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.Start()
|
s.Start()
|
||||||
|
osutil.RegisterInterruptHandler(s.Stop)
|
||||||
|
|
||||||
if cfg.corsInfo.String() != "" {
|
if cfg.corsInfo.String() != "" {
|
||||||
log.Printf("etcd: cors = %s", cfg.corsInfo)
|
log.Printf("etcd: cors = %s", cfg.corsInfo)
|
||||||
|
@ -346,6 +346,20 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
|
|||||||
c.members[id].RaftAttributes = raftAttr
|
c.members[id].RaftAttributes = raftAttr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate ensures that there is no identical urls in the cluster peer list
|
||||||
|
func (c *Cluster) Validate() error {
|
||||||
|
urlMap := make(map[string]bool)
|
||||||
|
for _, m := range c.Members() {
|
||||||
|
for _, url := range m.PeerURLs {
|
||||||
|
if urlMap[url] {
|
||||||
|
return fmt.Errorf("duplicate url %v in cluster config", url)
|
||||||
|
}
|
||||||
|
urlMap[url] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
|
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
|
||||||
members := make(map[types.ID]*Member)
|
members := make(map[types.ID]*Member)
|
||||||
removed := make(map[types.ID]bool)
|
removed := make(map[types.ID]bool)
|
||||||
|
@ -62,15 +62,8 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
|
|||||||
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
|
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
|
||||||
}
|
}
|
||||||
|
|
||||||
// No identical IPs in the cluster peer list
|
if err := c.Cluster.Validate(); err != nil {
|
||||||
urlMap := make(map[string]bool)
|
return err
|
||||||
for _, m := range c.Cluster.Members() {
|
|
||||||
for _, url := range m.PeerURLs {
|
|
||||||
if urlMap[url] {
|
|
||||||
return fmt.Errorf("duplicate url %v in cluster config", url)
|
|
||||||
}
|
|
||||||
urlMap[url] = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Advertised peer URLs must match those in the cluster peer list
|
// Advertised peer URLs must match those in the cluster peer list
|
||||||
|
@ -144,7 +144,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
var n raft.Node
|
var n raft.Node
|
||||||
var s *raft.MemoryStorage
|
var s *raft.MemoryStorage
|
||||||
var id types.ID
|
var id types.ID
|
||||||
var ss *snap.Snapshotter
|
|
||||||
|
|
||||||
walVersion, err := wal.DetectVersion(cfg.DataDir)
|
walVersion, err := wal.DetectVersion(cfg.DataDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -154,6 +153,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
|
return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
|
||||||
}
|
}
|
||||||
haveWAL := walVersion != wal.WALNotExist
|
haveWAL := walVersion != wal.WALNotExist
|
||||||
|
ss := snap.New(cfg.SnapDir())
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case !haveWAL && !cfg.NewCluster:
|
case !haveWAL && !cfg.NewCluster:
|
||||||
@ -185,6 +185,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil {
|
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if cfg.Cluster.Validate() != nil {
|
||||||
|
return nil, fmt.Errorf("bad discovery cluster: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cfg.Cluster.SetStore(st)
|
cfg.Cluster.SetStore(st)
|
||||||
cfg.PrintWithInitial()
|
cfg.PrintWithInitial()
|
||||||
@ -206,7 +209,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
if cfg.ShouldDiscover() {
|
if cfg.ShouldDiscover() {
|
||||||
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
||||||
}
|
}
|
||||||
ss := snap.New(cfg.SnapDir())
|
|
||||||
snapshot, err := ss.Load()
|
snapshot, err := ss.Load()
|
||||||
if err != nil && err != snap.ErrNoSnapshot {
|
if err != nil && err != snap.ErrNoSnapshot {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -393,6 +395,16 @@ func (s *EtcdServer) run() {
|
|||||||
log.Panicf("recovery store error: %v", err)
|
log.Panicf("recovery store error: %v", err)
|
||||||
}
|
}
|
||||||
s.Cluster.Recover()
|
s.Cluster.Recover()
|
||||||
|
|
||||||
|
// recover raft transport
|
||||||
|
s.r.transport.RemoveAllPeers()
|
||||||
|
for _, m := range s.Cluster.Members() {
|
||||||
|
if m.ID == s.ID() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
s.r.transport.AddPeer(m.ID, m.PeerURLs)
|
||||||
|
}
|
||||||
|
|
||||||
appliedi = rd.Snapshot.Metadata.Index
|
appliedi = rd.Snapshot.Metadata.Index
|
||||||
confState = rd.Snapshot.Metadata.ConfState
|
confState = rd.Snapshot.Metadata.ConfState
|
||||||
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
|
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
|
||||||
|
@ -1393,6 +1393,7 @@ func (s *nopTransporter) Handler() http.Handler { return nil }
|
|||||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||||
|
func (s *nopTransporter) RemoveAllPeers() {}
|
||||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||||
func (s *nopTransporter) Stop() {}
|
func (s *nopTransporter) Stop() {}
|
||||||
func (s *nopTransporter) Pause() {}
|
func (s *nopTransporter) Pause() {}
|
||||||
|
@ -547,6 +547,24 @@ func (m *member) Launch() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *member) WaitOK(t *testing.T) {
|
||||||
|
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||||
|
kapi := client.NewKeysAPI(cc)
|
||||||
|
for {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
|
_, err := kapi.Get(ctx, "/")
|
||||||
|
if err != nil {
|
||||||
|
time.Sleep(tickDuration)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for m.s.Leader() == 0 {
|
||||||
|
time.Sleep(tickDuration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *member) URL() string { return m.ClientURLs[0].String() }
|
func (m *member) URL() string { return m.ClientURLs[0].String() }
|
||||||
|
|
||||||
func (m *member) Pause() {
|
func (m *member) Pause() {
|
||||||
|
@ -15,9 +15,14 @@
|
|||||||
package integration
|
package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
|
"github.com/coreos/etcd/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPauseMember(t *testing.T) {
|
func TestPauseMember(t *testing.T) {
|
||||||
@ -74,3 +79,44 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
|||||||
t.Errorf("unexpect successful launch")
|
t.Errorf("unexpect successful launch")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSnapshotAndRestartMember(t *testing.T) {
|
||||||
|
defer afterTest(t)
|
||||||
|
m := mustNewMember(t, "snapAndRestartTest")
|
||||||
|
m.SnapCount = 100
|
||||||
|
m.Launch()
|
||||||
|
defer m.Terminate(t)
|
||||||
|
m.WaitOK(t)
|
||||||
|
|
||||||
|
resps := make([]*client.Response, 120)
|
||||||
|
var err error
|
||||||
|
for i := 0; i < 120; i++ {
|
||||||
|
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||||
|
kapi := client.NewKeysAPI(cc)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
|
key := fmt.Sprintf("foo%d", i)
|
||||||
|
resps[i], err = kapi.Create(ctx, "/"+key, "bar", -1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("#%d: create on %s error: %v", i, m.URL(), err)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
m.Stop(t)
|
||||||
|
m.Restart(t)
|
||||||
|
|
||||||
|
for i := 0; i < 120; i++ {
|
||||||
|
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||||
|
kapi := client.NewKeysAPI(cc)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
|
key := fmt.Sprintf("foo%d", i)
|
||||||
|
resp, err := kapi.Get(ctx, "/"+key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("#%d: get on %s error: %v", i, m.URL(), err)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(resp.Node, resps[i].Node) {
|
||||||
|
t.Errorf("#%d: node = %v, want %v", i, resp.Node, resps[i].Node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -63,6 +63,24 @@ type node struct {
|
|||||||
Children map[string]*node // for directory
|
Children map[string]*node // for directory
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func deepCopyNode(n *node, parent *node) *node {
|
||||||
|
out := &node{
|
||||||
|
Path: n.Path,
|
||||||
|
CreatedIndex: n.CreatedIndex,
|
||||||
|
ModifiedIndex: n.ModifiedIndex,
|
||||||
|
Parent: parent,
|
||||||
|
ExpireTime: n.ExpireTime,
|
||||||
|
ACL: n.ACL,
|
||||||
|
Value: n.Value,
|
||||||
|
Children: make(map[string]*node),
|
||||||
|
}
|
||||||
|
for k, v := range n.Children {
|
||||||
|
out.Children[k] = deepCopyNode(v, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
func replacePathNames(n *node, s1, s2 string) {
|
func replacePathNames(n *node, s1, s2 string) {
|
||||||
n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
|
n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
|
||||||
for _, c := range n.Children {
|
for _, c := range n.Children {
|
||||||
@ -87,9 +105,23 @@ func pullNodesFromEtcd(n *node) map[string]uint64 {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func fixEtcd(n *node) {
|
func fixEtcd(etcdref *node) *node {
|
||||||
n.Path = "/0"
|
n := &node{
|
||||||
machines := n.Children["machines"]
|
Path: "/0",
|
||||||
|
CreatedIndex: etcdref.CreatedIndex,
|
||||||
|
ModifiedIndex: etcdref.ModifiedIndex,
|
||||||
|
ExpireTime: etcdref.ExpireTime,
|
||||||
|
ACL: etcdref.ACL,
|
||||||
|
Children: make(map[string]*node),
|
||||||
|
}
|
||||||
|
|
||||||
|
var machines *node
|
||||||
|
if machineOrig, ok := etcdref.Children["machines"]; ok {
|
||||||
|
machines = deepCopyNode(machineOrig, n)
|
||||||
|
}
|
||||||
|
if machines == nil {
|
||||||
|
return n
|
||||||
|
}
|
||||||
n.Children["members"] = &node{
|
n.Children["members"] = &node{
|
||||||
Path: "/0/members",
|
Path: "/0/members",
|
||||||
CreatedIndex: machines.CreatedIndex,
|
CreatedIndex: machines.CreatedIndex,
|
||||||
@ -97,6 +129,7 @@ func fixEtcd(n *node) {
|
|||||||
ExpireTime: machines.ExpireTime,
|
ExpireTime: machines.ExpireTime,
|
||||||
ACL: machines.ACL,
|
ACL: machines.ACL,
|
||||||
Children: make(map[string]*node),
|
Children: make(map[string]*node),
|
||||||
|
Parent: n,
|
||||||
}
|
}
|
||||||
for name, c := range machines.Children {
|
for name, c := range machines.Children {
|
||||||
q, err := url.ParseQuery(c.Value)
|
q, err := url.ParseQuery(c.Value)
|
||||||
@ -121,29 +154,32 @@ func fixEtcd(n *node) {
|
|||||||
ModifiedIndex: c.ModifiedIndex,
|
ModifiedIndex: c.ModifiedIndex,
|
||||||
ExpireTime: c.ExpireTime,
|
ExpireTime: c.ExpireTime,
|
||||||
ACL: c.ACL,
|
ACL: c.ACL,
|
||||||
Children: map[string]*node{
|
Children: make(map[string]*node),
|
||||||
"attributes": &node{
|
Parent: n.Children["members"],
|
||||||
|
}
|
||||||
|
attrs := &node{
|
||||||
Path: path.Join("/0/members", m.ID.String(), "attributes"),
|
Path: path.Join("/0/members", m.ID.String(), "attributes"),
|
||||||
CreatedIndex: c.CreatedIndex,
|
CreatedIndex: c.CreatedIndex,
|
||||||
ModifiedIndex: c.ModifiedIndex,
|
ModifiedIndex: c.ModifiedIndex,
|
||||||
ExpireTime: c.ExpireTime,
|
ExpireTime: c.ExpireTime,
|
||||||
ACL: c.ACL,
|
ACL: c.ACL,
|
||||||
Value: string(attrBytes),
|
Value: string(attrBytes),
|
||||||
},
|
Parent: newNode,
|
||||||
"raftAttributes": &node{
|
}
|
||||||
|
newNode.Children["attributes"] = attrs
|
||||||
|
raftAttrs := &node{
|
||||||
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
|
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
|
||||||
CreatedIndex: c.CreatedIndex,
|
CreatedIndex: c.CreatedIndex,
|
||||||
ModifiedIndex: c.ModifiedIndex,
|
ModifiedIndex: c.ModifiedIndex,
|
||||||
ExpireTime: c.ExpireTime,
|
ExpireTime: c.ExpireTime,
|
||||||
ACL: c.ACL,
|
ACL: c.ACL,
|
||||||
Value: string(raftBytes),
|
Value: string(raftBytes),
|
||||||
},
|
Parent: newNode,
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
newNode.Children["raftAttributes"] = raftAttrs
|
||||||
n.Children["members"].Children[m.ID.String()] = newNode
|
n.Children["members"].Children[m.ID.String()] = newNode
|
||||||
}
|
}
|
||||||
delete(n.Children, "machines")
|
return n
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func mangleRoot(n *node) *node {
|
func mangleRoot(n *node) *node {
|
||||||
@ -157,10 +193,10 @@ func mangleRoot(n *node) *node {
|
|||||||
}
|
}
|
||||||
newRoot.Children["1"] = n
|
newRoot.Children["1"] = n
|
||||||
etcd := n.Children["_etcd"]
|
etcd := n.Children["_etcd"]
|
||||||
delete(n.Children, "_etcd")
|
|
||||||
replacePathNames(n, "/", "/1/")
|
replacePathNames(n, "/", "/1/")
|
||||||
fixEtcd(etcd)
|
newZero := fixEtcd(etcd)
|
||||||
newRoot.Children["0"] = etcd
|
newZero.Parent = newRoot
|
||||||
|
newRoot.Children["0"] = newZero
|
||||||
return newRoot
|
return newRoot
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,8 +15,12 @@
|
|||||||
package osutil
|
package osutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"os/signal"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Unsetenv(key string) error {
|
func Unsetenv(key string) error {
|
||||||
@ -33,3 +37,53 @@ func Unsetenv(key string) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InterruptHandler is a function that is called on receiving a
|
||||||
|
// SIGTERM or SIGINT signal.
|
||||||
|
type InterruptHandler func()
|
||||||
|
|
||||||
|
var (
|
||||||
|
interruptRegisterMu, interruptExitMu sync.Mutex
|
||||||
|
// interruptHandlers holds all registered InterruptHandlers in order
|
||||||
|
// they will be executed.
|
||||||
|
interruptHandlers = []InterruptHandler{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// RegisterInterruptHandler registers a new InterruptHandler. Handlers registered
|
||||||
|
// after interrupt handing was initiated will not be executed.
|
||||||
|
func RegisterInterruptHandler(h InterruptHandler) {
|
||||||
|
interruptRegisterMu.Lock()
|
||||||
|
defer interruptRegisterMu.Unlock()
|
||||||
|
interruptHandlers = append(interruptHandlers, h)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleInterrupts calls the handler functions on receiving a SIGINT or SIGTERM.
|
||||||
|
func HandleInterrupts() {
|
||||||
|
notifier := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
sig := <-notifier
|
||||||
|
|
||||||
|
interruptRegisterMu.Lock()
|
||||||
|
ihs := make([]InterruptHandler, len(interruptHandlers))
|
||||||
|
copy(ihs, interruptHandlers)
|
||||||
|
interruptRegisterMu.Unlock()
|
||||||
|
|
||||||
|
interruptExitMu.Lock()
|
||||||
|
|
||||||
|
log.Printf("received %v signal, shutting down", sig)
|
||||||
|
|
||||||
|
for _, h := range ihs {
|
||||||
|
h()
|
||||||
|
}
|
||||||
|
signal.Stop(notifier)
|
||||||
|
syscall.Kill(syscall.Getpid(), sig.(syscall.Signal))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exit relays to os.Exit if no interrupt handlers are running, blocks otherwise.
|
||||||
|
func Exit(code int) {
|
||||||
|
interruptExitMu.Lock()
|
||||||
|
os.Exit(code)
|
||||||
|
}
|
||||||
|
@ -16,8 +16,11 @@ package osutil
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
"os/signal"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestUnsetenv(t *testing.T) {
|
func TestUnsetenv(t *testing.T) {
|
||||||
@ -43,3 +46,43 @@ func TestUnsetenv(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitSig(t *testing.T, c <-chan os.Signal, sig os.Signal) {
|
||||||
|
select {
|
||||||
|
case s := <-c:
|
||||||
|
if s != sig {
|
||||||
|
t.Fatalf("signal was %v, want %v", s, sig)
|
||||||
|
}
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("timeout waiting for %v", sig)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandleInterrupts(t *testing.T) {
|
||||||
|
for _, sig := range []syscall.Signal{syscall.SIGINT, syscall.SIGTERM} {
|
||||||
|
n := 1
|
||||||
|
RegisterInterruptHandler(func() { n++ })
|
||||||
|
RegisterInterruptHandler(func() { n *= 2 })
|
||||||
|
|
||||||
|
c := make(chan os.Signal, 2)
|
||||||
|
signal.Notify(c, sig)
|
||||||
|
|
||||||
|
HandleInterrupts()
|
||||||
|
syscall.Kill(syscall.Getpid(), sig)
|
||||||
|
|
||||||
|
// we should receive the signal once from our own kill and
|
||||||
|
// a second time from HandleInterrupts
|
||||||
|
waitSig(t, c, sig)
|
||||||
|
waitSig(t, c, sig)
|
||||||
|
|
||||||
|
if n == 3 {
|
||||||
|
t.Fatalf("interrupt handlers were called in wrong order")
|
||||||
|
}
|
||||||
|
if n != 4 {
|
||||||
|
t.Fatalf("interrupt handlers were not called properly")
|
||||||
|
}
|
||||||
|
// reset interrupt handlers
|
||||||
|
interruptHandlers = interruptHandlers[:0]
|
||||||
|
interruptExitMu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -37,6 +37,7 @@ type Transporter interface {
|
|||||||
Send(m []raftpb.Message)
|
Send(m []raftpb.Message)
|
||||||
AddPeer(id types.ID, urls []string)
|
AddPeer(id types.ID, urls []string)
|
||||||
RemovePeer(id types.ID)
|
RemovePeer(id types.ID)
|
||||||
|
RemoveAllPeers()
|
||||||
UpdatePeer(id types.ID, urls []string)
|
UpdatePeer(id types.ID, urls []string)
|
||||||
Stop()
|
Stop()
|
||||||
}
|
}
|
||||||
@ -132,7 +133,24 @@ func (t *transport) AddPeer(id types.ID, urls []string) {
|
|||||||
func (t *transport) RemovePeer(id types.ID) {
|
func (t *transport) RemovePeer(id types.ID) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
t.peers[id].Stop()
|
t.removePeer(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *transport) RemoveAllPeers() {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
for id, _ := range t.peers {
|
||||||
|
t.removePeer(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// the caller of this function must have the peers mutex.
|
||||||
|
func (t *transport) removePeer(id types.ID) {
|
||||||
|
if peer, ok := t.peers[id]; ok {
|
||||||
|
peer.Stop()
|
||||||
|
} else {
|
||||||
|
log.Panicf("rafthttp: unexpected removal of unknown peer '%d'", id)
|
||||||
|
}
|
||||||
delete(t.peers, id)
|
delete(t.peers, id)
|
||||||
delete(t.leaderStats.Followers, id.String())
|
delete(t.leaderStats.Followers, id.String())
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,6 @@
|
|||||||
package version
|
package version
|
||||||
|
|
||||||
var (
|
var (
|
||||||
Version = "2.0.2"
|
Version = "2.0.3"
|
||||||
InternalVersion = "2"
|
InternalVersion = "2"
|
||||||
)
|
)
|
||||||
|
Reference in New Issue
Block a user