Compare commits

...

29 Commits

Author SHA1 Message Date
5686c33e4b *: bump to v2.0.12 2015-06-16 14:19:37 -07:00
6fd2dfdebc etcdmain: fix that advertise-client-urls is required in proxy mode
etcd proxy doesn't need to set advertise-client-urls because the flag is
not used.
2015-06-16 14:18:01 -07:00
896ce1668c build: default git sha to GitNotFound in case git fails 2015-06-16 14:15:48 -07:00
0520b4cd24 proxy: Reuse a bytes buffer as proxy request body.
The call to transport.RoundTrip closes the request body regardless of
the value of request.Closed. This causes subsequent calls to RoundTrip
using the same request body to fail.

Fixes #2895
2015-06-16 14:13:15 -07:00
6ee6f72c48 etcdmain: increase maxIdleConnsPerHost in proxy transport
This PR set maxIdleConnsPerHost to 128 to let proxy handle 128 concurrent
requests in long term smoothly.
If the number of concurrent requests is bigger than this value,
proxy needs to create one new connection when handling each request in
the delta, which is bad because the creation consumes resource and may
eat up your ephemeral port.
2015-06-16 14:10:58 -07:00
b4dd519a63 raft: fix raft node start bug
raft node should set initial prev hard state to empty.
Or it will not send the first hard coded state to application
until the state changes again.

This commit fixs the issue. It introduce a small overhead, that
the same tate might send to application twice when restarting.
But this is fine.
2015-06-16 14:10:41 -07:00
a98fff84e7 etcdctl/cluster_health: improve output if failed to get leader stats
When failing to get leader stats, it said 'cluster is unhealthy' before.
This is confusing when it cannot get stats because advertised client urls
are set wrong and the cluster is healthy.
2015-06-16 14:07:45 -07:00
973cfbebda *: make dial timeout configurable
Dial timeout is set shorter because
1. etcd is supposed to work in good environment, and the new value is long
enough
2. shorter dial timeout makes dial fail faster, which is good for
performance

Conflicts:
	etcdmain/etcd.go
2015-06-16 14:06:18 -07:00
00d1d34cf8 Merge pull request #2832 from yichengq/stream-2.1
not print unhelpful info when connecting to etcd 2.1
2015-05-27 13:36:38 -07:00
fcf81fd6bf *: bump to v2.0.11+git 2015-05-15 13:55:13 -07:00
0678329cd6 *: bump to v2.0.11 2015-05-15 13:54:32 -07:00
9a0e0c2eae etcdmain: better error msg when detected duplicate id in discovery
Conflicts:
	etcdmain/etcd.go
2015-05-15 13:47:02 -07:00
3e4d57c37d pkg/fileutil: add plan9 lockfile support 2015-05-15 13:35:51 -07:00
d30e764b2d version: added more version information
added more version information output to aid debugging
print etcd Version, Git SHA, Go runtime version, OS
and architecture

Fixes #2560

Conflicts:
	version/version.go
2015-05-15 12:34:33 -07:00
b5b7c78f1b docs: proxy needs accessible advertise client urls
Users cannot use proxy if -advertise-client-urls is set correctly.
Especially mention this in the doc to help them bypass the wrong
settings.
2015-05-15 12:32:58 -07:00
ee1c07c3d4 proxy: Fix connection leak when client disconnect
established connections were leaked when client disconnected before
proxyreq completes. This happens all time for wait=true requests.
2015-05-15 12:32:49 -07:00
67c5d4dfd2 etcdmain: advertise-client-urls must be set if listen-client-urls is set
Before this PR, people can set listen-client-urls without setting
advertise-client-urls, and leaves advertise-client-urls as default
localhost value. The client libraries which sync the cluster info
fetch wrong advertise-client-urls and cannot connect to the cluster.
This PR avoids this case and provides better UX.

On the other hand, this change is safe because people always want to set
advertise-client-urls if listen-client-urls is set. The default localhost
advertise url cannot be accessed from the outside, and should always be
set except that etcd is bootstrapped with no flag.

Conflicts:
	etcdmain/etcd.go
2015-05-15 12:32:35 -07:00
3afcbd6f83 docs: clarify the disaster recovery guide
A bit was missing from the documentation on disaster recovery, the reset
of the advertised peer urls for the node recovered from backup. Without
that, any subsequent server joining the cluster would not be able to
speak to the first node.
2015-05-15 12:30:48 -07:00
8fed61b2eb client: 410 is a vaild response for member.Remove
When removing a member, etcdserver might return 410 that indicates
the member has been removed. To client, 410 is a vaild response since
the client might do internal retry.
2015-05-15 12:30:37 -07:00
c8d386e18c pkg/fileutil: add filelock support for solaris 2015-05-15 11:30:40 -07:00
2b6a44b7b0 raft: fix typo in raftlog
fix typo in String() method of raftlog which will misorder
the "committed" and "unstable.offset" output.
2015-05-15 11:30:32 -07:00
8069d08b96 etcdserver: init server stats before passing it as argument
It is more reasonable to init the variable before passing it as an
argument.

It fixes a bug that etcdserver may panic on server stats when processing
a message from rafthttp streamReader before server stats is initialized
in server.Start().
2015-05-15 11:30:23 -07:00
5074235254 rafthttp: stop printing log when attaching stream with the same term
There is no need to print log when attaching stream with the same
term because the stream is installed back immediately.

This happens a lot when etcd 2.1 connects to etcd 2.0, so we make
the change.
2015-05-14 21:52:59 -07:00
f59bddd74b rafthttp: not log endpoint unsupport error
The error happens a lot when running 2.0 together with 2.1, and is
totally unhelpful.
2015-05-07 14:21:15 -07:00
58f035844c Merge pull request #2753 from yichengq/fix-remove-panic
backport #2701 to release-2.0 branch
2015-04-28 21:17:55 -07:00
f83774b4cd integration: add tests around the membership change issues 2015-04-24 13:49:17 -07:00
12c32137a8 rafthttp: add AddRemote
Add remotes to rafthttp, who help newly joined members catch up the
progress of the cluster. It supports basic message sending to remote, and
has no stream connection for simplicity. remotes will not be used
after the latest peers have been added into rafthttp.

Conflicts:
	rafthttp/pipeline.go
	rafthttp/transport.go
2015-04-24 13:37:16 -07:00
fce4cf4dc8 Revert "etcdserver: fix cluster fallback recovery"
This reverts commit cff005777a.
2015-04-24 13:06:43 -07:00
06a72b2702 *: bump to v2.0.10+git 2015-04-22 15:21:59 -07:00
32 changed files with 467 additions and 95 deletions

View File

@ -61,7 +61,7 @@ After your cluster is up and running, adding or removing members is done via [ru
### Member Migration
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
The data directory contains all the data to recover a member to its point-in-time state. To migrate a member:
@ -102,7 +102,7 @@ $ sudo systemctl stop etcd
#### Copy the data directory of the now-idle member to the new machine
```
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
```
```
@ -181,7 +181,9 @@ Once you have verified that etcd has started successfully, shut it down and move
#### Restoring the cluster
Now that the node is running successfully, you can add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
Now that the node is running successfully, you should [change its advertised peer URLs](other_apis.md#change-the-peer-urls-of-a-member), as the `--force-new-cluster` has set the peer URL to the default (listening on localhost).
You can then add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
### Client Request Timeout

View File

@ -6,6 +6,8 @@ etcd currently supports two proxy modes: `readwrite` and `readonly`. The default
The proxy will shuffle the list of cluster members periodically to avoid sending all connections to a single member.
The member list used by proxy consists of all client URLs advertised within the cluster, as specified in each members' `-advertise-client-urls` flag. If this flag is set incorrectly, requests sent to the proxy are forwarded to wrong addresses and then fail. The fix for this problem is to restart etcd member with correct `-advertise-client-urls` flag. After client URLs list in proxy is recalculated, which happens every 30 seconds, requests will be forwarded correctly.
### Using an etcd proxy
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).

6
build
View File

@ -11,6 +11,8 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
eval $(go env)
GIT_SHA=`git rev-parse --short HEAD || echo "GitNotFound"`
# Static compilation is useful when etcd is run in a container
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcdctl ${REPO_PATH}/etcdctl

View File

@ -105,7 +105,7 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
return err
}
return assertStatusCode(resp.StatusCode, http.StatusNoContent)
return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone)
}
type membersAPIActionList struct{}

View File

@ -46,9 +46,10 @@ func handleClusterHealth(c *cli.Context) {
}
// do we have a leader?
ep, ls0, err := getLeaderStats(tr, client.GetCluster())
cl := client.GetCluster()
ep, ls0, err := getLeaderStats(tr, cl)
if err != nil {
fmt.Println("cluster is unhealthy")
fmt.Println("cluster may be unhealthy: failed to connect", cl)
os.Exit(1)
}

View File

@ -20,6 +20,7 @@ import (
"log"
"net/url"
"os"
"runtime"
"strings"
"github.com/coreos/etcd/etcdserver"
@ -62,6 +63,7 @@ var (
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set" +
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
errUnsetAdvertiseClientURLsFlag = fmt.Errorf("-advertise-client-urls is required when -listen-client-urls is set explicitly")
)
type config struct {
@ -211,7 +213,10 @@ func (cfg *config) Parse(arguments []string) error {
}
if cfg.printVersion {
fmt.Println("etcd version", version.Version)
fmt.Printf("etcd Version: %s\n", version.Version)
fmt.Printf("Git SHA: %s\n", version.GitSHA)
fmt.Printf("Go Version: %s\n", runtime.Version())
fmt.Printf("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
os.Exit(0)
}
@ -254,6 +259,13 @@ func (cfg *config) Parse(arguments []string) error {
return err
}
// when etcd runs in member mode user needs to set -advertise-client-urls if -listen-client-urls is set.
if cfg.proxy.String() != proxyFlagOn {
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
return errUnsetAdvertiseClientURLsFlag
}
}
if 5*cfg.TickMs > cfg.ElectionMs {
return fmt.Errorf("-election-timeout[%vms] should be at least as 5 times as -heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
}

View File

@ -29,6 +29,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-snapshot-count=10",
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
// it should be set if -listen-client-urls is set
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
}
wcfg := &config{
dir: "testdir",

View File

@ -56,8 +56,12 @@ func Main() {
cfg := NewConfig()
err := cfg.Parse(os.Args[1:])
if err != nil {
log.Printf("etcd: error verifying flags, %v. See 'etcd -help'.", err)
os.Exit(2)
log.Printf("error verifying flags, %v. See 'etcd -help'.", err)
switch err {
case errUnsetAdvertiseClientURLsFlag:
log.Printf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
}
os.Exit(1)
}
var stopped <-chan struct{}
@ -90,8 +94,10 @@ func Main() {
if err != nil {
switch err {
case discovery.ErrDuplicateID:
log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
cfg.name, cfg.durl, cfg.dir)
log.Printf("member %q has previously registered with discovery service token (%s).", cfg.name, cfg.durl)
log.Printf("But etcd could not find vaild cluster configuration in the given data dir (%s).", cfg.dir)
log.Printf("Please check the given data dir path if the previous bootstrap succeeded")
log.Printf("or use a new discovery token if the previous bootstrap failed.")
default:
log.Fatalf("etcd: %v", err)
}
@ -110,7 +116,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
}
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
return nil, err
}
@ -191,7 +197,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Handler: etcdhttp.NewClientHandler(s),
Info: cfg.corsInfo,
}
ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
// Start the peer server in a goroutine
for _, l := range plns {
go func(l net.Listener) {
@ -227,6 +233,7 @@ func startProxy(cfg *config) error {
}
pt, err := transport.NewTransport(cfg.clientTLSInfo)
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
if err != nil {
return err
}

View File

@ -56,7 +56,8 @@ clustering flags:
--initial-cluster-token 'etcd-cluster'
initial cluster token for the etcd cluster during bootstrap.
--advertise-client-urls 'http://localhost:2379,http://localhost:4001'
list of this member's client URLs to advertise to the rest of the cluster.
list of this member's client URLs to advertise to the public.
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
--discovery ''
discovery URL used to bootstrap the cluster.
--discovery-fallback 'proxy'

View File

@ -59,12 +59,6 @@ type Cluster struct {
id types.ID
token string
store store.Store
// index is the raft index that cluster is updated at bootstrap
// from remote cluster info.
// It may have a higher value than local raft index, because it
// displays a further view of the cluster.
// TODO: upgrade it as last modified index
index uint64
sync.Mutex // guards members and removed map
members map[types.ID]*Member
@ -236,8 +230,6 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
func (c *Cluster) Recover() {
c.members, c.removed = membersFromStore(c.store)
}

View File

@ -21,7 +21,6 @@ import (
"log"
"net/http"
"sort"
"strconv"
"time"
"github.com/coreos/etcd/pkg/types"
@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
}
continue
}
var index uint64
// The header at or before v2.0.3 doesn't have this field. For backward
// compatibility, it checks whether the field exists.
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
index, err = strconv.ParseUint(indexStr, 10, 64)
if err != nil {
if logerr {
log.Printf("etcdserver: could not parse raft index: %v", err)
}
continue
}
}
cl := NewClusterFromMembers("", id, membs)
cl.UpdateIndex(index)
return cl, nil
return NewClusterFromMembers("", id, membs), nil
}
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
}

View File

@ -18,7 +18,6 @@ import (
"encoding/json"
"log"
"net/http"
"strconv"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/rafthttp"
@ -29,10 +28,9 @@ const (
)
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
clusterInfo: clusterInfo,
timer: timer,
}
mux := http.NewServeMux()
@ -45,7 +43,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTim
type peerMembersHandler struct {
clusterInfo etcdserver.ClusterInfo
timer etcdserver.RaftTimer
}
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -53,7 +50,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
if r.URL.Path != peerMembersPrefix {
http.Error(w, "bad path", http.StatusBadRequest)

View File

@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})
ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
ph := NewPeerHandler(&fakeCluster{}, h)
srv := httptest.NewServer(ph)
defer srv.Close()
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
h := &peerMembersHandler{clusterInfo: cluster}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
if err != nil {
t.Fatal(err)

View File

@ -158,6 +158,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
haveWAL := wal.Exist(cfg.WALDir())
ss := snap.New(cfg.SnapDir())
var remotes []*Member
switch {
case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil {
@ -170,7 +171,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
cfg.Cluster.UpdateIndex(existingCluster.index)
remotes = existingCluster.Members()
cfg.Cluster.SetID(existingCluster.id)
cfg.Cluster.SetStore(st)
cfg.Print()
@ -238,6 +239,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
Name: cfg.Name,
ID: id.String(),
}
sstats.Initialize()
lstats := stats.NewLeaderStats(id.String())
srv := &EtcdServer{
@ -260,8 +262,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
}
// TODO: move transport initialization near the definition of remote
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
// add all the remote members into sendhub
// add all remotes into transport
for _, m := range remotes {
if m.ID != id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range cfg.Cluster.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
@ -292,7 +300,6 @@ func (s *EtcdServer) start() {
s.w = wait.New()
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stats.Initialize()
// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@ -395,19 +402,15 @@ func (s *EtcdServer) run() {
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
// It avoids snapshot recovery overwriting newer cluster and
// transport setting, which may block the communication.
if s.Cluster.index < rd.Snapshot.Metadata.Index {
s.Cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
appliedi = rd.Snapshot.Metadata.Index

View File

@ -1389,6 +1389,7 @@ type nopTransporter struct{}
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}

View File

@ -170,6 +170,46 @@ func TestForceNewCluster(t *testing.T) {
clusterMustProgress(t, c.Members[:1])
}
// Ensure we can remove a member then add a new one back immediately.
func TestIssue2681(t *testing.T) {
defer afterTest(t)
c := NewCluster(t, 5)
c.Launch(t)
defer c.Terminate(t)
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
c.waitLeader(t, c.Members)
c.AddMember(t)
c.waitLeader(t, c.Members)
clusterMustProgress(t, c.Members)
}
// Ensure we can remove a member after a snapshot then add a new one back.
func TestIssue2746(t *testing.T) {
defer afterTest(t)
c := NewCluster(t, 5)
for _, m := range c.Members {
m.SnapCount = 10
}
c.Launch(t)
defer c.Terminate(t)
// force a snapshot
for i := 0; i < 20; i++ {
clusterMustProgress(t, c.Members)
}
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
c.waitLeader(t, c.Members)
c.AddMember(t)
c.waitLeader(t, c.Members)
clusterMustProgress(t, c.Members)
}
// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.
@ -526,7 +566,7 @@ func (m *member) Launch() error {
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{
@ -623,7 +663,7 @@ func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
}
func mustNewTransport(t *testing.T) *http.Transport {
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
t.Fatal(err)
}

View File

@ -0,0 +1,90 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"errors"
"os"
"syscall"
"time"
)
var (
ErrLocked = errors.New("file already locked")
)
type Lock interface {
Name() string
TryLock() error
Lock() error
Unlock() error
Destroy() error
}
type lock struct {
fname string
file *os.File
}
func (l *lock) Name() string {
return l.fname
}
// TryLock acquires exclusivity on the lock without blocking
func (l *lock) TryLock() error {
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
if err != nil {
return err
}
f, err := os.Open(l.fname)
if err != nil {
return ErrLocked
}
l.file = f
return nil
}
// Lock acquires exclusivity on the lock with blocking
func (l *lock) Lock() error {
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
if err != nil {
return err
}
for {
f, err := os.Open(l.fname)
if err == nil {
l.file = f
return nil
}
time.Sleep(10 * time.Millisecond)
}
}
// Unlock unlocks the lock
func (l *lock) Unlock() error {
return l.file.Close()
}
func (l *lock) Destroy() error {
return nil
}
func NewLock(file string) (Lock, error) {
l := &lock{fname: file}
return l, nil
}

View File

@ -0,0 +1,98 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build solaris
package fileutil
import (
"errors"
"os"
"syscall"
)
var (
ErrLocked = errors.New("file already locked")
)
type Lock interface {
Name() string
TryLock() error
Lock() error
Unlock() error
Destroy() error
}
type lock struct {
fd int
file *os.File
}
func (l *lock) Name() string {
return l.file.Name()
}
// TryLock acquires exclusivity on the lock without blocking
func (l *lock) TryLock() error {
var lock syscall.Flock_t
lock.Start = 0
lock.Len = 0
lock.Pid = 0
lock.Type = syscall.F_WRLCK
lock.Whence = 0
lock.Pid = 0
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
if err != nil && err == syscall.EAGAIN {
return ErrLocked
}
return err
}
// Lock acquires exclusivity on the lock without blocking
func (l *lock) Lock() error {
var lock syscall.Flock_t
lock.Start = 0
lock.Len = 0
lock.Type = syscall.F_WRLCK
lock.Whence = 0
lock.Pid = 0
return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
}
// Unlock unlocks the lock
func (l *lock) Unlock() error {
var lock syscall.Flock_t
lock.Start = 0
lock.Len = 0
lock.Type = syscall.F_UNLCK
lock.Whence = 0
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
if err != nil && err == syscall.EAGAIN {
return ErrLocked
}
return err
}
func (l *lock) Destroy() error {
return l.file.Close()
}
func NewLock(file string) (Lock, error) {
f, err := os.OpenFile(file, os.O_WRONLY, 0600)
if err != nil {
return nil, err
}
l := &lock{int(f.Fd()), f}
return l, nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows,!plan9
// +build !windows,!plan9,!solaris
package fileutil

View File

@ -23,7 +23,7 @@ import (
// NewTimeoutTransport returns a transport created using the given TLS info.
// If read/write on the created connection blocks longer than its time limit,
// it will return timeout error.
func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
tr, err := NewTransport(info)
if err != nil {
return nil, err
@ -33,7 +33,7 @@ func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*ht
tr.MaxIdleConnsPerHost = -1
tr.Dial = (&rwTimeoutDialer{
Dialer: net.Dialer{
Timeout: 30 * time.Second,
Timeout: dialtimeoutd,
KeepAlive: 30 * time.Second,
},
rdtimeoutd: rdtimeoutd,

View File

@ -26,7 +26,7 @@ import (
// TestNewTimeoutTransport tests that NewTimeoutTransport returns a transport
// that can dial out timeout connections.
func TestNewTimeoutTransport(t *testing.T) {
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour)
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour, time.Hour)
if err != nil {
t.Fatalf("unexpected NewTimeoutTransport error: %v", err)
}

View File

@ -18,6 +18,17 @@ import (
"net/http"
)
const (
// DefaultMaxIdleConnsPerHost indicates the default maximal idle connections
// maintained between proxy and each member. We set it to 128 to
// let proxy handle 128 concurrent requests in long term smoothly.
// If the number of concurrent requests is bigger than this value,
// proxy needs to create one new connection when handling each request in
// the delta, which is bad because the creation consumes resource and
// may eat up ephemeral ports.
DefaultMaxIdleConnsPerHost = 128
)
// GetProxyURLs is a function which should return the current set of URLs to
// which client requests should be proxied. This function will be queried
// periodically by the proxy Handler to refresh the set of available

View File

@ -15,8 +15,10 @@
package proxy
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@ -55,6 +57,21 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
proxyreq := new(http.Request)
*proxyreq = *clientreq
var (
proxybody []byte
err error
)
if clientreq.Body != nil {
proxybody, err = ioutil.ReadAll(clientreq.Body)
if err != nil {
msg := fmt.Sprintf("proxy: failed to read request body: %v", err)
e := httptypes.NewHTTPError(http.StatusInternalServerError, msg)
e.WriteTo(rw)
return
}
}
// deep-copy the headers, as these will be modified below
proxyreq.Header = make(http.Header)
copyHeader(proxyreq.Header, clientreq.Header)
@ -73,10 +90,31 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
return
}
completeCh := make(chan bool, 1)
closeNotifier, ok := rw.(http.CloseNotifier)
if ok {
go func() {
select {
case <-closeNotifier.CloseNotify():
tp, ok := p.transport.(*http.Transport)
if ok {
tp.CancelRequest(proxyreq)
}
case <-completeCh:
}
}()
defer func() {
completeCh <- true
}()
}
var res *http.Response
var err error
for _, ep := range endpoints {
if proxybody != nil {
proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
}
redirectRequest(proxyreq, ep.URL)
res, err = p.transport.RoundTrip(proxyreq)

View File

@ -65,7 +65,7 @@ func newLog(storage Storage) *raftLog {
}
func (l *raftLog) String() string {
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
}
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,

View File

@ -233,7 +233,7 @@ func (n *node) run(r *raft) {
lead := None
prevSoftSt := r.softState()
prevHardSt := r.HardState
prevHardSt := emptyState
for {
if advancec != nil {

View File

@ -354,7 +354,7 @@ func TestNodeRestart(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 1}
want := Ready{
HardState: emptyState,
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries[:st.Commit],
}
@ -389,7 +389,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 3}
want := Ready{
HardState: emptyState,
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries,
}

View File

@ -121,7 +121,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
from, err := types.IDFromString(fromStr)
if err != nil {
log.Printf("rafthttp: path %s cannot be parsed", fromStr)
http.Error(w, "invalid path", http.StatusNotFound)
return
}

View File

@ -40,6 +40,7 @@ const (
appRespBatchMs = 50
propBatchMs = 10
DialTimeout = time.Second
ConnReadTimeout = 5 * time.Second
ConnWriteTimeout = 5 * time.Second
)
@ -199,7 +200,7 @@ func (p *peer) handle() {
log.Printf("sender: the connection with %s became inactive", p.id)
p.active = false
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Fail()
}
} else {
@ -208,7 +209,7 @@ func (p *peer) handle() {
p.active = true
p.errored = nil
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Succ(end.Sub(start))
}
}

42
rafthttp/remote.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"net/http"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
type remote struct {
id types.ID
peer *peer
}
func startRemote(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, errorc chan error) *remote {
return &remote{
id: to,
peer: NewPeer(tr, u, to, cid, r, nil, errorc),
}
}
func (g *remote) Send(m raftpb.Message) {
g.peer.send(m)
}
func (g *remote) Stop() {
g.peer.Stop()
}

View File

@ -76,8 +76,11 @@ func (s *stream) attach(sw *streamWriter) error {
// ignore lower-term streaming request
if sw.term < s.w.term {
return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
} else if sw.term == s.w.term {
s.w.stopWithoutLog()
} else {
s.w.stop()
}
s.w.stop()
}
s.w = sw
return nil
@ -151,21 +154,23 @@ type WriteFlusher interface {
// TODO: replace fs with stream stats
type streamWriter struct {
to types.ID
term uint64
fs *stats.FollowerStats
q chan []raftpb.Entry
done chan struct{}
to types.ID
term uint64
fs *stats.FollowerStats
q chan []raftpb.Entry
done chan struct{}
printLog bool
}
// newStreamWriter starts and returns a new unstarted stream writer.
// The caller should call stop when finished, to shut it down.
func newStreamWriter(to types.ID, term uint64) *streamWriter {
s := &streamWriter{
to: to,
term: term,
q: make(chan []raftpb.Entry, streamBufSize),
done: make(chan struct{}),
to: to,
term: term,
q: make(chan []raftpb.Entry, streamBufSize),
done: make(chan struct{}),
printLog: true,
}
return s
}
@ -188,7 +193,9 @@ func (s *streamWriter) send(ents []raftpb.Entry) error {
func (s *streamWriter) handle(w WriteFlusher) {
defer func() {
close(s.done)
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
if s.printLog {
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
}
}()
ew := newEntryWriter(w, s.to)
@ -215,6 +222,11 @@ func (s *streamWriter) stop() {
<-s.done
}
func (s *streamWriter) stopWithoutLog() {
s.printLog = false
s.stop()
}
func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
// TODO: move the raft interface out of the reader.

View File

@ -35,6 +35,12 @@ type Raft interface {
type Transporter interface {
Handler() http.Handler
Send(m []raftpb.Message)
// AddRemote adds a remote with given peer urls into the transport.
// A remote helps newly joined member to catch up the progress of cluster,
// and will not be used after that.
// It is the caller's responsibility to ensure the urls are all vaild,
// or it panics.
AddRemote(id types.ID, urls []string)
AddPeer(id types.ID, urls []string)
RemovePeer(id types.ID)
RemoveAllPeers()
@ -50,9 +56,10 @@ type transport struct {
serverStats *stats.ServerStats
leaderStats *stats.LeaderStats
mu sync.RWMutex // protect the peer map
peers map[types.ID]*peer // remote peers
errorc chan error
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]*peer // peers map
errorc chan error
}
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
@ -63,6 +70,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
raft: r,
serverStats: ss,
leaderStats: ls,
remotes: make(map[types.ID]*remote),
peers: make(map[types.ID]*peer),
errorc: errorc,
}
@ -90,21 +98,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
continue
}
to := types.ID(m.To)
p, ok := t.peers[to]
if !ok {
log.Printf("etcdserver: send message to unknown receiver %s", to)
if ok {
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
}
p.Send(m)
continue
}
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
g, ok := t.remotes[to]
if ok {
g.Send(m)
continue
}
p.Send(m)
log.Printf("etcdserver: send message to unknown receiver %s", to)
}
}
func (t *transport) Stop() {
for _, r := range t.remotes {
r.Stop()
}
for _, p := range t.peers {
p.Stop()
}
@ -113,6 +130,21 @@ func (t *transport) Stop() {
}
}
func (t *transport) AddRemote(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.remotes[id]; ok {
return
}
peerURL := us[0]
u, err := url.Parse(peerURL)
if err != nil {
log.Panicf("unexpect peer url %s", peerURL)
}
u.Path = path.Join(u.Path, RaftPrefix)
t.remotes[id] = startRemote(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, t.errorc)
}
func (t *transport) AddPeer(id types.ID, urls []string) {
t.mu.Lock()
defer t.mu.Unlock()

View File

@ -23,7 +23,10 @@ import (
)
var (
Version = "2.0.10"
Version = "2.0.12"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"
)
// WalVersion is an enum for versions of etcd logs.