Compare commits

..

51 Commits

Author SHA1 Message Date
92e3895214 *: bump to v2.0.13 2015-06-25 14:10:15 -07:00
b12a52b0fd etcdmain: fix the check in fallback-to-proxy case
The check that advertise-client-urls must be set whenever listen-client-urls
is set was also applied when falling back to proxy, which breaks that
behavior. Loosen the check to fix it.
2015-06-25 14:07:45 -07:00
9fa4002787 *: bump to v2.0.12+git 2015-06-16 14:20:25 -07:00
5686c33e4b *: bump to v2.0.12 2015-06-16 14:19:37 -07:00
6fd2dfdebc etcdmain: fix that advertise-client-urls is required in proxy mode
etcd proxy doesn't need to set advertise-client-urls because the flag is
not used.
2015-06-16 14:18:01 -07:00
896ce1668c build: default git sha to GitNotFound in case git fails 2015-06-16 14:15:48 -07:00
0520b4cd24 proxy: Reuse a bytes buffer as proxy request body.
The call to transport.RoundTrip closes the request body regardless of
the value of request.Closed. This causes subsequent calls to RoundTrip
using the same request body to fail.

Fixes #2895
2015-06-16 14:13:15 -07:00
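
As an illustration, here is a minimal Go sketch of the pattern this fix uses, with a hypothetical helper name and endpoint handling: buffer the body once, then wrap a fresh reader for each attempt, since RoundTrip closes the body even on a failed attempt.

```
package sketch

import (
	"bytes"
	"io/ioutil"
	"net/http"
)

// roundTripWithRetry is a hypothetical helper illustrating the fix: because
// transport.RoundTrip closes the request body on every attempt, each retry
// must get a fresh reader over the same buffered bytes.
func roundTripWithRetry(tr http.RoundTripper, req *http.Request, hosts []string) (*http.Response, error) {
	var body []byte
	if req.Body != nil {
		b, err := ioutil.ReadAll(req.Body)
		if err != nil {
			return nil, err
		}
		body = b
	}
	var resp *http.Response
	var err error
	for _, h := range hosts {
		if body != nil {
			// fresh, closable reader per attempt
			req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
		}
		req.URL.Host = h
		if resp, err = tr.RoundTrip(req); err == nil {
			return resp, nil
		}
	}
	return nil, err
}
```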
6ee6f72c48 etcdmain: increase maxIdleConnsPerHost in proxy transport
This PR sets maxIdleConnsPerHost to 128 so the proxy can handle 128
concurrent requests smoothly over the long term.
If the number of concurrent requests is bigger than this value, the proxy
needs to create a new connection for each request in the delta, which is
bad because the creation consumes resources and may eat up your ephemeral
ports.
2015-06-16 14:10:58 -07:00
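
A minimal sketch of what the setting amounts to on the proxy's transport (the 128 comes from the commit; the surrounding function is illustrative):

```
package sketch

import "net/http"

// newProxyTransport returns a transport that keeps up to 128 idle
// connections per member, so 128 concurrent requests can be served from
// the pool instead of dialing new connections and burning ephemeral ports.
func newProxyTransport() *http.Transport {
	return &http.Transport{MaxIdleConnsPerHost: 128}
}
```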
b4dd519a63 raft: fix raft node start bug
The raft node should set the initial prev hard state to empty. Otherwise it
will not send the first hard state to the application until the state
changes again.

This commit fixes the issue. It introduces a small overhead: the same state
might be sent to the application twice when restarting. But this is fine.
2015-06-16 14:10:41 -07:00
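
A toy model of why the initial value matters (HardState here is a stand-in for raftpb.HardState; names are illustrative):

```
package main

import "fmt"

// HardState is a stand-in for raftpb.HardState.
type HardState struct{ Term, Vote, Commit uint64 }

// shouldSend reports whether the node should hand the state to the
// application: only when it differs from the previously sent one.
func shouldSend(cur, prev HardState) bool { return cur != prev }

func main() {
	cur := HardState{Term: 1, Commit: 1}
	fmt.Println(shouldSend(cur, HardState{})) // true: empty prev, first state is delivered
	fmt.Println(shouldSend(cur, cur))         // false: the old bug, first state never delivered
}
```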
a98fff84e7 etcdctl/cluster_health: improve output if failed to get leader stats
When it failed to get leader stats, it previously said 'cluster is unhealthy'.
This is confusing when stats cannot be fetched because the advertised client
URLs are set wrong while the cluster itself is healthy.
2015-06-16 14:07:45 -07:00
973cfbebda *: make dial timeout configurable
The dial timeout is set shorter because
1. etcd is supposed to run in a good environment, and the new value is long
enough
2. a shorter dial timeout makes dials fail faster, which is good for
performance

Conflicts:
	etcdmain/etcd.go
2015-06-16 14:06:18 -07:00
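
A sketch of the resulting transport setup, assuming the dial timeout is passed in where a hard-coded 30s used to be (the keep-alive value mirrors the diff further down):

```
package sketch

import (
	"net"
	"net/http"
	"time"
)

// newTimeoutTransport shows the shape of the change: the dial timeout
// becomes a parameter instead of a fixed 30 * time.Second.
func newTimeoutTransport(dialTimeout time.Duration) *http.Transport {
	return &http.Transport{
		Dial: (&net.Dialer{
			Timeout:   dialTimeout,
			KeepAlive: 30 * time.Second,
		}).Dial,
	}
}
```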
00d1d34cf8 Merge pull request #2832 from yichengq/stream-2.1
not print unhelpful info when connecting to etcd 2.1
2015-05-27 13:36:38 -07:00
fcf81fd6bf *: bump to v2.0.11+git 2015-05-15 13:55:13 -07:00
0678329cd6 *: bump to v2.0.11 2015-05-15 13:54:32 -07:00
9a0e0c2eae etcdmain: better error msg when duplicate id is detected in discovery
Conflicts:
	etcdmain/etcd.go
2015-05-15 13:47:02 -07:00
3e4d57c37d pkg/fileutil: add plan9 lockfile support 2015-05-15 13:35:51 -07:00
d30e764b2d version: added more version information
Added more version information output to aid debugging: print the etcd
version, Git SHA, Go runtime version, OS, and architecture.

Fixes #2560

Conflicts:
	version/version.go
2015-05-15 12:34:33 -07:00
b5b7c78f1b docs: proxy needs accessible advertise client urls
Users cannot use the proxy if -advertise-client-urls is set incorrectly.
Mention this explicitly in the doc to help them avoid wrong settings.
2015-05-15 12:32:58 -07:00
ee1c07c3d4 proxy: Fix connection leak when client disconnect
Established connections were leaked when the client disconnected before the
proxied request completed. This happens all the time for wait=true requests.
2015-05-15 12:32:49 -07:00
67c5d4dfd2 etcdmain: advertise-client-urls must be set if listen-client-urls is set
Before this PR, people could set listen-client-urls without setting
advertise-client-urls, leaving advertise-client-urls at its default
localhost value. Client libraries that sync the cluster info then fetch the
wrong advertise-client-urls and cannot connect to the cluster. This PR
avoids this case and provides better UX.

On the other hand, this change is safe because people always want to set
advertise-client-urls if listen-client-urls is set. The default localhost
advertise URL cannot be accessed from the outside, and should always be set
except when etcd is bootstrapped with no flags.

Conflicts:
	etcdmain/etcd.go
2015-05-15 12:32:35 -07:00
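
The resulting rule, reduced to a tiny decision function (a sketch; the actual flag parsing and discovery-fallback handling are in the etcdmain diff further down):

```
package main

import "fmt"

// rejectConfig captures the validation rule: in member mode,
// -listen-client-urls without -advertise-client-urls is an error; a process
// that is (or may fall back to) a proxy is exempt, since the flag is unused.
func rejectConfig(listenSet, advertiseSet, mayBeProxy bool) bool {
	return listenSet && !advertiseSet && !mayBeProxy
}

func main() {
	fmt.Println(rejectConfig(true, false, false)) // true: member mode, reject
	fmt.Println(rejectConfig(true, false, true))  // false: proxy, accept
}
```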
3afcbd6f83 docs: clarify the disaster recovery guide
A bit was missing from the documentation on disaster recovery: resetting the
advertised peer URLs of the node recovered from backup. Without that, any
subsequent server joining the cluster would not be able to speak to the
first node.
2015-05-15 12:30:48 -07:00
8fed61b2eb client: 410 is a valid response for member.Remove
When removing a member, etcdserver might return 410, which indicates that
the member has been removed. To the client, 410 is a valid response since
the client might do internal retries.
2015-05-15 12:30:37 -07:00
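
The client-side change boils down to a variadic status check that accepts 410 Gone alongside 204 No Content; a sketch of that helper (the real one lives in the client package, diffed below):

```
package main

import (
	"fmt"
	"net/http"
)

// assertStatusCode errors unless the observed status is one of the expected
// ones. Member removal now passes both 204 and 410 as expected codes.
func assertStatusCode(got int, want ...int) error {
	for _, w := range want {
		if got == w {
			return nil
		}
	}
	return fmt.Errorf("unexpected status code %d", got)
}

func main() {
	err := assertStatusCode(http.StatusGone, http.StatusNoContent, http.StatusGone)
	fmt.Println(err) // <nil>: 410 means the member is already removed
}
```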
c8d386e18c pkg/fileutil: add filelock support for solaris 2015-05-15 11:30:40 -07:00
2b6a44b7b0 raft: fix typo in raftlog
fix the String() method of raftlog, which misordered the
"committed" and "unstable.offset" output.
2015-05-15 11:30:32 -07:00
8069d08b96 etcdserver: init server stats before passing it as argument
It is more reasonable to init the variable before passing it as an
argument.

It fixes a bug where etcdserver may panic on server stats when processing
a message from a rafthttp streamReader before the server stats are
initialized in server.Start().
2015-05-15 11:30:23 -07:00
5074235254 rafthttp: stop printing log when attaching stream with the same term
There is no need to print a log message when attaching a stream with the
same term, because the stream is installed back immediately.

This happens a lot when etcd 2.1 connects to etcd 2.0, so we make the
change.
2015-05-14 21:52:59 -07:00
f59bddd74b rafthttp: do not log unsupported endpoint error
The error happens a lot when running 2.0 together with 2.1, and is
totally unhelpful.
2015-05-07 14:21:15 -07:00
58f035844c Merge pull request #2753 from yichengq/fix-remove-panic
backport #2701 to release-2.0 branch
2015-04-28 21:17:55 -07:00
f83774b4cd integration: add tests around the membership change issues 2015-04-24 13:49:17 -07:00
12c32137a8 rafthttp: add AddRemote
Add remotes to rafthttp to help newly joined members catch up with the
progress of the cluster. A remote supports basic message sending only, with
no stream connection, for simplicity. Remotes are no longer used after the
latest peers have been added into rafthttp.

Conflicts:
	rafthttp/pipeline.go
	rafthttp/transport.go
2015-04-24 13:37:16 -07:00
fce4cf4dc8 Revert "etcdserver: fix cluster fallback recovery"
This reverts commit cff005777a.
2015-04-24 13:06:43 -07:00
06a72b2702 *: bump to v2.0.10+git 2015-04-22 15:21:59 -07:00
fbaef05885 *: bump to v2.0.10 2015-04-22 15:21:38 -07:00
31a94d28e3 etcdctl: add extended as output format
extended wasn't documented in the help as one of the output formats; fix
this!

Conflicts:
	etcdctl/main.go
2015-04-22 15:11:06 -07:00
88660a303f snap: load should only return ErrNoSnapshot
If there is no available snapshot, load should return
ErrNoSnapshot. etcdserver might recover from that error
if it still has complete WAL files.
2015-04-22 15:09:38 -07:00
53c74dbd0b etcdserver: prevExist=true + condition is compareAndSwap
PrevExist indicates that the key should exist. A condition compares against
an existing key. So PrevExist + condition = CompareAndSwap, not Update.
2015-04-22 15:09:28 -07:00
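
A toy version of the dispatch rule (the real change is in EtcdServer.applyRequest, diffed below):

```
package main

import "fmt"

// storeOp picks the store operation for a PUT with prevExist=true: a bare
// prevExist is an Update, but adding a prevValue or prevIndex condition
// turns it into a CompareAndSwap.
func storeOp(prevValue string, prevIndex uint64) string {
	if prevIndex == 0 && prevValue == "" {
		return "Update"
	}
	return "CompareAndSwap"
}

func main() {
	fmt.Println(storeOp("", 0)) // Update
	fmt.Println(storeOp("", 1)) // CompareAndSwap
}
```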
8a8af60fad etcdctl: backup tool should use the new layout 2015-04-22 15:09:15 -07:00
7de19fefe8 etcdserver: fix minor bug in EtcdServer.send
It seems to be nothing serious. After peers are deleted, the log may
output:
"etcdserver: send message to unknown receiver %s"
2015-04-22 15:09:04 -07:00
7750f387b0 wal: better log msg 2015-04-22 15:08:50 -07:00
e33ab24442 wal: never leave a corrupted wal file
If the process dies during wal.Cut(), it might leave a corrupted wal
file. This commit solves the problem by creating a temp wal file first,
then atomically renaming it to a wal file once we are sure it is valid.

Conflicts:
	wal/wal.go
2015-04-22 15:08:42 -07:00
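
The general crash-safety pattern the commit applies, sketched as a standalone helper with a hypothetical name (the real code also handles CRCs, file locks, and state records):

```
package sketch

import "os"

// writeFileAtomic writes data to a temp file, fsyncs it, and atomically
// renames it into place, so a crash mid-write never leaves a partially
// written file visible under the final name.
func writeFileAtomic(path string, data []byte) error {
	tmp := path + ".tmp"
	f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}
```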
fce2c1eeaf discovery: drop trailing . from srv target 2015-04-22 15:06:20 -07:00
6a3bb93305 discovery: add a test case for srv
During SRV discovery, it should try to match the local member against the
resolved address and return unresolved hostnames for the cluster.

Conflicts:
	discovery/srv_test.go
2015-04-22 15:06:03 -07:00
21455d2f3b *: stop using resolved tcp addr
We started resolving hosts into TCP addresses because we generate a
TCP-based initial-cluster during SRV discovery. However, this creates
problems around TLS and cluster verification. SRV discovery only needs the
resolved TCP address to find the local node; it does not have to resolve
everything and use the resolved addresses.

This fixes #2488 and #2226
2015-04-22 14:59:07 -07:00
51bb4220c5 Clarify that it is the proxy doing the shuffle. 2015-04-22 14:58:54 -07:00
d8c506923f proxy: shuffle endpoints
Shuffle endpoints to avoid being "stuck" to a single cluster member.
2015-04-22 14:58:40 -07:00
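
The shuffle is an in-place Fisher-Yates pass over the endpoint slice on each refresh, as the proxy diff below shows; a self-contained sketch with illustrative endpoints:

```
package main

import (
	"fmt"
	"math/rand"
)

// shuffle randomizes endpoint order so connections do not pile up on a
// single cluster member.
func shuffle(eps []string) {
	for i := range eps {
		j := rand.Intn(i + 1)
		eps[i], eps[j] = eps[j], eps[i]
	}
}

func main() {
	eps := []string{"http://10.0.1.10:4001", "http://10.0.1.11:4001", "http://10.0.1.12:4001"}
	shuffle(eps)
	fmt.Println(eps)
}
```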
5d778f85ca *: bump to v2.0.9+git 2015-04-07 15:18:50 -07:00
02697ca725 *: bump to v2.0.9 2015-04-07 15:18:29 -07:00
bd693c7069 etcdctl: refactor message in import command 2015-04-07 15:16:13 -07:00
52c90cdcfb etcdctl: import hidden keys 2015-04-07 14:49:40 -07:00
a88b22ac0a store: fix watcher removal 2015-04-07 14:46:10 -07:00
e93f8b8a12 *: bump to v2.0.8+git 2015-03-31 14:29:38 -07:00
44 changed files with 696 additions and 153 deletions

View File

@ -61,7 +61,7 @@ After your cluster is up and running, adding or removing members is done via [ru
### Member Migration
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
The data directory contains all the data to recover a member to its point-in-time state. To migrate a member:
@ -102,7 +102,7 @@ $ sudo systemctl stop etcd
#### Copy the data directory of the now-idle member to the new machine
```
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
```
```
@ -181,7 +181,9 @@ Once you have verified that etcd has started successfully, shut it down and move
#### Restoring the cluster
Now that the node is running successfully, you can add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
Now that the node is running successfully, you should [change its advertised peer URLs](other_apis.md#change-the-peer-urls-of-a-member), as `--force-new-cluster` has set the peer URL to the default (listening on localhost).
You can then add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
### Client Request Timeout

View File

@ -4,6 +4,10 @@ etcd can now run as a transparent proxy. Running etcd as a proxy allows for easi
etcd currently supports two proxy modes: `readwrite` and `readonly`. The default mode is `readwrite`, which forwards both read and write requests to the etcd cluster. A `readonly` etcd proxy only forwards read requests to the etcd cluster, and returns `HTTP 501` to all write requests.
The proxy will shuffle the list of cluster members periodically to avoid sending all connections to a single member.
The member list used by the proxy consists of all client URLs advertised within the cluster, as specified in each member's `-advertise-client-urls` flag. If this flag is set incorrectly, requests sent to the proxy are forwarded to the wrong addresses and fail. The fix is to restart the etcd member with a correct `-advertise-client-urls` flag. After the proxy's client URL list is recalculated, which happens every 30 seconds, requests will be forwarded correctly.
### Using an etcd proxy
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).
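
For example, a minimal invocation might look like this (addresses and names are illustrative):

```
$ etcd -proxy on \
  -listen-client-urls http://127.0.0.1:4001 \
  -initial-cluster infra0=http://10.0.1.10:2380
```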

build
View File

@ -11,6 +11,8 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
eval $(go env)
GIT_SHA=`git rev-parse --short HEAD || echo "GitNotFound"`
# Static compilation is useful when etcd is run in a container
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcdctl ${REPO_PATH}/etcdctl

View File

@ -105,7 +105,7 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
return err
}
return assertStatusCode(resp.StatusCode, http.StatusNoContent)
return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone)
}
type membersAPIActionList struct{}

View File

@ -25,7 +25,8 @@ import (
var (
// indirection for testing
lookupSRV = net.LookupSRV
lookupSRV = net.LookupSRV
resolveTCPAddr = net.ResolveTCPAddr
)
// TODO(barakmich): Currently ignores priority and weight (as they don't make as much sense for a bootstrap)
@ -38,7 +39,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
// First, resolve the apurls
for _, url := range apurls {
tcpAddr, err := net.ResolveTCPAddr("tcp", url.Host)
tcpAddr, err := resolveTCPAddr("tcp", url.Host)
if err != nil {
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", url.Host)
return "", "", err
@ -52,8 +53,9 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
return err
}
for _, srv := range addrs {
host := net.JoinHostPort(srv.Target, fmt.Sprintf("%d", srv.Port))
tcpAddr, err := net.ResolveTCPAddr("tcp", host)
target := strings.TrimSuffix(srv.Target, ".")
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
tcpAddr, err := resolveTCPAddr("tcp", host)
if err != nil {
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", host)
continue
@ -68,8 +70,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
n = fmt.Sprintf("%d", tempName)
tempName += 1
}
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, tcpAddr.String()))
log.Printf("discovery: Got bootstrap from DNS for %s at host %s to %s%s", service, host, prefix, tcpAddr.String())
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
log.Printf("discovery: Got bootstrap from DNS for %s at %s%s", service, prefix, host)
}
return nil
}

View File

@ -23,19 +23,26 @@ import (
)
func TestSRVGetCluster(t *testing.T) {
defer func() { lookupSRV = net.LookupSRV }()
defer func() {
lookupSRV = net.LookupSRV
resolveTCPAddr = net.ResolveTCPAddr
}()
name := "dnsClusterTest"
tests := []struct {
withSSL []*net.SRV
withoutSSL []*net.SRV
urls []string
expected string
dns map[string]string
expected string
}{
{
[]*net.SRV{},
[]*net.SRV{},
nil,
nil,
"",
},
{
@ -46,6 +53,8 @@ func TestSRVGetCluster(t *testing.T) {
},
[]*net.SRV{},
nil,
nil,
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
},
{
@ -58,6 +67,7 @@ func TestSRVGetCluster(t *testing.T) {
&net.SRV{Target: "10.0.0.1", Port: 7001},
},
nil,
nil,
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:7001",
},
{
@ -70,8 +80,22 @@ func TestSRVGetCluster(t *testing.T) {
&net.SRV{Target: "10.0.0.1", Port: 7001},
},
[]string{"https://10.0.0.1:2480"},
nil,
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:7001",
},
// matching local member with resolved addr and return unresolved hostnames
{
[]*net.SRV{
&net.SRV{Target: "1.example.com.", Port: 2480},
&net.SRV{Target: "2.example.com.", Port: 2480},
&net.SRV{Target: "3.example.com.", Port: 2480},
},
nil,
[]string{"https://10.0.0.1:2480"},
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
},
}
for i, tt := range tests {
@ -84,6 +108,12 @@ func TestSRVGetCluster(t *testing.T) {
}
return "", nil, errors.New("Unkown service in mock")
}
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
if tt.dns == nil || tt.dns[addr] == "" {
return net.ResolveTCPAddr(network, addr)
}
return net.ResolveTCPAddr(network, tt.dns[addr])
}
urls := testutil.MustNewURLs(t, tt.urls)
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
if err != nil {

View File

@ -44,10 +44,10 @@ func NewBackupCommand() cli.Command {
// handleBackup handles a request that intends to do a backup.
func handleBackup(c *cli.Context) {
srcSnap := path.Join(c.String("data-dir"), "snap")
destSnap := path.Join(c.String("backup-dir"), "snap")
srcWAL := path.Join(c.String("data-dir"), "wal")
destWAL := path.Join(c.String("backup-dir"), "wal")
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
srcWAL := path.Join(c.String("data-dir"), "member", "wal")
destWAL := path.Join(c.String("backup-dir"), "member", "wal")
if err := os.MkdirAll(destSnap, 0700); err != nil {
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)

View File

@ -46,9 +46,10 @@ func handleClusterHealth(c *cli.Context) {
}
// do we have a leader?
ep, ls0, err := getLeaderStats(tr, client.GetCluster())
cl := client.GetCluster()
ep, ls0, err := getLeaderStats(tr, cl)
if err != nil {
fmt.Println("cluster is unhealthy")
fmt.Println("cluster may be unhealthy: failed to connect", cl)
os.Exit(1)
}

View File

@ -26,6 +26,7 @@ func NewImportSnapCommand() cli.Command {
Usage: "import a snapshot to a cluster",
Flags: []cli.Flag{
cli.StringFlag{Name: "snap", Value: "", Usage: "Path to the vaild etcd 0.4.x snapshot."},
cli.StringSliceFlag{Name: "hidden", Value: new(cli.StringSlice), Usage: "Hidden key spaces to import from snapshot"},
cli.IntFlag{Name: "c", Value: 10, Usage: "Number of concurrent clients to import the data"},
},
Action: handleImportSnap,
@ -36,7 +37,7 @@ func handleImportSnap(c *cli.Context) {
d, err := ioutil.ReadFile(c.String("snap"))
if err != nil {
if c.String("snap") == "" {
fmt.Printf("no snapshot file provided (use --snap)")
fmt.Printf("no snapshot file provided (use --snap)\n")
} else {
fmt.Printf("cannot read snapshot file %s\n", c.String("snap"))
}
@ -83,6 +84,15 @@ func handleImportSnap(c *cli.Context) {
handleError(ErrorFromEtcd, err)
}
n := copyKeys(all.Node, setc)
hiddens := c.StringSlice("hidden")
for _, h := range hiddens {
allh, err := st.Get(h, true, true)
if err != nil {
handleError(ErrorFromEtcd, err)
}
n += copyKeys(allh.Node, setc)
}
close(setc)
wg.Wait()
fmt.Printf("finished importing %d keys\n", n)

View File

@ -31,7 +31,7 @@ func main() {
app.Flags = []cli.Flag{
cli.BoolFlag{Name: "debug", Usage: "output cURL commands which can be used to reproduce the request"},
cli.BoolFlag{Name: "no-sync", Usage: "don't synchronize cluster information before sending request"},
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple` or `json`)"},
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple`, `extended` or `json`)"},
cli.StringFlag{Name: "peers, C", Value: "", Usage: "a comma-delimited list of machine addresses in the cluster (default: \"127.0.0.1:4001\")"},
cli.StringFlag{Name: "cert-file", Value: "", Usage: "identify HTTPS client using this SSL certificate file"},
cli.StringFlag{Name: "key-file", Value: "", Usage: "identify HTTPS client using this SSL key file"},

View File

@ -15,18 +15,17 @@
package etcdmain
import (
"errors"
"flag"
"fmt"
"log"
"net/url"
"os"
"runtime"
"strings"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/version"
)
@ -64,6 +63,7 @@ var (
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set" +
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
errUnsetAdvertiseClientURLsFlag = fmt.Errorf("-advertise-client-urls is required when -listen-client-urls is set explicitly")
)
type config struct {
@ -213,7 +213,10 @@ func (cfg *config) Parse(arguments []string) error {
}
if cfg.printVersion {
fmt.Println("etcd version", version.Version)
fmt.Printf("etcd Version: %s\n", version.Version)
fmt.Printf("Git SHA: %s\n", version.GitSHA)
fmt.Printf("Go Version: %s\n", runtime.Version())
fmt.Printf("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
os.Exit(0)
}
@ -256,8 +259,14 @@ func (cfg *config) Parse(arguments []string) error {
return err
}
if err := cfg.resolveUrls(); err != nil {
return errors.New("cannot resolve DNS hostnames.")
// when etcd runs in member mode user needs to set -advertise-client-urls if -listen-client-urls is set.
// TODO(yichengq): check this for joining through discovery service case
mayFallbackToProxy := flags.IsSet(cfg.FlagSet, "discovery") && cfg.fallback.String() == fallbackFlagProxy
mayBeProxy := cfg.proxy.String() != proxyFlagOff || mayFallbackToProxy
if !mayBeProxy {
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
return errUnsetAdvertiseClientURLsFlag
}
}
if 5*cfg.TickMs > cfg.ElectionMs {
@ -275,10 +284,6 @@ func initialClusterFromName(name string) string {
return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
}
func (cfg *config) resolveUrls() error {
return netutil.ResolveTCPAddrs(cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls)
}
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff }
func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly }

View File

@ -29,6 +29,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-snapshot-count=10",
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
// it should be set if -listen-client-urls is set
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
}
wcfg := &config{
dir: "testdir",
@ -210,6 +212,71 @@ func TestConfigParsingConflictClusteringFlags(t *testing.T) {
}
}
func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
tests := []struct {
args []string
werr error
}{
{
[]string{
"-initial-cluster=infra1=http://127.0.0.1:2380",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-discovery-srv=example.com",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-discovery=http://example.com/abc",
"-discovery-fallback=exit",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-discovery=http://example.com/abc",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
{
[]string{
"-proxy=on",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
{
[]string{
"-proxy=readonly",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
}
for i, tt := range tests {
cfg := NewConfig()
err := cfg.Parse(tt.args)
if err != tt.werr {
t.Errorf("%d: err = %v, want %v", i, err, tt.werr)
}
}
}
func TestConfigIsNewCluster(t *testing.T) {
tests := []struct {
state string

View File

@ -56,8 +56,12 @@ func Main() {
cfg := NewConfig()
err := cfg.Parse(os.Args[1:])
if err != nil {
log.Printf("etcd: error verifying flags, %v. See 'etcd -help'.", err)
os.Exit(2)
log.Printf("error verifying flags, %v. See 'etcd -help'.", err)
switch err {
case errUnsetAdvertiseClientURLsFlag:
log.Printf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
}
os.Exit(1)
}
var stopped <-chan struct{}
@ -90,8 +94,10 @@ func Main() {
if err != nil {
switch err {
case discovery.ErrDuplicateID:
log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
cfg.name, cfg.durl, cfg.dir)
log.Printf("member %q has previously registered with discovery service token (%s).", cfg.name, cfg.durl)
log.Printf("But etcd could not find vaild cluster configuration in the given data dir (%s).", cfg.dir)
log.Printf("Please check the given data dir path if the previous bootstrap succeeded")
log.Printf("or use a new discovery token if the previous bootstrap failed.")
default:
log.Fatalf("etcd: %v", err)
}
@ -110,7 +116,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
}
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
return nil, err
}
@ -191,7 +197,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Handler: etcdhttp.NewClientHandler(s),
Info: cfg.corsInfo,
}
ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
// Start the peer server in a goroutine
for _, l := range plns {
go func(l net.Listener) {
@ -227,6 +233,7 @@ func startProxy(cfg *config) error {
}
pt, err := transport.NewTransport(cfg.clientTLSInfo)
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
if err != nil {
return err
}

View File

@ -56,7 +56,8 @@ clustering flags:
--initial-cluster-token 'etcd-cluster'
initial cluster token for the etcd cluster during bootstrap.
--advertise-client-urls 'http://localhost:2379,http://localhost:4001'
list of this member's client URLs to advertise to the rest of the cluster.
list of this member's client URLs to advertise to the public.
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
--discovery ''
discovery URL used to bootstrap the cluster.
--discovery-fallback 'proxy'

View File

@ -59,12 +59,6 @@ type Cluster struct {
id types.ID
token string
store store.Store
// index is the raft index that cluster is updated at bootstrap
// from remote cluster info.
// It may have a higher value than local raft index, because it
// displays a further view of the cluster.
// TODO: upgrade it as last modified index
index uint64
sync.Mutex // guards members and removed map
members map[types.ID]*Member
@ -236,8 +230,6 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
func (c *Cluster) Recover() {
c.members, c.removed = membersFromStore(c.store)
}

View File

@ -21,7 +21,6 @@ import (
"log"
"net/http"
"sort"
"strconv"
"time"
"github.com/coreos/etcd/pkg/types"
@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
}
continue
}
var index uint64
// The header at or before v2.0.3 doesn't have this field. For backward
// compatibility, it checks whether the field exists.
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
index, err = strconv.ParseUint(indexStr, 10, 64)
if err != nil {
if logerr {
log.Printf("etcdserver: could not parse raft index: %v", err)
}
continue
}
}
cl := NewClusterFromMembers("", id, membs)
cl.UpdateIndex(index)
return cl, nil
return NewClusterFromMembers("", id, membs), nil
}
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
}

View File

@ -119,7 +119,6 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeError(w, err)
return
}
switch {
case resp.Event != nil:
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {

View File

@ -18,7 +18,6 @@ import (
"encoding/json"
"log"
"net/http"
"strconv"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/rafthttp"
@ -29,10 +28,9 @@ const (
)
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
clusterInfo: clusterInfo,
timer: timer,
}
mux := http.NewServeMux()
@ -45,7 +43,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTim
type peerMembersHandler struct {
clusterInfo etcdserver.ClusterInfo
timer etcdserver.RaftTimer
}
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -53,7 +50,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
if r.URL.Path != peerMembersPrefix {
http.Error(w, "bad path", http.StatusBadRequest)

View File

@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})
ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
ph := NewPeerHandler(&fakeCluster{}, h)
srv := httptest.NewServer(ph)
defer srv.Close()
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
h := &peerMembersHandler{clusterInfo: cluster}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
if err != nil {
t.Fatal(err)

View File

@ -158,6 +158,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
haveWAL := wal.Exist(cfg.WALDir())
ss := snap.New(cfg.SnapDir())
var remotes []*Member
switch {
case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil {
@ -170,7 +171,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
cfg.Cluster.UpdateIndex(existingCluster.index)
remotes = existingCluster.Members()
cfg.Cluster.SetID(existingCluster.id)
cfg.Cluster.SetStore(st)
cfg.Print()
@ -238,6 +239,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
Name: cfg.Name,
ID: id.String(),
}
sstats.Initialize()
lstats := stats.NewLeaderStats(id.String())
srv := &EtcdServer{
@ -260,8 +262,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
}
// TODO: move transport initialization near the definition of remote
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
// add all the remote members into sendhub
// add all remotes into transport
for _, m := range remotes {
if m.ID != id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range cfg.Cluster.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
@ -292,7 +300,6 @@ func (s *EtcdServer) start() {
s.w = wait.New()
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stats.Initialize()
// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@ -395,19 +402,15 @@ func (s *EtcdServer) run() {
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
// It avoids snapshot recovery overwriting newer cluster and
// transport setting, which may block the communication.
if s.Cluster.index < rd.Snapshot.Metadata.Index {
s.Cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
appliedi = rd.Snapshot.Metadata.Index
@ -671,9 +674,9 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
}
func (s *EtcdServer) send(ms []raftpb.Message) {
for _, m := range ms {
if !s.Cluster.IsIDRemoved(types.ID(m.To)) {
m.To = 0
for i, _ := range ms {
if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) {
ms[i].To = 0
}
}
s.r.transport.Send(ms)
@ -723,7 +726,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
switch {
case existsSet:
if exists {
return f(s.store.Update(r.Path, r.Val, expr))
if r.PrevIndex == 0 && r.PrevValue == "" {
return f(s.store.Update(r.Path, r.Val, expr))
} else {
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
}
}
return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
case r.PrevIndex > 0 || r.PrevValue != "":

View File

@ -235,20 +235,18 @@ func TestApplyRequest(t *testing.T) {
},
},
},
// PUT with PrevExist=true *and* PrevIndex set ==> Update
// TODO(jonboulle): is this expected?!
// PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
Response{Event: &store.Event{}},
[]testutil.Action{
{
Name: "Update",
Params: []interface{}{"", "", time.Time{}},
Name: "CompareAndSwap",
Params: []interface{}{"", "", uint64(1), "", time.Time{}},
},
},
},
// PUT with PrevExist=false *and* PrevIndex set ==> Create
// TODO(jonboulle): is this expected?!
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
Response{Event: &store.Event{}},
@ -1391,6 +1389,7 @@ type nopTransporter struct{}
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}

View File

@ -170,6 +170,46 @@ func TestForceNewCluster(t *testing.T) {
clusterMustProgress(t, c.Members[:1])
}
// Ensure we can remove a member then add a new one back immediately.
func TestIssue2681(t *testing.T) {
defer afterTest(t)
c := NewCluster(t, 5)
c.Launch(t)
defer c.Terminate(t)
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
c.waitLeader(t, c.Members)
c.AddMember(t)
c.waitLeader(t, c.Members)
clusterMustProgress(t, c.Members)
}
// Ensure we can remove a member after a snapshot then add a new one back.
func TestIssue2746(t *testing.T) {
defer afterTest(t)
c := NewCluster(t, 5)
for _, m := range c.Members {
m.SnapCount = 10
}
c.Launch(t)
defer c.Terminate(t)
// force a snapshot
for i := 0; i < 20; i++ {
clusterMustProgress(t, c.Members)
}
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
c.waitLeader(t, c.Members)
c.AddMember(t)
c.waitLeader(t, c.Members)
clusterMustProgress(t, c.Members)
}
// clusterMustProgress ensures that the cluster can make progress. It creates
// a random key first, and checks that the new key can be read from all
// client urls of the cluster.
@ -526,7 +566,7 @@ func (m *member) Launch() error {
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{
@ -623,7 +663,7 @@ func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
}
func mustNewTransport(t *testing.T) *http.Transport {
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
t.Fatal(err)
}

View File

@ -0,0 +1,90 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"errors"
"os"
"syscall"
"time"
)
var (
ErrLocked = errors.New("file already locked")
)
type Lock interface {
Name() string
TryLock() error
Lock() error
Unlock() error
Destroy() error
}
type lock struct {
fname string
file *os.File
}
func (l *lock) Name() string {
return l.fname
}
// TryLock acquires exclusivity on the lock without blocking
func (l *lock) TryLock() error {
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
if err != nil {
return err
}
f, err := os.Open(l.fname)
if err != nil {
return ErrLocked
}
l.file = f
return nil
}
// Lock acquires exclusivity on the lock with blocking
func (l *lock) Lock() error {
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
if err != nil {
return err
}
for {
f, err := os.Open(l.fname)
if err == nil {
l.file = f
return nil
}
time.Sleep(10 * time.Millisecond)
}
}
// Unlock unlocks the lock
func (l *lock) Unlock() error {
return l.file.Close()
}
func (l *lock) Destroy() error {
return nil
}
func NewLock(file string) (Lock, error) {
l := &lock{fname: file}
return l, nil
}

View File

@ -0,0 +1,98 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build solaris
package fileutil
import (
"errors"
"os"
"syscall"
)
var (
ErrLocked = errors.New("file already locked")
)
type Lock interface {
Name() string
TryLock() error
Lock() error
Unlock() error
Destroy() error
}
type lock struct {
fd int
file *os.File
}
func (l *lock) Name() string {
return l.file.Name()
}
// TryLock acquires exclusivity on the lock without blocking
func (l *lock) TryLock() error {
var lock syscall.Flock_t
lock.Start = 0
lock.Len = 0
lock.Pid = 0
lock.Type = syscall.F_WRLCK
lock.Whence = 0
lock.Pid = 0
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
if err != nil && err == syscall.EAGAIN {
return ErrLocked
}
return err
}
// Lock acquires exclusivity on the lock without blocking
func (l *lock) Lock() error {
var lock syscall.Flock_t
lock.Start = 0
lock.Len = 0
lock.Type = syscall.F_WRLCK
lock.Whence = 0
lock.Pid = 0
return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
}
// Unlock unlocks the lock
func (l *lock) Unlock() error {
var lock syscall.Flock_t
lock.Start = 0
lock.Len = 0
lock.Type = syscall.F_UNLCK
lock.Whence = 0
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
if err != nil && err == syscall.EAGAIN {
return ErrLocked
}
return err
}
func (l *lock) Destroy() error {
return l.file.Close()
}
func NewLock(file string) (Lock, error) {
f, err := os.OpenFile(file, os.O_WRONLY, 0600)
if err != nil {
return nil, err
}
l := &lock{int(f.Fd()), f}
return l, nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows,!plan9
// +build !windows,!plan9,!solaris
package fileutil

View File

@ -23,7 +23,7 @@ import (
// NewTimeoutTransport returns a transport created using the given TLS info.
// If read/write on the created connection blocks longer than its time limit,
// it will return timeout error.
func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
tr, err := NewTransport(info)
if err != nil {
return nil, err
@ -33,7 +33,7 @@ func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*ht
tr.MaxIdleConnsPerHost = -1
tr.Dial = (&rwTimeoutDialer{
Dialer: net.Dialer{
Timeout: 30 * time.Second,
Timeout: dialtimeoutd,
KeepAlive: 30 * time.Second,
},
rdtimeoutd: rdtimeoutd,

View File

@ -26,7 +26,7 @@ import (
// TestNewTimeoutTransport tests that NewTimeoutTransport returns a transport
// that can dial out timeout connections.
func TestNewTimeoutTransport(t *testing.T) {
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour)
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour, time.Hour)
if err != nil {
t.Fatalf("unexpected NewTimeoutTransport error: %v", err)
}

View File

@ -16,6 +16,7 @@ package proxy
import (
"log"
"math/rand"
"net/url"
"sync"
"time"
@ -65,6 +66,13 @@ func (d *director) refresh() {
}
endpoints = append(endpoints, newEndpoint(*uu))
}
// shuffle array to avoid connections being "stuck" to a single endpoint
for i := range endpoints {
j := rand.Intn(i + 1)
endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
}
d.ep = endpoints
}

View File

@ -18,6 +18,17 @@ import (
"net/http"
)
const (
// DefaultMaxIdleConnsPerHost indicates the default maximal idle connections
// maintained between proxy and each member. We set it to 128 to
// let proxy handle 128 concurrent requests in long term smoothly.
// If the number of concurrent requests is bigger than this value,
// proxy needs to create one new connection when handling each request in
// the delta, which is bad because the creation consumes resource and
// may eat up ephemeral ports.
DefaultMaxIdleConnsPerHost = 128
)
// GetProxyURLs is a function which should return the current set of URLs to
// which client requests should be proxied. This function will be queried
// periodically by the proxy Handler to refresh the set of available

View File

@ -15,8 +15,10 @@
package proxy
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@ -55,6 +57,21 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
proxyreq := new(http.Request)
*proxyreq = *clientreq
var (
proxybody []byte
err error
)
if clientreq.Body != nil {
proxybody, err = ioutil.ReadAll(clientreq.Body)
if err != nil {
msg := fmt.Sprintf("proxy: failed to read request body: %v", err)
e := httptypes.NewHTTPError(http.StatusInternalServerError, msg)
e.WriteTo(rw)
return
}
}
// deep-copy the headers, as these will be modified below
proxyreq.Header = make(http.Header)
copyHeader(proxyreq.Header, clientreq.Header)
@ -73,10 +90,31 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
return
}
completeCh := make(chan bool, 1)
closeNotifier, ok := rw.(http.CloseNotifier)
if ok {
go func() {
select {
case <-closeNotifier.CloseNotify():
tp, ok := p.transport.(*http.Transport)
if ok {
tp.CancelRequest(proxyreq)
}
case <-completeCh:
}
}()
defer func() {
completeCh <- true
}()
}
var res *http.Response
var err error
for _, ep := range endpoints {
if proxybody != nil {
proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
}
redirectRequest(proxyreq, ep.URL)
res, err = p.transport.RoundTrip(proxyreq)

View File

@ -65,7 +65,7 @@ func newLog(storage Storage) *raftLog {
}
func (l *raftLog) String() string {
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
}
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,

View File

@ -233,7 +233,7 @@ func (n *node) run(r *raft) {
lead := None
prevSoftSt := r.softState()
prevHardSt := r.HardState
prevHardSt := emptyState
for {
if advancec != nil {

View File

@ -354,7 +354,7 @@ func TestNodeRestart(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 1}
want := Ready{
HardState: emptyState,
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries[:st.Commit],
}
@ -389,7 +389,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 3}
want := Ready{
HardState: emptyState,
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries,
}

View File

@ -121,7 +121,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
from, err := types.IDFromString(fromStr)
if err != nil {
log.Printf("rafthttp: path %s cannot be parsed", fromStr)
http.Error(w, "invalid path", http.StatusNotFound)
return
}

View File

@ -40,6 +40,7 @@ const (
appRespBatchMs = 50
propBatchMs = 10
DialTimeout = time.Second
ConnReadTimeout = 5 * time.Second
ConnWriteTimeout = 5 * time.Second
)
@ -199,7 +200,7 @@ func (p *peer) handle() {
log.Printf("sender: the connection with %s became inactive", p.id)
p.active = false
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Fail()
}
} else {
@ -208,7 +209,7 @@ func (p *peer) handle() {
p.active = true
p.errored = nil
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Succ(end.Sub(start))
}
}

rafthttp/remote.go (new file)
View File

@ -0,0 +1,42 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"net/http"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
type remote struct {
id types.ID
peer *peer
}
func startRemote(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, errorc chan error) *remote {
return &remote{
id: to,
peer: NewPeer(tr, u, to, cid, r, nil, errorc),
}
}
func (g *remote) Send(m raftpb.Message) {
g.peer.send(m)
}
func (g *remote) Stop() {
g.peer.Stop()
}

View File

@ -76,8 +76,11 @@ func (s *stream) attach(sw *streamWriter) error {
// ignore lower-term streaming request
if sw.term < s.w.term {
return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
} else if sw.term == s.w.term {
s.w.stopWithoutLog()
} else {
s.w.stop()
}
s.w.stop()
}
s.w = sw
return nil
@ -151,21 +154,23 @@ type WriteFlusher interface {
// TODO: replace fs with stream stats
type streamWriter struct {
to types.ID
term uint64
fs *stats.FollowerStats
q chan []raftpb.Entry
done chan struct{}
to types.ID
term uint64
fs *stats.FollowerStats
q chan []raftpb.Entry
done chan struct{}
printLog bool
}
// newStreamWriter starts and returns a new unstarted stream writer.
// The caller should call stop when finished, to shut it down.
func newStreamWriter(to types.ID, term uint64) *streamWriter {
s := &streamWriter{
to: to,
term: term,
q: make(chan []raftpb.Entry, streamBufSize),
done: make(chan struct{}),
to: to,
term: term,
q: make(chan []raftpb.Entry, streamBufSize),
done: make(chan struct{}),
printLog: true,
}
return s
}
@ -188,7 +193,9 @@ func (s *streamWriter) send(ents []raftpb.Entry) error {
func (s *streamWriter) handle(w WriteFlusher) {
defer func() {
close(s.done)
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
if s.printLog {
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
}
}()
ew := newEntryWriter(w, s.to)
@ -215,6 +222,11 @@ func (s *streamWriter) stop() {
<-s.done
}
func (s *streamWriter) stopWithoutLog() {
s.printLog = false
s.stop()
}
func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
// TODO: move the raft interface out of the reader.

View File

@ -35,6 +35,12 @@ type Raft interface {
type Transporter interface {
Handler() http.Handler
Send(m []raftpb.Message)
// AddRemote adds a remote with given peer urls into the transport.
// A remote helps a newly joined member catch up with the progress of the
// cluster, and is not used after that.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
AddRemote(id types.ID, urls []string)
AddPeer(id types.ID, urls []string)
RemovePeer(id types.ID)
RemoveAllPeers()
@ -50,9 +56,10 @@ type transport struct {
serverStats *stats.ServerStats
leaderStats *stats.LeaderStats
mu sync.RWMutex // protect the peer map
peers map[types.ID]*peer // remote peers
errorc chan error
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]*peer // peers map
errorc chan error
}
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
@ -63,6 +70,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
raft: r,
serverStats: ss,
leaderStats: ls,
remotes: make(map[types.ID]*remote),
peers: make(map[types.ID]*peer),
errorc: errorc,
}
@ -90,21 +98,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
continue
}
to := types.ID(m.To)
p, ok := t.peers[to]
if !ok {
log.Printf("etcdserver: send message to unknown receiver %s", to)
if ok {
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
}
p.Send(m)
continue
}
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
g, ok := t.remotes[to]
if ok {
g.Send(m)
continue
}
p.Send(m)
log.Printf("etcdserver: send message to unknown receiver %s", to)
}
}
func (t *transport) Stop() {
for _, r := range t.remotes {
r.Stop()
}
for _, p := range t.peers {
p.Stop()
}
@ -113,6 +130,21 @@ func (t *transport) Stop() {
}
}
func (t *transport) AddRemote(id types.ID, us []string) {
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.remotes[id]; ok {
return
}
peerURL := us[0]
u, err := url.Parse(peerURL)
if err != nil {
log.Panicf("unexpect peer url %s", peerURL)
}
u.Path = path.Join(u.Path, RaftPrefix)
t.remotes[id] = startRemote(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, t.errorc)
}
func (t *transport) AddPeer(id types.ID, urls []string) {
t.mu.Lock()
defer t.mu.Unlock()

View File

@ -82,7 +82,10 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
break
}
}
return snap, err
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
}
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {

View File

@ -76,7 +76,7 @@ func TestBadCRC(t *testing.T) {
// fake a crc mismatch
crcTable = crc32.MakeTable(crc32.Koopman)
_, err = ss.Load()
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
if err == nil || err != ErrCRCMismatch {
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
}
@ -182,7 +182,7 @@ func TestNoSnapshot(t *testing.T) {
defer os.RemoveAll(dir)
ss := New(dir)
_, err = ss.Load()
if err == nil || err != ErrNoSnapshot {
if err != ErrNoSnapshot {
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
}
}
@ -195,14 +195,35 @@ func TestEmptySnapshot(t *testing.T) {
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("shit"), 0x700)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
if err != nil {
t.Fatal(err)
}
_, err = Read(path.Join(dir, "1.snap"))
if err != ErrEmptySnapshot {
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
}
}
// TestAllSnapshotBroken ensures the snapshotter returns
// ErrNoSnapshot if all the snapshots are broken.
func TestAllSnapshotBroken(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
if err != nil {
t.Fatal(err)
}
ss := New(dir)
_, err = ss.Load()
if err == nil || err != ErrEmptySnapshot {
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
if err != ErrNoSnapshot {
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
}
}

View File

@ -84,7 +84,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
if ok { // add the new watcher to the back of the list
elem = l.PushBack(w)
} else { // create a new list and add the new watcher
l = list.New()
elem = l.PushBack(w)
@ -146,6 +145,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
// if we successfully notify a watcher
// we need to remove the watcher from the list
// and decrease the counter
w.removed = true
l.Remove(curr)
atomic.AddInt64(&wh.count, -1)
}

View File

@ -23,7 +23,10 @@ import (
)
var (
Version = "2.0.8"
Version = "2.0.13"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"
)
// WalVersion is an enum for versions of etcd logs.

View File

@ -67,7 +67,7 @@ func checkWalNames(names []string) []string {
wnames := make([]string, 0)
for _, name := range names {
if _, _, err := parseWalName(name); err != nil {
log.Printf("wal: parse %s error: %v", name, err)
log.Printf("wal: ignored file %v in wal", name)
continue
}
wnames = append(wnames, name)

View File

@ -273,30 +273,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
}
// Cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes the necessary headers into it.
// Then cut atomically renames the temp wal file to a wal file.
func (w *WAL) Cut() error {
// create a new wal file with name sequence + 1
// close old wal file
if err := w.sync(); err != nil {
return err
}
if err := w.f.Close(); err != nil {
return err
}
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
ftpath := fpath + ".tmp"
// create a temp wal file with name sequence + 1, or truncate the existing one
ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
if err = w.sync(); err != nil {
return err
}
w.f.Close()
// update writer and save the previous crc
w.f = f
w.seq++
w.f = ft
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
if err := w.saveCrc(prevCrc); err != nil {
@ -308,7 +306,45 @@ func (w *WAL) Cut() error {
if err := w.saveState(&w.state); err != nil {
return err
}
return w.sync()
// close temp wal file
if err := w.sync(); err != nil {
return err
}
if err := w.f.Close(); err != nil {
return err
}
// atomically move temp wal file to wal file
if err := os.Rename(ftpath, fpath); err != nil {
return err
}
// open the wal file and update writer again
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return err
}
w.f = f
prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
// lock the new wal file
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
// increase the wal seq
w.seq++
log.Printf("wal: segmented wal file %v is created", fpath)
return nil
}
func (w *WAL) sync() error {