Compare commits

...

55 Commits

Author SHA1 Message Date
0678329cd6 *: bump to v2.0.11 2015-05-15 13:54:32 -07:00
9a0e0c2eae etcdmain: better error msg when detected duplicate id in discovery
Conflicts:
	etcdmain/etcd.go
2015-05-15 13:47:02 -07:00
3e4d57c37d pkg/fileutil: add plan9 lockfile support 2015-05-15 13:35:51 -07:00
d30e764b2d version: added more version information
added more version information output to aid debugging
print etcd Version, Git SHA, Go runtime version, OS
and architecture

Fixes #2560

Conflicts:
	version/version.go
2015-05-15 12:34:33 -07:00
b5b7c78f1b docs: proxy needs accessible advertise client urls
Users cannot use proxy unless -advertise-client-urls is set correctly.
Especially mention this in the doc to help them bypass the wrong
settings.
2015-05-15 12:32:58 -07:00
ee1c07c3d4 proxy: Fix connection leak when client disconnect
established connections were leaked when client disconnected before
proxyreq completes. This happens all the time for wait=true requests.
2015-05-15 12:32:49 -07:00
67c5d4dfd2 etcdmain: advertise-client-urls must be set if listen-client-urls is set
Before this PR, people can set listen-client-urls without setting
advertise-client-urls, and leaves advertise-client-urls as default
localhost value. The client libraries which sync the cluster info
fetch wrong advertise-client-urls and cannot connect to the cluster.
This PR avoids this case and provides better UX.

On the other hand, this change is safe because people always want to set
advertise-client-urls if listen-client-urls is set. The default localhost
advertise url cannot be accessed from the outside, and should always be
set except that etcd is bootstrapped with no flag.

Conflicts:
	etcdmain/etcd.go
2015-05-15 12:32:35 -07:00
3afcbd6f83 docs: clarify the disaster recovery guide
A bit was missing from the documentation on disaster recovery, the reset
of the advertised peer urls for the node recovered from backup. Without
that, any subsequent server joining the cluster would not be able to
speak to the first node.
2015-05-15 12:30:48 -07:00
8fed61b2eb client: 410 is a valid response for member.Remove
When removing a member, etcdserver might return 410 that indicates
the member has been removed. To the client, 410 is a valid response since
the client might do internal retry.
2015-05-15 12:30:37 -07:00
c8d386e18c pkg/fileutil: add filelock support for solaris 2015-05-15 11:30:40 -07:00
2b6a44b7b0 raft: fix typo in raftlog
fix typo in String() method of raftlog which will misorder
the "committed" and "unstable.offset" output.
2015-05-15 11:30:32 -07:00
8069d08b96 etcdserver: init server stats before passing it as argument
It is more reasonable to init the variable before passing it as an
argument.

It fixes a bug that etcdserver may panic on server stats when processing
a message from rafthttp streamReader before server stats is initialized
in server.Start().
2015-05-15 11:30:23 -07:00
58f035844c Merge pull request #2753 from yichengq/fix-remove-panic
backport #2701 to release-2.0 branch
2015-04-28 21:17:55 -07:00
f83774b4cd integration: add tests around the membership change issues 2015-04-24 13:49:17 -07:00
12c32137a8 rafthttp: add AddRemote
Add remotes to rafthttp, who help newly joined members catch up the
progress of the cluster. It supports basic message sending to remote, and
has no stream connection for simplicity. remotes will not be used
after the latest peers have been added into rafthttp.

Conflicts:
	rafthttp/pipeline.go
	rafthttp/transport.go
2015-04-24 13:37:16 -07:00
fce4cf4dc8 Revert "etcdserver: fix cluster fallback recovery"
This reverts commit cff005777a.
2015-04-24 13:06:43 -07:00
06a72b2702 *: bump to v2.0.10+git 2015-04-22 15:21:59 -07:00
fbaef05885 *: bump to v2.0.10 2015-04-22 15:21:38 -07:00
31a94d28e3 etcdctl: add extended as output format
extended wasn't documented in the help as one of the output formats, fix
this!

Conflicts:
	etcdctl/main.go
2015-04-22 15:11:06 -07:00
88660a303f snap: load should only return ErrNoSnapshot
If there is no available snapshot, load should return
ErrNoSnapshot. etcdserver might recover from that error
if it still has complete WAL files.
2015-04-22 15:09:38 -07:00
53c74dbd0b etcdserver: prevExist=true + condition is compareAndSwap
PrevExist indicates the key should exist. Condition compares with
an existing key. So PrevExist+condition = CompareAndSwap not Update.
2015-04-22 15:09:28 -07:00
8a8af60fad etcdctl: backup tool should use the new layout 2015-04-22 15:09:15 -07:00
7de19fefe8 etcdserver: fix minor bug in EtcdServer.send
it seems to be nothing serious.
after deleted peers, the log may output:
"etcdserver: send message to unknown receiver %s"
2015-04-22 15:09:04 -07:00
7750f387b0 wal: better log msg 2015-04-22 15:08:50 -07:00
e33ab24442 wal: never leave a corrupted wal file
If the process dies during wal.cut(), it might leave a corrupted wal
file. This commit solves the problem by creating a temp wal file first,
then atomically rename it to a wal file when we are sure it is valid.

Conflicts:
	wal/wal.go
2015-04-22 15:08:42 -07:00
fce2c1eeaf discovery: drop trailing . from srv target 2015-04-22 15:06:20 -07:00
6a3bb93305 discovery: add a test case for srv
During srv discovery, it should try to match local member with
resolved addr and return unresolved hostnames for the cluster.

Conflicts:
	discovery/srv_test.go
2015-04-22 15:06:03 -07:00
21455d2f3b *: stop using resolved tcp addr
We start to resolve host into tcp addrs since we generate
tcp based initial-cluster during srv discovery. However it
creates problems around tls and cluster verification. The
srv discovery only needs to use the resolved tcp addr to
find the local node. It does not have to resolve everything
and use the resolved addrs.

This fixes #2488 and #2226
2015-04-22 14:59:07 -07:00
51bb4220c5 Clarify that it is the proxy doing the shuffle. 2015-04-22 14:58:54 -07:00
d8c506923f proxy: shuffle endpoints
Shuffle endpoints to avoid being "stuck" to a single cluster member.
2015-04-22 14:58:40 -07:00
5d778f85ca *: bump to v2.0.9+git 2015-04-07 15:18:50 -07:00
02697ca725 *: bump to v2.0.9 2015-04-07 15:18:29 -07:00
bd693c7069 etcdctl: refactor message in import command 2015-04-07 15:16:13 -07:00
52c90cdcfb etcdctl: import hidden keys 2015-04-07 14:49:40 -07:00
a88b22ac0a store: fix watcher removal 2015-04-07 14:46:10 -07:00
e93f8b8a12 *: bump to v2.0.8+git 2015-03-31 14:29:38 -07:00
86e616c6e9 *: bump to v2.0.8 2015-03-31 14:29:13 -07:00
5ae55a2c0d etcdctl: fix import typos 2015-03-31 13:48:18 -07:00
62ce6eef7b etcdctl: main routine of import command should wait for goroutine existing 2015-03-31 13:26:15 -07:00
7df4f5c804 build: do not build internal debugging tool
We are still playing around with the dump-log tool.
Stop building it publicly until we are happy with its
ux and functionality.
2015-03-31 13:26:05 -07:00
461c24e899 etcdctl: adopt new client port by default
etcdserver uses both 4001 and 2379 for serving client requests by
default. etcdctl supports both ports by default.
2015-03-31 13:25:56 -07:00
6d90d03bf0 etcdctl: add migratesnap command 2015-03-31 13:25:39 -07:00
9995e80a2c Revert "etcdhttp: add internalVersion"
This reverts commit a77bf97c14.

Conflicts:
	version/version.go

Conflicts:
	version/version.go
2015-03-31 13:25:22 -07:00
229405f113 *: remove upgrading related stuff 2015-03-31 13:24:28 -07:00
b3f2a998d4 docs: add clarity about the 1000 events history
When talking about missing events on a particular key, the 1000 event history
limit can be understood as being per key, instead of etcd-wide events. Make it
clear that it is across all etcd keys.
2015-03-31 13:24:19 -07:00
8436e901e9 etcdserver: loose member validation for joining existing cluster 2015-03-31 13:24:07 -07:00
c03f5cb941 *: bump to v2.0.7+git 2015-03-24 23:14:38 -07:00
0cb90e4bea *: bump to v2.0.7 2015-03-24 23:07:57 -07:00
df83b1b34e wal: fix missing import 2015-03-24 23:00:04 -07:00
f2bef04009 wal: releastTo should work with large release index 2015-03-24 22:51:02 -07:00
02198336f6 version: not return err NotExist in Detect 2015-03-24 22:50:44 -07:00
0c9a226e0e etcdserver: print out extra files in data dir instead of erroring 2015-03-24 22:50:33 -07:00
5bd1d420bb etcdserver: add join-existing check 2015-03-24 22:49:41 -07:00
a1cb5cb768 etcdmain: print error when non-flag args remain 2015-03-24 22:49:31 -07:00
acba49fe81 *: bump to v2.0.6+git 2015-03-23 14:05:08 -07:00
49 changed files with 905 additions and 486 deletions

View File

@ -61,7 +61,7 @@ After your cluster is up and running, adding or removing members is done via [ru
### Member Migration
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
The data directory contains all the data to recover a member to its point-in-time state. To migrate a member:
@ -102,7 +102,7 @@ $ sudo systemctl stop etcd
#### Copy the data directory of the now-idle member to the new machine
```
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
```
```
@ -181,7 +181,9 @@ Once you have verified that etcd has started successfully, shut it down and move
#### Restoring the cluster
Now that the node is running successfully, you can add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
Now that the node is running successfully, you should [change its advertised peer URLs](other_apis.md#change-the-peer-urls-of-a-member), as the `--force-new-cluster` has set the peer URL to the default (listening on localhost).
You can then add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
### Client Request Timeout

View File

@ -1,120 +0,0 @@
## Allow-legacy mode
Allow-legacy is a special mode in etcd that contains logic to enable a running etcd cluster to smoothly transition between major versions of etcd. For example, the internal API versions between etcd 0.4 (internal v1) and etcd 2.0 (internal v2) aren't compatible and the cluster needs to be updated all at once to make the switch. To minimize downtime, allow-legacy coordinates with all of the members of the cluster to shut down, migrate their data, and restart onto the new version.
Allow-legacy helps users upgrade v0.4 etcd clusters easily, and allows your etcd cluster to have a minimal amount of downtime -- less than 1 minute for clusters storing less than 50 MB.
It supports upgrading from internal v1 to internal v2 now.
### Setup
This mode is enabled if `ETCD_ALLOW_LEGACY_MODE` is set to true, or etcd is running in CoreOS system.
It treats `ETCD_BINARY_DIR` as the directory for etcd binaries, which is organized in this way:
```
ETCD_BINARY_DIR
|
-- 1
|
-- 2
```
`1` is etcd with internal v1 protocol. You should use etcd v0.4.7 here. `2` is etcd with internal v2 protocol, which is etcd v2.x.
The default value for `ETCD_BINARY_DIR` is `/usr/libexec/etcd/internal_versions/`.
### Upgrading a Cluster
When starting etcd with a v1 data directory and v1 flags, etcd executes the v0.4.7 binary and runs exactly the same as before. To start the migration, follow the steps below:
![Migration Steps](etcd-migration-steps.png)
#### 1. Check the Cluster Health
Before upgrading, you should check the health of the cluster to double check that everything is working perfectly. Check the health by running:
```
$ etcdctl cluster-health
cluster is healthy
member 6e3bd23ae5f1eae0 is healthy
member 924e2e83e93f2560 is healthy
member a8266ecf031671f3 is healthy
```
If the cluster and all members are healthy, you can start the upgrading process. If not, check the unhealthy machines and repair them using [admin guide](./admin_guide.md).
#### 2. Trigger the Upgrade
When you're ready, use the `etcdctl upgrade` command to start upgrading the etcd cluster to 2.0:
```
# Defaults work on a CoreOS machine running etcd
$ etcdctl upgrade
```
```
# Advanced example specifying a peer url
$ etcdctl upgrade --old-version=1 --new-version=2 --peer-url=$PEER_URL
```
`PEER_URL` can be any accessible peer url of the cluster.
Once triggered, all peer-mode members will print out:
```
detected next internal version 2, exit after 10 seconds.
```
#### Parallel Coordinated Upgrade
As part of the upgrade, etcd does internal coordination within the cluster for a brief period and then exits. Clusters storing 50 MB should be unavailable for less than 1 minute.
#### Restart etcd Processes
After the etcd processes exit, they need to be restarted. You can do this manually or configure your unit system to do this automatically. On CoreOS, etcd is already configured to start automatically with systemd.
When restarted, the data directory of each member is upgraded, and afterwards etcd v2.0 will be running and servicing requests. The upgrade is now complete!
Standby-mode members are a special case — they will be upgraded into proxy mode (a new feature in etcd 2.0) upon restarting. When the upgrade is triggered, any standbys will exit with the message:
```
Detect the cluster has been upgraded to internal API v2. Exit now.
```
Once restarted, standbys run in v2.0 proxy mode, which proxy user requests to the etcd cluster.
#### 3. Check the Cluster Health
After the upgrade process, you can run the health check again to verify the upgrade. If the cluster is unhealthy or there is an unhealthy member, please refer to [failure recovery](#failure-recovery).
### Downgrade
If the upgrading fails due to disk/network issues, you still can restart the upgrading process manually. However, once you upgrade etcd to internal v2 protocol, you CANNOT downgrade it back to internal v1 protocol. If you want to downgrade etcd in the future, please backup your v1 data dir beforehand.
### Upgrade Process on CoreOS
When running on a CoreOS system, allow-legacy mode is enabled by default and an automatic update will set up everything needed to execute the upgrade. The `etcd.service` on CoreOS is already configured to restart automatically. All you need to do is run `etcdctl upgrade` when you're ready, as described above.
### Internal Details
etcd v0.4.7 registers versions of available etcd binaries in its local machine into the key space at bootstrap stage. When the upgrade command is executed, etcdctl checks whether each member has internal-version-v2 etcd binary around. If that is true, each member is asked to record the fact that it needs to be upgraded the next time it reboots, and exits after 10 seconds.
Once restarted, etcd v2.0 sees the upgrade flag recorded. It upgrades the data directory, and executes etcd v2.0.
### Failure Recovery
If `etcdctl cluster-health` says that the cluster is unhealthy, the upgrade process fails, which may happen if the network is broken, or the disk cannot work.
The way to recover it is to manually upgrade the whole cluster to v2.0:
- Log into machines that ran v0.4 peer-mode etcd
- Stop all etcd services
- Remove the `member` directory under the etcd data-dir
- Start etcd service using [2.0 flags](configuration.md). An example for this is:
```
$ etcd --data-dir=$DATA_DIR --listen-peer-urls http://$LISTEN_PEER_ADDR \
--advertise-client-urls http://$ADVERTISE_CLIENT_ADDR \
--listen-client-urls http://$LISTEN_CLIENT_ADDR
```
- When this is done, v2.0 etcd cluster should work now.

View File

@ -287,7 +287,7 @@ curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=7'
The watch command returns immediately with the same response as previously.
**Note**: etcd only keeps the responses of the most recent 1000 events.
**Note**: etcd only keeps the responses of the most recent 1000 events across all etcd keys.
It is recommended to send the response to another thread to process immediately
instead of blocking the watch while processing the result.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.9 KiB

View File

@ -4,6 +4,10 @@ etcd can now run as a transparent proxy. Running etcd as a proxy allows for easi
etcd currently supports two proxy modes: `readwrite` and `readonly`. The default mode is `readwrite`, which forwards both read and write requests to the etcd cluster. A `readonly` etcd proxy only forwards read requests to the etcd cluster, and returns `HTTP 501` to all write requests.
The proxy will shuffle the list of cluster members periodically to avoid sending all connections to a single member.
The member list used by the proxy consists of all client URLs advertised within the cluster, as specified in each member's `-advertise-client-urls` flag. If this flag is set incorrectly, requests sent to the proxy are forwarded to wrong addresses and then fail. The fix for this problem is to restart the etcd member with the correct `-advertise-client-urls` flag. After the client URL list in the proxy is recalculated, which happens every 30 seconds, requests will be forwarded correctly.
### Using an etcd proxy
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).

8
build
View File

@ -11,8 +11,8 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
eval $(go env)
GIT_SHA=`git rev-parse --short HEAD`
# Static compilation is useful when etcd is run in a container
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
go build -o bin/etcd-migrate ${REPO_PATH}/tools/etcd-migrate
go build -o bin/etcd-dump-logs ${REPO_PATH}/tools/etcd-dump-logs
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcdctl ${REPO_PATH}/etcdctl

View File

@ -105,7 +105,7 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
return err
}
return assertStatusCode(resp.StatusCode, http.StatusNoContent)
return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone)
}
type membersAPIActionList struct{}

View File

@ -25,7 +25,8 @@ import (
var (
// indirection for testing
lookupSRV = net.LookupSRV
lookupSRV = net.LookupSRV
resolveTCPAddr = net.ResolveTCPAddr
)
// TODO(barakmich): Currently ignores priority and weight (as they don't make as much sense for a bootstrap)
@ -38,7 +39,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
// First, resolve the apurls
for _, url := range apurls {
tcpAddr, err := net.ResolveTCPAddr("tcp", url.Host)
tcpAddr, err := resolveTCPAddr("tcp", url.Host)
if err != nil {
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", url.Host)
return "", "", err
@ -52,8 +53,9 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
return err
}
for _, srv := range addrs {
host := net.JoinHostPort(srv.Target, fmt.Sprintf("%d", srv.Port))
tcpAddr, err := net.ResolveTCPAddr("tcp", host)
target := strings.TrimSuffix(srv.Target, ".")
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
tcpAddr, err := resolveTCPAddr("tcp", host)
if err != nil {
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", host)
continue
@ -68,8 +70,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
n = fmt.Sprintf("%d", tempName)
tempName += 1
}
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, tcpAddr.String()))
log.Printf("discovery: Got bootstrap from DNS for %s at host %s to %s%s", service, host, prefix, tcpAddr.String())
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
log.Printf("discovery: Got bootstrap from DNS for %s at %s%s", service, prefix, host)
}
return nil
}

View File

@ -23,19 +23,26 @@ import (
)
func TestSRVGetCluster(t *testing.T) {
defer func() { lookupSRV = net.LookupSRV }()
defer func() {
lookupSRV = net.LookupSRV
resolveTCPAddr = net.ResolveTCPAddr
}()
name := "dnsClusterTest"
tests := []struct {
withSSL []*net.SRV
withoutSSL []*net.SRV
urls []string
expected string
dns map[string]string
expected string
}{
{
[]*net.SRV{},
[]*net.SRV{},
nil,
nil,
"",
},
{
@ -46,6 +53,8 @@ func TestSRVGetCluster(t *testing.T) {
},
[]*net.SRV{},
nil,
nil,
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
},
{
@ -58,6 +67,7 @@ func TestSRVGetCluster(t *testing.T) {
&net.SRV{Target: "10.0.0.1", Port: 7001},
},
nil,
nil,
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:7001",
},
{
@ -70,8 +80,22 @@ func TestSRVGetCluster(t *testing.T) {
&net.SRV{Target: "10.0.0.1", Port: 7001},
},
[]string{"https://10.0.0.1:2480"},
nil,
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:7001",
},
// matching local member with resolved addr and return unresolved hostnames
{
[]*net.SRV{
&net.SRV{Target: "1.example.com.", Port: 2480},
&net.SRV{Target: "2.example.com.", Port: 2480},
&net.SRV{Target: "3.example.com.", Port: 2480},
},
nil,
[]string{"https://10.0.0.1:2480"},
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
},
}
for i, tt := range tests {
@ -84,6 +108,12 @@ func TestSRVGetCluster(t *testing.T) {
}
return "", nil, errors.New("Unkown service in mock")
}
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
if tt.dns == nil || tt.dns[addr] == "" {
return net.ResolveTCPAddr(network, addr)
}
return net.ResolveTCPAddr(network, tt.dns[addr])
}
urls := testutil.MustNewURLs(t, tt.urls)
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
if err != nil {

View File

@ -44,10 +44,10 @@ func NewBackupCommand() cli.Command {
// handleBackup handles a request that intends to do a backup.
func handleBackup(c *cli.Context) {
srcSnap := path.Join(c.String("data-dir"), "snap")
destSnap := path.Join(c.String("backup-dir"), "snap")
srcWAL := path.Join(c.String("data-dir"), "wal")
destWAL := path.Join(c.String("backup-dir"), "wal")
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
srcWAL := path.Join(c.String("data-dir"), "member", "wal")
destWAL := path.Join(c.String("backup-dir"), "member", "wal")
if err := os.MkdirAll(destSnap, 0700); err != nil {
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)

View File

@ -0,0 +1,128 @@
package command
import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
"github.com/coreos/etcd/store"
)
// set describes one key write queued for import: the key, the value to store
// under it, and the TTL to pass along with the write (0 means no TTL
// adjustment is made by runSet).
type set struct {
	key, value string
	ttl        int64
}
// NewImportSnapCommand returns the cli.Command for "import", which is handled
// by handleImportSnap. It accepts three flags:
//
//	snap    path of the snapshot file to import
//	hidden  additional hidden key spaces to import (repeatable)
//	c       number of concurrent clients used for the import (default 10)
func NewImportSnapCommand() cli.Command {
	importFlags := []cli.Flag{
		cli.StringFlag{Name: "snap", Value: "", Usage: "Path to the vaild etcd 0.4.x snapshot."},
		cli.StringSliceFlag{Name: "hidden", Value: new(cli.StringSlice), Usage: "Hidden key spaces to import from snapshot"},
		cli.IntFlag{Name: "c", Value: 10, Usage: "Number of concurrent clients to import the data"},
	}

	cmd := cli.Command{
		Name:  "import",
		Usage: "import a snapshot to a cluster",
		Flags: importFlags,
	}
	cmd.Action = handleImportSnap
	return cmd
}
// handleImportSnap implements the "import" command: it loads the snapshot
// file named by --snap into an in-memory store, starts -c worker clients, and
// feeds every key under "/" (plus every key space named by --hidden) to those
// workers, which write the keys to the cluster.
//
// The process exits with status 1 when the snapshot cannot be read or
// recovered, or when -c is smaller than 1. Other failures are reported through
// handleError.
func handleImportSnap(c *cli.Context) {
	d, err := ioutil.ReadFile(c.String("snap"))
	if err != nil {
		if c.String("snap") == "" {
			fmt.Printf("no snapshot file provided (use --snap)\n")
		} else {
			// Include the underlying error so the user can tell a missing
			// file from, for example, a permission problem.
			fmt.Printf("cannot read snapshot file %s: %v\n", c.String("snap"), err)
		}
		os.Exit(1)
	}

	st := store.New()
	err = st.Recovery(d)
	if err != nil {
		fmt.Printf("cannot recover the snapshot file: %v\n", err)
		os.Exit(1)
	}

	endpoints, err := getEndpoints(c)
	if err != nil {
		handleError(ErrorFromEtcd, err)
	}
	tr, err := getTransport(c)
	if err != nil {
		handleError(ErrorFromEtcd, err)
	}

	// Without at least one worker nobody receives from the unbuffered setc
	// channel, and the first send in copyKeys would block forever.
	concurrent := c.Int("c")
	if concurrent < 1 {
		fmt.Printf("number of concurrent clients (-c) must be at least 1, got %d\n", concurrent)
		os.Exit(1)
	}

	wg := &sync.WaitGroup{}
	setc := make(chan set)
	fmt.Printf("starting to import snapshot %s with %d clients\n", c.String("snap"), concurrent)
	for i := 0; i < concurrent; i++ {
		client := etcd.NewClient(endpoints)
		client.SetTransport(tr)

		if c.GlobalBool("debug") {
			go dumpCURL(client)
		}

		if ok := client.SyncCluster(); !ok {
			handleError(FailedToConnectToHost, errors.New("cannot sync with the cluster using endpoints "+strings.Join(endpoints, ", ")))
		}
		wg.Add(1)
		go runSet(client, setc, wg)
	}

	all, err := st.Get("/", true, true)
	if err != nil {
		handleError(ErrorFromEtcd, err)
	}
	n := copyKeys(all.Node, setc)

	// Hidden key spaces are fetched explicitly, one Get per --hidden value.
	hiddens := c.StringSlice("hidden")
	for _, h := range hiddens {
		allh, err := st.Get(h, true, true)
		if err != nil {
			handleError(ErrorFromEtcd, err)
		}
		n += copyKeys(allh.Node, setc)
	}

	// Closing setc ends the workers' receive loops; wait for them to drain
	// everything that was queued before reporting the total.
	close(setc)
	wg.Wait()
	fmt.Printf("finished importing %d keys\n", n)
}
// copyKeys walks the node tree rooted at n and sends one set on setc for every
// non-directory node it finds. Directories are logged and descended into
// recursively. It returns the number of sets sent.
func copyKeys(n *store.NodeExtern, setc chan set) int {
	if n.Dir {
		log.Println("entering dir:", n.Key)
		total := 0
		for i := range n.Nodes {
			total += copyKeys(n.Nodes[i], setc)
		}
		return total
	}

	// Leaf node: queue it for a worker. The send blocks until a worker
	// receives it when setc is unbuffered.
	setc <- set{key: n.Key, value: *n.Value, ttl: n.TTL}
	return 1
}
// runSet is the body of one import worker. It receives sets from setc until
// the channel is closed, writes each one through c, and then signals wg.
// A non-zero TTL below 300 is raised to 300 seconds before the write. Any
// write error terminates the process via log.Fatalf.
func runSet(c *etcd.Client, setc chan set, wg *sync.WaitGroup) {
	for {
		item, open := <-setc
		if !open {
			break
		}

		log.Println("copying key:", item.key)
		if ttl := item.ttl; ttl != 0 && ttl < 300 {
			log.Printf("extending key %s's ttl to 300 seconds", item.key)
			item.ttl = 5 * 60
		}

		if _, err := c.Set(item.key, item.value, uint64(item.ttl)); err != nil {
			log.Fatalf("failed to copy key: %v\n", err)
		}
	}
	wg.Done()
}

View File

@ -1,78 +0,0 @@
/*
Copyright 2015 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package command
import (
"fmt"
"log"
"net/http"
"os"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
"github.com/coreos/etcd/pkg/transport"
)
// UpgradeCommand returns the cli.Command for "upgrade", handled by
// handleUpgrade. Its flags select the old and new internal versions, the peer
// URL to contact, and optional TLS material (certificate, key, CA bundle) for
// talking to an HTTPS peer.
func UpgradeCommand() cli.Command {
	upgradeFlags := []cli.Flag{
		cli.StringFlag{Name: "old-version", Value: "1", Usage: "Old internal version"},
		cli.StringFlag{Name: "new-version", Value: "2", Usage: "New internal version"},
		cli.StringFlag{Name: "peer-url", Value: "http://localhost:7001", Usage: "An etcd peer url string"},
		cli.StringFlag{Name: "peer-cert-file", Value: "", Usage: "identify HTTPS peer using this SSL certificate file"},
		cli.StringFlag{Name: "peer-key-file", Value: "", Usage: "identify HTTPS peer using this SSL key file"},
		cli.StringFlag{Name: "peer-ca-file", Value: "", Usage: "verify certificates of HTTPS-enabled peers using this CA bundle"},
	}

	cmd := cli.Command{
		Name:  "upgrade",
		Usage: "upgrade an old version etcd cluster to a new version",
		Flags: upgradeFlags,
	}
	cmd.Action = handleUpgrade
	return cmd
}
// handleUpgrade implements the "upgrade" command. Only an upgrade from
// internal version 1 to internal version 2 is accepted; any other
// --old-version / --new-version value exits with status 1.
//
// It sends a GET to <peer-url>/v2/admin/next-internal-version, using the TLS
// files given by the peer-* flags, and prints the outcome based on the HTTP
// status code of the response.
func handleUpgrade(c *cli.Context) {
	if c.String("old-version") != "1" {
		fmt.Printf("Do not support upgrade from version %s\n", c.String("old-version"))
		os.Exit(1)
	}
	if c.String("new-version") != "2" {
		fmt.Printf("Do not support upgrade to version %s\n", c.String("new-version"))
		os.Exit(1)
	}

	tlsInfo := transport.TLSInfo{
		CAFile:   c.String("peer-ca-file"),
		CertFile: c.String("peer-cert-file"),
		KeyFile:  c.String("peer-key-file"),
	}
	t, err := transport.NewTransport(tlsInfo)
	if err != nil {
		log.Fatal(err)
	}

	peerURL := c.String("peer-url")
	client := http.Client{Transport: t}
	resp, err := client.Get(peerURL + "/v2/admin/next-internal-version")
	if err != nil {
		fmt.Printf("Failed to send upgrade request to %s: %v\n", peerURL, err)
		return
	}
	// The body is not needed, but it must be closed so the underlying
	// connection is released.
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		fmt.Println("Cluster will start upgrading from internal version 1 to 2 in 10 seconds.")
	case http.StatusNotFound:
		fmt.Println("Cluster cannot upgrade to 2: version is not 0.4.7")
	default:
		// Report the URL that was actually contacted; the command defines a
		// "peer-url" flag, not "cluster-url".
		fmt.Printf("Failed to send upgrade request to %s: bad status code %d\n", peerURL, resp.StatusCode)
	}
}

View File

@ -65,7 +65,7 @@ func getPeersFlagValue(c *cli.Context) []string {
// If we still don't have peers, use a default
if peerstr == "" {
peerstr = "127.0.0.1:4001"
peerstr = "127.0.0.1:4001,127.0.0.1:2379"
}
return strings.Split(peerstr, ",")

View File

@ -31,7 +31,7 @@ func main() {
app.Flags = []cli.Flag{
cli.BoolFlag{Name: "debug", Usage: "output cURL commands which can be used to reproduce the request"},
cli.BoolFlag{Name: "no-sync", Usage: "don't synchronize cluster information before sending request"},
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple` or `json`)"},
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple`, `extended` or `json`)"},
cli.StringFlag{Name: "peers, C", Value: "", Usage: "a comma-delimited list of machine addresses in the cluster (default: \"127.0.0.1:4001\")"},
cli.StringFlag{Name: "cert-file", Value: "", Usage: "identify HTTPS client using this SSL certificate file"},
cli.StringFlag{Name: "key-file", Value: "", Usage: "identify HTTPS client using this SSL key file"},
@ -53,7 +53,7 @@ func main() {
command.NewWatchCommand(),
command.NewExecWatchCommand(),
command.NewMemberCommand(),
command.UpgradeCommand(),
command.NewImportSnapCommand(),
}
app.Run(os.Args)

View File

@ -15,18 +15,17 @@
package etcdmain
import (
"errors"
"flag"
"fmt"
"log"
"net/url"
"os"
"runtime"
"strings"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/version"
)
@ -64,6 +63,7 @@ var (
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set" +
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
errUnsetAdvertiseClientURLsFlag = fmt.Errorf("-advertise-client-urls is required when -listen-client-urls is set explicitly")
)
type config struct {
@ -208,9 +208,15 @@ func (cfg *config) Parse(arguments []string) error {
default:
os.Exit(2)
}
if len(cfg.FlagSet.Args()) != 0 {
return fmt.Errorf("'%s' is not a valid flag", cfg.FlagSet.Arg(0))
}
if cfg.printVersion {
fmt.Println("etcd version", version.Version)
fmt.Printf("etcd Version: %s\n", version.Version)
fmt.Printf("Git SHA: %s\n", version.GitSHA)
fmt.Printf("Go Version: %s\n", runtime.Version())
fmt.Printf("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
os.Exit(0)
}
@ -252,9 +258,8 @@ func (cfg *config) Parse(arguments []string) error {
if err != nil {
return err
}
if err := cfg.resolveUrls(); err != nil {
return errors.New("cannot resolve DNS hostnames.")
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
return errUnsetAdvertiseClientURLsFlag
}
if 5*cfg.TickMs > cfg.ElectionMs {
@ -272,10 +277,6 @@ func initialClusterFromName(name string) string {
return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
}
func (cfg *config) resolveUrls() error {
return netutil.ResolveTCPAddrs(cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls)
}
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff }
func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly }

View File

@ -29,6 +29,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-snapshot-count=10",
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
// it should be set if -listen-client-urls is set
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
}
wcfg := &config{
dir: "testdir",

View File

@ -56,8 +56,12 @@ func Main() {
cfg := NewConfig()
err := cfg.Parse(os.Args[1:])
if err != nil {
log.Printf("etcd: error verifying flags, %v", err)
os.Exit(2)
log.Printf("error verifying flags, %v. See 'etcd -help'.", err)
switch err {
case errUnsetAdvertiseClientURLsFlag:
log.Printf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
}
os.Exit(1)
}
var stopped <-chan struct{}
@ -90,8 +94,10 @@ func Main() {
if err != nil {
switch err {
case discovery.ErrDuplicateID:
log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
cfg.name, cfg.durl, cfg.dir)
log.Printf("member %q has previously registered with discovery service token (%s).", cfg.name, cfg.durl)
log.Printf("But etcd could not find vaild cluster configuration in the given data dir (%s).", cfg.dir)
log.Printf("Please check the given data dir path if the previous bootstrap succeeded")
log.Printf("or use a new discovery token if the previous bootstrap failed.")
default:
log.Fatalf("etcd: %v", err)
}
@ -191,7 +197,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Handler: etcdhttp.NewClientHandler(s),
Info: cfg.corsInfo,
}
ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
// Start the peer server in a goroutine
for _, l := range plns {
go func(l net.Listener) {

View File

@ -56,7 +56,8 @@ clustering flags:
--initial-cluster-token 'etcd-cluster'
initial cluster token for the etcd cluster during bootstrap.
--advertise-client-urls 'http://localhost:2379,http://localhost:4001'
list of this member's client URLs to advertise to the rest of the cluster.
list of this member's client URLs to advertise to the public.
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
--discovery ''
discovery URL used to bootstrap the cluster.
--discovery-fallback 'proxy'

View File

@ -59,12 +59,6 @@ type Cluster struct {
id types.ID
token string
store store.Store
// index is the raft index that cluster is updated at bootstrap
// from remote cluster info.
// It may have a higher value than local raft index, because it
// displays a further view of the cluster.
// TODO: upgrade it as last modified index
index uint64
sync.Mutex // guards members and removed map
members map[types.ID]*Member
@ -236,8 +230,6 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
func (c *Cluster) Recover() {
c.members, c.removed = membersFromStore(c.store)
}

View File

@ -21,7 +21,6 @@ import (
"log"
"net/http"
"sort"
"strconv"
"time"
"github.com/coreos/etcd/pkg/types"
@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
}
continue
}
var index uint64
// The header at or before v2.0.3 doesn't have this field. For backward
// compatibility, it checks whether the field exists.
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
index, err = strconv.ParseUint(indexStr, 10, 64)
if err != nil {
if logerr {
log.Printf("etcdserver: could not parse raft index: %v", err)
}
continue
}
}
cl := NewClusterFromMembers("", id, membs)
cl.UpdateIndex(index)
return cl, nil
return NewClusterFromMembers("", id, membs), nil
}
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
}

View File

@ -46,9 +46,43 @@ type ServerConfig struct {
ElectionTicks int
}
// VerifyBootstrapConfig sanity-checks the initial config and returns an error
// for things that should never happen.
func (c *ServerConfig) VerifyBootstrapConfig() error {
// VerifyBootstrap sanity-checks the initial config for the bootstrap case
// and returns an error for things that should never happen.
//
// The checks run in this order: the local member is verified strictly
// against the configured cluster, the cluster itself is validated, and
// finally at least one of the initial cluster or the discovery URL must
// be set.
func (c *ServerConfig) VerifyBootstrap() error {
	err := c.verifyLocalMember(true)
	if err != nil {
		return err
	}
	err = c.Cluster.Validate()
	if err != nil {
		return err
	}
	// Either source of cluster membership is enough to bootstrap.
	if c.Cluster.String() != "" || c.DiscoveryURL != "" {
		return nil
	}
	return fmt.Errorf("initial cluster unset and no discovery URL found")
}
// VerifyJoinExisting sanity-checks the initial config for the case of joining
// an existing cluster and returns an error for things that should never happen.
func (c *ServerConfig) VerifyJoinExisting() error {
	// No strict checking is needed here: the member has already announced
	// its peer urls to the cluster before starting, so it does not have to
	// set them in the configuration again.
	err := c.verifyLocalMember(false)
	if err != nil {
		return err
	}
	err = c.Cluster.Validate()
	if err != nil {
		return err
	}
	if c.DiscoveryURL == "" {
		return nil
	}
	return fmt.Errorf("discovery URL should not be set when joining existing initial cluster")
}
// verifyLocalMember verifies the configured member is in configured
// cluster. If strict is set, it also verifies the configured member
// has the same peer urls as configured advertised peer urls.
func (c *ServerConfig) verifyLocalMember(strict bool) error {
m := c.Cluster.MemberByName(c.Name)
// Make sure the cluster at least contains the local server.
if m == nil {
@ -58,20 +92,14 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
return fmt.Errorf("cannot use %x as member id", raft.None)
}
if c.DiscoveryURL == "" && !c.NewCluster {
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
}
if err := c.Cluster.Validate(); err != nil {
return err
}
// Advertised peer URLs must match those in the cluster peer list
// TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
apurls := c.PeerURLs.StringSlice()
sort.Strings(apurls)
if !netutil.URLStringsEqual(apurls, m.PeerURLs) {
return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name)
if strict {
if !netutil.URLStringsEqual(apurls, m.PeerURLs) {
return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name)
}
}
return nil
}

View File

@ -22,6 +22,9 @@ import (
)
func mustNewURLs(t *testing.T, urls []string) []url.URL {
if len(urls) == 0 {
return nil
}
u, err := types.NewURLs(urls)
if err != nil {
t.Fatalf("error creating new URLs from %q: %v", urls, err)
@ -29,77 +32,101 @@ func mustNewURLs(t *testing.T, urls []string) []url.URL {
return u
}
func TestBootstrapConfigVerify(t *testing.T) {
// TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail ensures that
// VerifyBootstrap rejects a config that has neither an initial cluster nor
// a discovery URL.
func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
	emptyCluster, err := NewClusterFromString("", "")
	if err != nil {
		t.Fatalf("NewClusterFromString error: %v", err)
	}
	cfg := &ServerConfig{
		Name:         "node1",
		DiscoveryURL: "",
		Cluster:      emptyCluster,
	}
	verr := cfg.VerifyBootstrap()
	if verr == nil {
		t.Errorf("err = nil, want not nil")
	}
}
// TestConfigVerifyExistingWithDiscoveryURLFail ensures that VerifyJoinExisting
// rejects a config that sets a discovery URL.
func TestConfigVerifyExistingWithDiscoveryURLFail(t *testing.T) {
	singleNode, err := NewClusterFromString("", "node1=http://127.0.0.1:2380")
	if err != nil {
		t.Fatalf("NewClusterFromString error: %v", err)
	}
	cfg := &ServerConfig{
		Name:         "node1",
		DiscoveryURL: "http://127.0.0.1:4001/abcdefg",
		PeerURLs:     mustNewURLs(t, []string{"http://127.0.0.1:2380"}),
		Cluster:      singleNode,
		NewCluster:   false,
	}
	verr := cfg.VerifyJoinExisting()
	if verr == nil {
		t.Errorf("err = nil, want not nil")
	}
}
func TestConfigVerifyLocalMember(t *testing.T) {
tests := []struct {
clusterSetting string
newclst bool
apurls []string
disc string
strict bool
shouldError bool
}{
{
// Node must exist in cluster
"",
true,
nil,
"",
true,
true,
},
{
// Cannot have duplicate URLs in cluster config
"node1=http://localhost:7001,node2=http://localhost:7001,node2=http://localhost:7002",
true,
nil,
"",
true,
},
{
// Node defined, ClusterState OK
// Initial cluster set
"node1=http://localhost:7001,node2=http://localhost:7002",
true,
[]string{"http://localhost:7001"},
"",
true,
false,
},
{
// Node defined, discovery OK
"node1=http://localhost:7001",
false,
[]string{"http://localhost:7001"},
"http://discovery",
false,
},
{
// Cannot have ClusterState!=new && !discovery
"node1=http://localhost:7001",
false,
nil,
"",
// Default initial cluster
"node1=http://localhost:2380,node1=http://localhost:7001",
[]string{"http://localhost:2380", "http://localhost:7001"},
true,
false,
},
{
// Advertised peer URLs must match those in cluster-state
"node1=http://localhost:7001",
true,
[]string{"http://localhost:12345"},
"",
true,
true,
},
{
// Advertised peer URLs must match those in cluster-state
"node1=http://localhost:7001,node1=http://localhost:12345",
true,
[]string{"http://localhost:12345"},
"",
true,
true,
},
{
// Advertised peer URLs must match those in cluster-state
"node1=http://localhost:7001",
[]string{},
true,
true,
},
{
// do not care about the urls if strict is not set
"node1=http://localhost:7001",
[]string{},
false,
false,
},
}
for i, tt := range tests {
@ -108,15 +135,13 @@ func TestBootstrapConfigVerify(t *testing.T) {
t.Fatalf("#%d: Got unexpected error: %v", i, err)
}
cfg := ServerConfig{
Name: "node1",
DiscoveryURL: tt.disc,
Cluster: cluster,
NewCluster: tt.newclst,
Name: "node1",
Cluster: cluster,
}
if tt.apurls != nil {
cfg.PeerURLs = mustNewURLs(t, tt.apurls)
}
err = cfg.VerifyBootstrapConfig()
err = cfg.verifyLocalMember(tt.strict)
if (err == nil) && tt.shouldError {
t.Errorf("%#v", *cluster)
t.Errorf("#%d: Got no error where one was expected", i)

View File

@ -119,7 +119,6 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeError(w, err)
return
}
switch {
case resp.Event != nil:
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
@ -334,7 +333,7 @@ func serveVersion(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
fmt.Fprintf(w, `{"releaseVersion":"%s","internalVersion":"%s"}`, version.Version, version.InternalVersion)
w.Write([]byte("etcd " + version.Version))
}
// parseKeyRequest converts a received http.Request on keysPrefix to

View File

@ -1327,7 +1327,7 @@ func TestServeVersion(t *testing.T) {
if rw.Code != http.StatusOK {
t.Errorf("code=%d, want %d", rw.Code, http.StatusOK)
}
w := fmt.Sprintf(`{"releaseVersion":"%s","internalVersion":"%s"}`, version.Version, version.InternalVersion)
w := fmt.Sprintf("etcd %s", version.Version)
if g := rw.Body.String(); g != w {
t.Fatalf("body = %q, want %q", g, w)
}

View File

@ -18,7 +18,6 @@ import (
"encoding/json"
"log"
"net/http"
"strconv"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/rafthttp"
@ -29,10 +28,9 @@ const (
)
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
clusterInfo: clusterInfo,
timer: timer,
}
mux := http.NewServeMux()
@ -45,7 +43,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTim
type peerMembersHandler struct {
clusterInfo etcdserver.ClusterInfo
timer etcdserver.RaftTimer
}
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -53,7 +50,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
if r.URL.Path != peerMembersPrefix {
http.Error(w, "bad path", http.StatusBadRequest)

View File

@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})
ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
ph := NewPeerHandler(&fakeCluster{}, h)
srv := httptest.NewServer(ph)
defer srv.Close()
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
h := &peerMembersHandler{clusterInfo: cluster}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
if err != nil {
t.Fatal(err)

View File

@ -42,6 +42,7 @@ import (
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@ -145,18 +146,24 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
var s *raft.MemoryStorage
var id types.ID
walVersion, err := wal.DetectVersion(cfg.DataDir)
// Run the migrations.
dataVer, err := version.DetectDataDir(cfg.DataDir)
if err != nil {
return nil, err
}
if walVersion == wal.WALUnknown {
return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
if err := upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil {
return nil, err
}
haveWAL := walVersion != wal.WALNotExist
haveWAL := wal.Exist(cfg.WALDir())
ss := snap.New(cfg.SnapDir())
var remotes []*Member
switch {
case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cfg.Cluster, cfg.Name), cfg.Transport)
if err != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
@ -164,13 +171,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
cfg.Cluster.UpdateIndex(existingCluster.index)
remotes = existingCluster.Members()
cfg.Cluster.SetID(existingCluster.id)
cfg.Cluster.SetStore(st)
cfg.Print()
id, n, s, w = startNode(cfg, nil)
case !haveWAL && cfg.NewCluster:
if err := cfg.VerifyBootstrapConfig(); err != nil {
if err := cfg.VerifyBootstrap(); err != nil {
return nil, err
}
m := cfg.Cluster.MemberByName(cfg.Name)
@ -193,11 +200,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
cfg.PrintWithInitial()
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
case haveWAL:
// Run the migrations.
if err := upgradeWAL(cfg.DataDir, cfg.Name, walVersion); err != nil {
return nil, err
}
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
return nil, fmt.Errorf("cannot write to data directory: %v", err)
}
@ -237,6 +239,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
Name: cfg.Name,
ID: id.String(),
}
sstats.Initialize()
lstats := stats.NewLeaderStats(id.String())
srv := &EtcdServer{
@ -259,8 +262,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
}
// TODO: move transport initialization near the definition of remote
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
// add all the remote members into sendhub
// add all remotes into transport
for _, m := range remotes {
if m.ID != id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range cfg.Cluster.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
@ -291,7 +300,6 @@ func (s *EtcdServer) start() {
s.w = wait.New()
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stats.Initialize()
// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@ -394,19 +402,15 @@ func (s *EtcdServer) run() {
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
// It avoids snapshot recovery overwriting newer cluster and
// transport setting, which may block the communication.
if s.Cluster.index < rd.Snapshot.Metadata.Index {
s.Cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
appliedi = rd.Snapshot.Metadata.Index
@ -670,9 +674,9 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
}
func (s *EtcdServer) send(ms []raftpb.Message) {
for _, m := range ms {
if !s.Cluster.IsIDRemoved(types.ID(m.To)) {
m.To = 0
for i, _ := range ms {
if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) {
ms[i].To = 0
}
}
s.r.transport.Send(ms)
@ -722,7 +726,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
switch {
case existsSet:
if exists {
return f(s.store.Update(r.Path, r.Val, expr))
if r.PrevIndex == 0 && r.PrevValue == "" {
return f(s.store.Update(r.Path, r.Val, expr))
} else {
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
}
}
return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
case r.PrevIndex > 0 || r.PrevValue != "":

View File

@ -235,20 +235,18 @@ func TestApplyRequest(t *testing.T) {
},
},
},
// PUT with PrevExist=true *and* PrevIndex set ==> Update
// TODO(jonboulle): is this expected?!
// PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
Response{Event: &store.Event{}},
[]testutil.Action{
{
Name: "Update",
Params: []interface{}{"", "", time.Time{}},
Name: "CompareAndSwap",
Params: []interface{}{"", "", uint64(1), "", time.Time{}},
},
},
},
// PUT with PrevExist=false *and* PrevIndex set ==> Create
// TODO(jonboulle): is this expected?!
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
Response{Event: &store.Event{}},
@ -1391,6 +1389,7 @@ type nopTransporter struct{}
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
@ -93,9 +94,9 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
// upgradeWAL converts an older version of the etcdServer data to the newest version.
// It must ensure that, after upgrading, the most recent version is present.
func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error {
switch ver {
case wal.WALv0_4:
case version.DataDir0_4:
log.Print("etcdserver: converting v0.4 log to v2.0")
err := migrate.Migrate4To2(baseDataDir, name)
if err != nil {
@ -103,16 +104,16 @@ func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
return err
}
fallthrough
case wal.WALv2_0:
case version.DataDir2_0:
err := makeMemberDir(baseDataDir)
if err != nil {
return err
}
fallthrough
case wal.WALv2_0_1:
case version.DataDir2_0_1:
fallthrough
default:
log.Printf("datadir is valid for the 2.0.1 format")
log.Printf("etcdserver: datadir is valid for the 2.0.1 format")
}
return nil
}

View File

@ -170,6 +170,46 @@ func TestForceNewCluster(t *testing.T) {
clusterMustProgress(t, c.Members[:1])
}
// TestIssue2681 ensures we can remove a member and then add a new one back
// immediately.
func TestIssue2681(t *testing.T) {
	defer afterTest(t)
	c := NewCluster(t, 5)
	c.Launch(t)
	defer c.Terminate(t)

	// Drop the fifth member, then wait for a leader before growing again.
	removedID := uint64(c.Members[4].s.ID())
	c.RemoveMember(t, removedID)
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}
// TestIssue2746 ensures we can remove a member after a snapshot and then add
// a new one back.
func TestIssue2746(t *testing.T) {
	defer afterTest(t)
	c := NewCluster(t, 5)
	for _, m := range c.Members {
		m.SnapCount = 10
	}
	c.Launch(t)
	defer c.Terminate(t)

	// force a snapshot
	for round := 0; round < 20; round++ {
		clusterMustProgress(t, c.Members)
	}

	removedID := uint64(c.Members[4].s.ID())
	c.RemoveMember(t, removedID)
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}
// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.
@ -526,7 +566,7 @@ func (m *member) Launch() error {
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{

View File

@ -0,0 +1,90 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"errors"
"os"
"syscall"
"time"
)
var (
ErrLocked = errors.New("file already locked")
)
// Lock is a file-based lock.
type Lock interface {
// Name returns the name of the file backing the lock.
Name() string
// TryLock acquires exclusivity on the lock without blocking.
TryLock() error
// Lock acquires exclusivity on the lock, blocking until it succeeds.
Lock() error
// Unlock releases the lock.
Unlock() error
// Destroy disposes of the lock; in this implementation it does nothing
// and returns nil.
Destroy() error
}
// lock is the Lock implementation in this file.
type lock struct {
// fname is the path given to NewLock.
fname string
// file is the handle opened by TryLock or Lock; Unlock closes it.
file *os.File
}
// Name returns the file name the lock was created with.
func (l *lock) Name() string {
return l.fname
}
// TryLock acquires exclusivity on the lock without blocking.
//
// It first sets the DMEXCL mode bit (together with 0600 permissions) on the
// lock file and returns any chmod error as-is. It then opens the file once;
// any failure to open is reported as ErrLocked. On success the opened file
// is kept so that Unlock can close it.
func (l *lock) TryLock() error {
	if chmodErr := os.Chmod(l.fname, syscall.DMEXCL|0600); chmodErr != nil {
		return chmodErr
	}
	opened, openErr := os.Open(l.fname)
	if openErr != nil {
		return ErrLocked
	}
	l.file = opened
	return nil
}
// Lock acquires exclusivity on the lock, blocking until it succeeds.
//
// It sets the DMEXCL mode bit (together with 0600 permissions) on the lock
// file, returning any chmod error as-is, and then retries opening the file
// every 10ms until the open succeeds. The opened file is kept so that
// Unlock can close it.
func (l *lock) Lock() error {
	if chmodErr := os.Chmod(l.fname, syscall.DMEXCL|0600); chmodErr != nil {
		return chmodErr
	}
	opened, openErr := os.Open(l.fname)
	for openErr != nil {
		time.Sleep(10 * time.Millisecond)
		opened, openErr = os.Open(l.fname)
	}
	l.file = opened
	return nil
}
// Unlock unlocks the lock by closing the file handle that TryLock or Lock
// stored, and returns the error from Close.
func (l *lock) Unlock() error {
return l.file.Close()
}
// Destroy does nothing in this implementation and always returns nil.
func (l *lock) Destroy() error {
return nil
}
// NewLock returns a Lock backed by the given file path. The file is not
// opened or modified here, and the returned error is always nil.
func NewLock(file string) (Lock, error) {
	return &lock{fname: file}, nil
}

View File

@ -0,0 +1,98 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build solaris
package fileutil
import (
"errors"
"os"
"syscall"
)
var (
ErrLocked = errors.New("file already locked")
)
// Lock is a file-based lock.
type Lock interface {
// Name returns the name of the file backing the lock.
Name() string
// TryLock acquires exclusivity on the lock without blocking.
TryLock() error
// Lock acquires exclusivity on the lock.
Lock() error
// Unlock releases the lock.
Unlock() error
// Destroy closes the file backing the lock.
Destroy() error
}
// lock is the fcntl-based Lock implementation in this file.
type lock struct {
// fd is the descriptor of file, as passed to syscall.FcntlFlock.
fd int
// file is the open lock file; Destroy closes it.
file *os.File
}
// Name returns the name of the underlying open file.
func (l *lock) Name() string {
return l.file.Name()
}
// TryLock acquires exclusivity on the lock without blocking.
//
// It issues an F_SETLK request for a write lock with Start, Len, Whence and
// Pid all zero. An EAGAIN result is translated to ErrLocked; every other
// result, including nil on success, is returned unchanged.
func (l *lock) TryLock() error {
	req := syscall.Flock_t{Type: syscall.F_WRLCK}
	switch err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &req); err {
	case syscall.EAGAIN:
		return ErrLocked
	default:
		return err
	}
}
// Lock acquires exclusivity on the lock, blocking until it is available.
//
// It requests a write lock with Start, Len, Whence and Pid all zero using
// F_SETLKW, the waiting form of the fcntl record-lock request. The
// non-waiting F_SETLK form returns immediately when another process holds
// the lock, which is the contract of TryLock rather than Lock.
//
// It returns the error from the fcntl call, or nil once the lock is held.
func (l *lock) Lock() error {
	req := syscall.Flock_t{
		Type:   syscall.F_WRLCK,
		Whence: 0,
		Start:  0,
		Len:    0,
		Pid:    0,
	}
	return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLKW, &req)
}
// Unlock unlocks the lock.
//
// It issues an F_SETLK request of type F_UNLCK with Start, Len and Whence
// all zero. An EAGAIN result is translated to ErrLocked; every other
// result, including nil on success, is returned unchanged.
func (l *lock) Unlock() error {
	req := syscall.Flock_t{Type: syscall.F_UNLCK}
	switch err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &req); err {
	case syscall.EAGAIN:
		return ErrLocked
	default:
		return err
	}
}
// Destroy closes the underlying lock file and returns the error from Close.
func (l *lock) Destroy() error {
return l.file.Close()
}
// NewLock opens the given file write-only and returns a Lock that operates
// on its descriptor. The file must already exist, since O_CREATE is not
// passed; any error from opening it is returned with a nil Lock.
func NewLock(file string) (Lock, error) {
	opened, err := os.OpenFile(file, os.O_WRONLY, 0600)
	if err != nil {
		return nil, err
	}
	return &lock{fd: int(opened.Fd()), file: opened}, nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows,!plan9
// +build !windows,!plan9,!solaris
package fileutil

View File

@ -16,6 +16,7 @@ package proxy
import (
"log"
"math/rand"
"net/url"
"sync"
"time"
@ -65,6 +66,13 @@ func (d *director) refresh() {
}
endpoints = append(endpoints, newEndpoint(*uu))
}
// shuffle array to avoid connections being "stuck" to a single endpoint
for i := range endpoints {
j := rand.Intn(i + 1)
endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
}
d.ep = endpoints
}

View File

@ -73,6 +73,25 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
return
}
completeCh := make(chan bool, 1)
closeNotifier, ok := rw.(http.CloseNotifier)
if ok {
go func() {
select {
case <-closeNotifier.CloseNotify():
tp, ok := p.transport.(*http.Transport)
if ok {
tp.CancelRequest(proxyreq)
}
case <-completeCh:
}
}()
defer func() {
completeCh <- true
}()
}
var res *http.Response
var err error

View File

@ -65,7 +65,7 @@ func newLog(storage Storage) *raftLog {
}
func (l *raftLog) String() string {
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
}
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,

View File

@ -199,7 +199,7 @@ func (p *peer) handle() {
log.Printf("sender: the connection with %s became inactive", p.id)
p.active = false
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Fail()
}
} else {
@ -208,7 +208,7 @@ func (p *peer) handle() {
p.active = true
p.errored = nil
}
if m.Type == raftpb.MsgApp {
if m.Type == raftpb.MsgApp && p.fs != nil {
p.fs.Succ(end.Sub(start))
}
}

42
rafthttp/remote.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"net/http"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
// remote wraps a peer and forwards messages to it.
// NOTE(review): per the Transporter.AddRemote comment, a remote presumably
// exists only to help a newly joined member catch up — confirm.
type remote struct {
// id is the ID of the member this remote sends to.
id types.ID
// peer is the underlying sender created by startRemote.
peer *peer
}
// startRemote creates a remote for member `to`, backed by a new peer that
// sends to url u over tr. The peer is created with a nil sixth argument.
// NOTE(review): that argument is presumably the follower stats, since
// peer.handle guards p.fs against nil — confirm.
// The local parameter is currently unused.
func startRemote(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, errorc chan error) *remote {
	p := NewPeer(tr, u, to, cid, r, nil, errorc)
	return &remote{id: to, peer: p}
}
// Send forwards the message m to the wrapped peer.
func (g *remote) Send(m raftpb.Message) {
g.peer.send(m)
}
// Stop stops the wrapped peer.
func (g *remote) Stop() {
g.peer.Stop()
}

View File

@ -35,6 +35,12 @@ type Raft interface {
type Transporter interface {
Handler() http.Handler
Send(m []raftpb.Message)
// AddRemote adds a remote with given peer urls into the transport.
// A remote helps newly joined member to catch up the progress of cluster,
// and will not be used after that.
// It is the caller's responsibility to ensure the urls are all valid,
// or it panics.
AddRemote(id types.ID, urls []string)
AddPeer(id types.ID, urls []string)
RemovePeer(id types.ID)
RemoveAllPeers()
@ -50,9 +56,10 @@ type transport struct {
serverStats *stats.ServerStats
leaderStats *stats.LeaderStats
mu sync.RWMutex // protect the peer map
peers map[types.ID]*peer // remote peers
errorc chan error
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]*peer // peers map
errorc chan error
}
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
@ -63,6 +70,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
raft: r,
serverStats: ss,
leaderStats: ls,
remotes: make(map[types.ID]*remote),
peers: make(map[types.ID]*peer),
errorc: errorc,
}
@ -90,21 +98,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
continue
}
to := types.ID(m.To)
p, ok := t.peers[to]
if !ok {
log.Printf("etcdserver: send message to unknown receiver %s", to)
if ok {
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
}
p.Send(m)
continue
}
if m.Type == raftpb.MsgApp {
t.serverStats.SendAppendReq(m.Size())
g, ok := t.remotes[to]
if ok {
g.Send(m)
continue
}
p.Send(m)
log.Printf("etcdserver: send message to unknown receiver %s", to)
}
}
func (t *transport) Stop() {
for _, r := range t.remotes {
r.Stop()
}
for _, p := range t.peers {
p.Stop()
}
@ -113,6 +130,21 @@ func (t *transport) Stop() {
}
}
// AddRemote registers a remote for the member id under the transport lock.
// It is a no-op when a remote with that id already exists. Only the first
// url in us is used: it is parsed, RaftPrefix is joined onto its path, and
// a remote is started for the result. It panics if that url cannot be
// parsed (or, via the index expression, if us is empty).
func (t *transport) AddRemote(id types.ID, us []string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	if _, exists := t.remotes[id]; exists {
		return
	}

	first := us[0]
	parsed, err := url.Parse(first)
	if err != nil {
		log.Panicf("unexpect peer url %s", first)
	}
	parsed.Path = path.Join(parsed.Path, RaftPrefix)

	target := parsed.String()
	t.remotes[id] = startRemote(t.roundTripper, target, t.id, id, t.clusterID, t.raft, t.errorc)
}
func (t *transport) AddPeer(id types.ID, urls []string) {
t.mu.Lock()
defer t.mu.Unlock()

View File

@ -82,7 +82,10 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
break
}
}
return snap, err
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
}
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {

View File

@ -76,7 +76,7 @@ func TestBadCRC(t *testing.T) {
// fake a crc mismatch
crcTable = crc32.MakeTable(crc32.Koopman)
_, err = ss.Load()
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
if err == nil || err != ErrCRCMismatch {
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
}
@ -182,7 +182,7 @@ func TestNoSnapshot(t *testing.T) {
defer os.RemoveAll(dir)
ss := New(dir)
_, err = ss.Load()
if err == nil || err != ErrNoSnapshot {
if err != ErrNoSnapshot {
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
}
}
@ -195,14 +195,35 @@ func TestEmptySnapshot(t *testing.T) {
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("shit"), 0x700)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
if err != nil {
t.Fatal(err)
}
_, err = Read(path.Join(dir, "1.snap"))
if err != ErrEmptySnapshot {
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
}
}
// TestAllSnapshotBroken ensures snapshotter returns
// ErrNoSnapshot if all the snapshots are broken.
func TestAllSnapshotBroken(t *testing.T) {
dir := path.Join(os.TempDir(), "snapshot")
err := os.Mkdir(dir, 0700)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
if err != nil {
t.Fatal(err)
}
ss := New(dir)
_, err = ss.Load()
if err == nil || err != ErrEmptySnapshot {
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
if err != ErrNoSnapshot {
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
}
}

View File

@ -84,7 +84,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
if ok { // add the new watcher to the back of the list
elem = l.PushBack(w)
} else { // create a new list and add the new watcher
l = list.New()
elem = l.PushBack(w)
@ -146,6 +145,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
// if we successfully notify a watcher
// we need to remove the watcher from the list
// and decrease the counter
w.removed = true
l.Remove(curr)
atomic.AddInt64(&wh.count, -1)
}

2
test
View File

@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
source ./build
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes migrate pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes migrate pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store version wal"
FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/ integration"
# user has not provided PKG override

View File

@ -15,7 +15,8 @@ etcd will detect 0.4.x data dir and update the data automatically (while leaving
The tool can be run via:
```sh
./bin/etcd-migrate --data-dir=<PATH TO YOUR DATA>
go build
./etcd-migrate --data-dir=<PATH TO YOUR DATA>
```
It should autodetect everything and convert the data-dir to be 2.0 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 2.0 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
@ -44,4 +45,4 @@ If the conversion has completed, the entire cluster is running on something 2.0-
rm -ri snapshot conf log
```
It will ask before every deletion, but these are the 0.4.x files and will not affect the working 2.0 data.
It will ask before every deletion, but these are the 0.4.x files and will not affect the working 2.0 data.

View File

@ -14,7 +14,68 @@
package version
var (
Version = "2.0.6"
InternalVersion = "2"
import (
"os"
"path"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/types"
)
var (
Version = "2.0.11"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"
)
// DataDirVersion is an enum for the data directory layouts that
// DetectDataDir can report.
type DataDirVersion string
const (
// DataDirUnknown is reported when the directory cannot be read or its
// entries match none of the known layouts.
DataDirUnknown DataDirVersion = "Unknown WAL"
// DataDir0_4 is reported when the directory contains "snapshot", "conf"
// and "log", or contains "standby_info".
DataDir0_4 DataDirVersion = "0.4.x"
// DataDir2_0 is reported when the directory contains "snap" and "wal"
// and the wal directory is non-empty.
DataDir2_0 DataDirVersion = "2.0.0"
// DataDir2_0Proxy is reported when the directory contains "proxy".
DataDir2_0Proxy DataDirVersion = "2.0 proxy"
// DataDir2_0_1 is reported when the directory contains a "member"
// subdirectory that is itself detected as DataDir2_0.
DataDir2_0_1 DataDirVersion = "2.0.1"
)
// DetectDataDir inspects the entries of dirpath and reports which data
// directory layout they correspond to.
//
// A directory that does not exist yields (DataDirUnknown, nil). Any other
// error from reading dirpath is returned together with DataDirUnknown.
func DetectDataDir(dirpath string) (DataDirVersion, error) {
	entries, err := fileutil.ReadDir(dirpath)
	if err != nil {
		// A missing directory is not reported as a failure.
		if os.IsNotExist(err) {
			return DataDirUnknown, nil
		}
		return DataDirUnknown, err
	}

	set := types.NewUnsafeSet(entries...)

	if set.Contains("member") {
		// Classify the nested member directory and translate its result.
		memberVer, memberErr := DetectDataDir(path.Join(dirpath, "member"))
		switch memberVer {
		case DataDir2_0:
			return DataDir2_0_1, nil
		case DataDir0_4:
			// A 0.4.x layout nested under member/ is not a known combination.
			return DataDirUnknown, nil
		}
		return memberVer, memberErr
	}

	if set.ContainsAll([]string{"snap", "wal"}) {
		// The layout only counts as 2.0 when wal/ is readable and non-empty;
		// otherwise fall through to the remaining checks.
		walEntries, walErr := fileutil.ReadDir(path.Join(dirpath, "wal"))
		if walErr == nil && len(walEntries) != 0 {
			return DataDir2_0, nil
		}
	}

	switch {
	case set.ContainsAll([]string{"proxy"}):
		return DataDir2_0Proxy, nil
	case set.ContainsAll([]string{"snapshot", "conf", "log"}),
		set.ContainsAll([]string{"standby_info"}):
		return DataDir0_4, nil
	}
	return DataDirUnknown, nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
package version
import (
"io/ioutil"
@ -22,21 +22,20 @@ import (
"testing"
)
func TestDetectVersion(t *testing.T) {
func TestDetectDataDir(t *testing.T) {
tests := []struct {
names []string
wver WalVersion
wver DataDirVersion
}{
{[]string{}, WALNotExist},
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, WALv2_0_1},
{[]string{"snap/", "wal/", "wal/1"}, WALv2_0},
{[]string{"snapshot/", "conf", "log"}, WALv0_4},
{[]string{"weird"}, WALUnknown},
{[]string{"snap/", "wal/"}, WALUnknown},
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, DataDir2_0_1},
{[]string{"snap/", "wal/", "wal/1"}, DataDir2_0},
{[]string{"snapshot/", "conf", "log"}, DataDir0_4},
{[]string{"weird"}, DataDirUnknown},
{[]string{"snap/", "wal/"}, DataDirUnknown},
}
for i, tt := range tests {
p := mustMakeDir(t, tt.names...)
ver, err := DetectVersion(p)
ver, err := DetectDataDir(p)
if ver != tt.wver {
t.Errorf("#%d: version = %s, want %s", i, ver, tt.wver)
}
@ -45,15 +44,6 @@ func TestDetectVersion(t *testing.T) {
}
os.RemoveAll(p)
}
// detect on non-exist directory
v, err := DetectVersion(path.Join(os.TempDir(), "waltest", "not-exist"))
if v != WALNotExist {
t.Errorf("#non-exist: version = %s, want %s", v, WALNotExist)
}
if err != nil {
t.Errorf("#non-exist: err = %s, want %s", v, WALNotExist)
}
}
// mustMakeDir builds the directory that contains files with the given

View File

@ -17,68 +17,10 @@ package wal
import (
"fmt"
"log"
"os"
"path"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/types"
)
// WalVersion is an enum for versions of etcd logs, as reported by
// DetectVersion.
type WalVersion string
const (
// WALUnknown is reported when the directory entries match none of the
// known layouts.
WALUnknown WalVersion = "Unknown WAL"
// WALNotExist is reported when the directory cannot be read or is empty.
WALNotExist WalVersion = "No WAL"
// WALv0_4 is reported when the directory contains "snapshot", "conf"
// and "log", or contains "standby_info".
WALv0_4 WalVersion = "0.4.x"
// WALv2_0 is reported when the directory contains "snap" and "wal" and
// Exist reports true for the wal directory.
WALv2_0 WalVersion = "2.0.0"
// WALv2_0Proxy is reported when the directory contains "proxy".
WALv2_0Proxy WalVersion = "2.0 proxy"
// WALv2_0_1 is reported when the directory contains a "member"
// subdirectory that is itself detected as WALv2_0.
WALv2_0_1 WalVersion = "2.0.1"
)
// DetectVersion inspects the entries of dirpath and reports which WAL
// layout version they correspond to.
//
// A directory that does not exist, or that has no entries, yields
// (WALNotExist, nil). Any other error from reading dirpath is returned
// together with WALNotExist.
func DetectVersion(dirpath string) (WalVersion, error) {
	entries, err := fileutil.ReadDir(dirpath)
	if err != nil {
		// A missing directory is not reported as a failure.
		if os.IsNotExist(err) {
			return WALNotExist, nil
		}
		return WALNotExist, err
	}
	if len(entries) == 0 {
		// Empty WAL directory.
		return WALNotExist, nil
	}

	set := types.NewUnsafeSet(entries...)

	if set.Contains("member") {
		// Classify the nested member directory and translate its result.
		memberVer, memberErr := DetectVersion(path.Join(dirpath, "member"))
		switch memberVer {
		case WALv2_0:
			return WALv2_0_1, nil
		case WALv0_4:
			// A 0.4.x layout nested under member/ is not a known combination.
			return WALUnknown, nil
		}
		return memberVer, memberErr
	}

	// The layout only counts as 2.0 when Exist accepts the wal directory;
	// otherwise fall through to the remaining checks.
	if set.ContainsAll([]string{"snap", "wal"}) && Exist(path.Join(dirpath, "wal")) {
		return WALv2_0, nil
	}

	switch {
	case set.ContainsAll([]string{"proxy"}):
		return WALv2_0Proxy, nil
	case set.ContainsAll([]string{"snapshot", "conf", "log"}),
		set.ContainsAll([]string{"standby_info"}):
		return WALv0_4, nil
	}
	return WALUnknown, nil
}
func Exist(dirpath string) bool {
names, err := fileutil.ReadDir(dirpath)
if err != nil {
@ -125,7 +67,7 @@ func checkWalNames(names []string) []string {
wnames := make([]string, 0)
for _, name := range names {
if _, _, err := parseWalName(name); err != nil {
log.Printf("wal: parse %s error: %v", name, err)
log.Printf("wal: ignored file %v in wal", name)
continue
}
wnames = append(wnames, name)

View File

@ -273,30 +273,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
}
// Cut closes current file written and creates a new one ready to append.
// Cut first creates a temp wal file and writes the necessary headers into it.
// Then Cut atomically renames the temp wal file to a wal file.
func (w *WAL) Cut() error {
// create a new wal file with name sequence + 1
// close old wal file
if err := w.sync(); err != nil {
return err
}
if err := w.f.Close(); err != nil {
return err
}
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
ftpath := fpath + ".tmp"
// create a temp wal file with name sequence + 1, or truncate the existing one
ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
if err = w.sync(); err != nil {
return err
}
w.f.Close()
// update writer and save the previous crc
w.f = f
w.seq++
w.f = ft
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
if err := w.saveCrc(prevCrc); err != nil {
@ -308,7 +306,45 @@ func (w *WAL) Cut() error {
if err := w.saveState(&w.state); err != nil {
return err
}
return w.sync()
// close temp wal file
if err := w.sync(); err != nil {
return err
}
if err := w.f.Close(); err != nil {
return err
}
// atomically move temp wal file to wal file
if err := os.Rename(ftpath, fpath); err != nil {
return err
}
// open the wal file and update writer again
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return err
}
w.f = f
prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
// lock the new wal file
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
// increase the wal seq
w.seq++
log.Printf("wal: segmented wal file %v is created", fpath)
return nil
}
func (w *WAL) sync() error {
@ -326,6 +362,7 @@ func (w *WAL) sync() error {
// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
func (w *WAL) ReleaseLockTo(index uint64) error {
var smaller int
found := false
for i, l := range w.locks {
_, lockIndex, err := parseWalName(path.Base(l.Name()))
@ -334,10 +371,17 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
}
if lockIndex >= index {
smaller = i - 1
found = true
break
}
}
// if no lock index is greater than or equal to the release index, we can
// release locks up to, but excluding, the last one.
if !found && len(w.locks) != 0 {
smaller = len(w.locks) - 1
}
if smaller <= 0 {
return nil
}

View File

@ -504,4 +504,21 @@ func TestReleaseLockTo(t *testing.T) {
t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4))
}
}
// release the lock to 15
unlockIndex = uint64(15)
w.ReleaseLockTo(unlockIndex)
// expected remaining is 10
if len(w.locks) != 1 {
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
}
_, lockIndex, err := parseWalName(path.Base(w.locks[0].Name()))
if err != nil {
t.Fatal(err)
}
if lockIndex != uint64(10) {
t.Errorf("lockindex = %d, want %d", lockIndex, 10)
}
}