Compare commits

...

44 Commits

Author SHA1 Message Date
221abdcb3b version: bump to v2.0.0-rc.1 2014-12-18 10:27:29 -08:00
fa35363f74 Documentation: update to 2.0
In anticipation for a 2.0.0-rc.0 release update and move the
documentation.
2014-12-18 10:18:26 -08:00
35a772753c Merge pull request #1958 from xiang90/compatibility
doc: add backword_compatibility.md
2014-12-17 15:58:28 -08:00
aea87bc88d Merge pull request #1951 from jlsalvador/patch-1
etcdctl: add environment support to certs args
2014-12-17 15:53:59 -08:00
ce6f606766 doc: add backword_compatibility.md 2014-12-17 15:47:49 -08:00
a000e97eea Merge pull request #1956 from coreos/member-migration-example
doc: update node migration guide
2014-12-17 12:10:46 -08:00
2d76e5e273 doc: update node migration guide 2014-12-17 12:10:10 -08:00
f0b9ad3863 Merge pull request #1955 from coreos/doc-cleanup
docs: fix port in peer URLs
2014-12-17 11:01:27 -08:00
0a14927823 docs: fix port in peer URLs 2014-12-17 10:51:52 -08:00
910198d117 etcdctl: add environment support to certs args 2014-12-16 16:18:36 +01:00
722247a752 Merge pull request #1948 from xiang90/stats
etcdserver: fix leader stats
2014-12-15 17:10:48 -08:00
c27c288bef etcdserver: update stats when become leader 2014-12-15 17:02:48 -08:00
04522baeee etcdserver: fix leader stats 2014-12-15 16:50:03 -08:00
43bb6cf038 Merge pull request #1946 from barakmich/less_logging
Remove verbose logging and extraneous debug. Fixes #1904
2014-12-15 11:49:48 -08:00
8ece28d4f7 Remove verbose logging and extraneous debug. Fixes #1904 2014-12-15 14:47:02 -05:00
5369fb1c4f Merge pull request #1945 from xiang90/raft_test
raft: use newRaft
2014-12-15 11:31:11 -08:00
044e35b814 raft: use newRaft 2014-12-15 11:25:35 -08:00
e9b06416de Merge pull request #1942 from xiang90/doc
doc: specify listening addrs in the clustering example
2014-12-15 10:41:34 -08:00
9dc5b5a7e8 Merge pull request #1943 from xiang90/fix_streamSrv
sender: set strmSrv to nil after stoping it
2014-12-14 20:13:23 -08:00
e3dbfefbe0 sender: set strmSrv to nil after stoping it 2014-12-14 20:00:32 -08:00
0ea8c0929e Merge pull request #1927 from xiang90/flock
*: lock the in using files; do not purge locked the wal files
2014-12-14 19:39:58 -08:00
502396edd5 wal: fix wal doc 2014-12-14 19:36:37 -08:00
6b73a72d42 test: add fileutil to test 2014-12-14 19:34:54 -08:00
53bf7e4b5e wal: rename openAtIndex -> open; OpenAtIndexUntilUsing -> openNotInUse 2014-12-14 19:33:06 -08:00
f538cba272 *: do not backup files still in use 2014-12-14 19:27:22 -08:00
ea94d19147 *: lock the in using files; do not purge locked the wal files 2014-12-14 19:27:22 -08:00
b90693ccae doc: specify listening addrs in the clustering example 2014-12-14 19:21:54 -08:00
dcf34c0ab4 Merge pull request #1938 from yichengq/262
etcdserver: protect the sender map in SendHub
2014-12-15 10:41:52 +08:00
ceb077424d etcdserver: protect the sender map in SendHub 2014-12-15 10:37:41 +08:00
d07434f99e Merge pull request #1939 from xiang90/sender_logging
rafthttp: better logging
2014-12-14 18:23:36 -08:00
cb6983cbb1 Merge pull request #1940 from xiang90/raft_log
rafT: log term as %d
2014-12-14 10:06:59 -08:00
c586d5012c raft: log term as %d 2014-12-14 10:06:45 -08:00
d86603840d rafthttp: better logging 2014-12-14 09:50:59 -08:00
e40a53b534 Merge pull request #1926 from jainvipin/master
minor fix for #1786
2014-12-13 16:26:59 -08:00
3f64c677e1 modify directory deletion sequence 2014-12-13 16:22:36 -08:00
b8ab2b0b5c Merge pull request #1936 from xiang90/fix_test
pkg/transport: change write size from 1MB -> 5MB
2014-12-13 11:38:51 -08:00
3cc4cdd363 pkg/transport: change write size from 1MB -> 5MB
As we move to container-based infrastructure testing env
on travis, the tcp write buffer is more than 1MB. Change
the test according to the change on the testing env.
2014-12-13 11:32:29 -08:00
c620238257 Merge pull request #1934 from xiang90/fix_test
discovery: fix watch index
2014-12-13 11:18:32 -08:00
f265afa8ac discovery: fix watch index 2014-12-13 11:15:24 -08:00
bee3103931 Merge pull request #1931 from barakmich/travis
Disable unused sudo-ability from Travis in hopes of faster startup
2014-12-12 19:47:42 -05:00
fa195dae39 Merge pull request #1929 from lisael/master
added aioetcd (python 3.4+) to client libs list
2014-12-12 16:43:29 -08:00
fe4abc40ce Disable unused sudo-ability from Travis in hopes of faster startup
times.
2014-12-12 19:39:09 -05:00
1f0d43250f documentation: fix min python version for aioetcd 2014-12-12 23:46:34 +01:00
97025bf5f1 documentation: added aioetcd to client libs 2014-12-12 23:40:56 +01:00
40 changed files with 717 additions and 175 deletions

View File

@ -1,4 +1,5 @@
language: go
sudo: false
go:
- 1.3

View File

@ -1,10 +1,10 @@
## etcd 0.4.x -> 0.5.0 Data Migration Tool
## etcd 0.4.x -> 2.0.0 Data Migration Tool
### Upgrading from 0.4.x
Between 0.4.x and 0.5, the on-disk data formats have changed. In order to allow users to convert to 0.5, a migration tool is provided.
Between 0.4.x and 2.0, the on-disk data formats have changed. In order to allow users to convert to 2.0, a migration tool is provided.
In the early 0.5.0-alpha series, we're providing this tool early to encourage adoption. However, before 0.5.0-release, etcd will autodetect the 0.4.x data dir upon upgrade and automatically update the data too (while leaving a backup, in case of emergency).
In the early 2.0.0-alpha series, we're providing this tool early to encourage adoption. However, before 2.0.0-release, etcd will autodetect the 0.4.x data dir upon upgrade and automatically update the data too (while leaving a backup, in case of emergency).
### Data Migration Tips
@ -18,7 +18,7 @@ The tool can be run via:
./bin/etcd-migrate --data-dir=<PATH TO YOUR DATA>
```
It should autodetect everything and convert the data-dir to be 0.5 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 0.5 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
It should autodetect everything and convert the data-dir to be 2.0 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 2.0 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
If, however, it complains about autodetecting the name (which can happen, depending on how the cluster was configured), you need to supply the name of this particular node. This is equivalent to the `--name` flag (or `ETCD_NAME` variable) that etcd was run with, which can also be found by accessing the self api, eg:
@ -38,10 +38,10 @@ And the tool should migrate successfully. If it still has an error at this time,
### Recovering Disk Space
If the conversion has completed, the entire cluster is running on something 0.5-based, and the disk space is important, the following command will clear 0.4.x data from the data-dir:
If the conversion has completed, the entire cluster is running on something 2.0-based, and the disk space is important, the following command will clear 0.4.x data from the data-dir:
```sh
rm -ri snapshot conf log
```
It will ask before every deletion, but these are the 0.4.x files and will not affect the working 0.5 data.
It will ask before every deletion, but these are the 0.4.x files and will not affect the working 2.0 data.

View File

@ -15,7 +15,7 @@ Using an out-of-date data directory can lead to inconsistency as the member had
For maximum safety, if an etcd member suffers any sort of data corruption or loss, it must be removed from the cluster.
Once removed the member can be re-added with an empty data directory.
[members-api]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md#members-api
[members-api]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md#members-api
#### Contents
@ -43,7 +43,70 @@ The data directory contains all the data to recover a member to its point-in-tim
* Update the peer URLs for that member to reflect the new machine according to the [member api] [change peer url]
* Start etcd on the new machine, using the same configuration and the copy of the data directory
[change peer url]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md#change-the-peer-urls-of-a-member
This example will walk you through the process of migrating the infra1 member to a new machine:
|Name|Peer URL|
|------|--------------|
|infra0|10.0.1.10:2380|
|infra1|10.0.1.11:2380|
|infra2|10.0.1.12:2380|
```
$ export ETCDCTL_PEERS=http://10.0.1.10:2379,http://10.0.1.11:2379,http://10.0.1.12:2379
```
```
$ etcdctl member list
84194f7c5edd8b37: name=infra0 peerURLs=http://10.0.1.10:2380 clientURLs=http://127.0.0.1:2379,http://10.0.1.10:2379
b4db3bf5e495e255: name=infra1 peerURLs=http://10.0.1.11:2380 clientURLs=http://127.0.0.1:2379,http://10.0.1.11:2379
bc1083c870280d44: name=infra2 peerURLs=http://10.0.1.12:2380 clientURLs=http://127.0.0.1:2379,http://10.0.1.12:2379
```
#### Stop the member etcd process
```
$ ssh core@10.0.1.11
```
```
$ sudo systemctl stop etcd
```
#### Copy the data directory of the now-idle member to the new machine
```
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
```
```
$ scp node1.etcd.tar.gz core@10.0.1.13:~/
```
#### Update the peer URLs for that member to reflect the new machine
```
$ curl http://10.0.1.10:2379/v2/members/b4db3bf5e495e255 -XPUT \
-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.1.13:2380"]}'
```
#### Start etcd on the new machine, using the same configuration and the copy of the data directory
```
$ ssh core@10.0.1.13
```
```
$ tar -xzvf node1.etcd.tar.gz -C /var/lib/etcd
```
```
etcd -name node1 \
-listen-peer-urls http://10.0.1.13:2380 \
-listen-client-urls http://10.0.1.13:2379,http://127.0.0.1:2379 \
-advertise-client-urls http://10.0.1.13:2379,http://127.0.0.1:2379
```
[change peer url]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md#change-the-peer-urls-of-a-member
### Disaster Recovery

View File

@ -0,0 +1,53 @@
### Backward Compatibility
The main goal of etcd 2.0 release is to improve cluster safety around bootstrapping and dynamic reconfiguration. To do this, we deprecated the old error-prone APIs and provide a new set of APIs.
The other main focus of this release was a more reliable Raft implementation, but as this change is internal it should not have any notable effects to users.
#### Command Line Flags Changes
The major flag changes are mostly related to bootstrapping. The `initial-*` flags provide an improved way to specify the required criteria to start the cluster. The advertised URLs now support a list of values instead of a single value, which allows etcd users to gracefully migrate to the new set of IANA-assigned ports (2379/client and 2380/peers) while maintaining backward compatibility with the old ports.
- `-addr` is replaced by `-advertise-client-urls`.
- `-bind-addr` is replaced by `-listen-client-urls`.
- `-peer-add` is replaced by `-initial-advertise-peer-urls`.
- `-peer-bind-addr` is replaced by `-listen-peer-urls`.
- `-peers` is replaced by `-initial-cluster`.
- `-peers-file` is replaced by `-initial-cluster`.
The documentation of new command line flags can be found at
https://github.com/coreos/etcd/blob/master/Documentation/2.0/configuration.md.
#### Data Dir
- Default data dir location has changed from {$hostname}.etcd to {name}.etcd.
- The disk format within the data dir has changed. etcd 2.0 should be able to auto upgrade the old data format. Instructions on doing so manually are in the [migration tool doc][migrationtooldoc].
[migrationtooldoc]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/0_4_migration_tool.md
#### Standby
etcd 0.4's standby mode has been deprecated by 2.0's [proxy mode][proxymode].
Standby mode was intended for large clusters that had a subset of the members acting in the consensus process. Overall this process was too magical and allowed for operators to back themselves into a corner.
Proxy mode in 2.0 will provide similar functionality, and with improved control over which machines act as proxies due to the operator specifically configuring them. Proxies also support read only or read/write modes for increased security and durability.
[proxymode]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/proxy.md
#### Discovery Service
A size key needs to be provided inside a [discovery token][discoverytoken].
[discoverytoken]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#custom-etcd-discovery-service
#### HTTP Admin API
`v2/admin` on peer url and `v2/keys/_etcd` are unified under the new [v2/member API][memberapi] to better explain which machines are part of an etcd cluster, and to simplify the keyspace for all your use cases.
[memberapi]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md
#### HTTP Key Value API
- The follower can now transparently proxy write requests to the leader. Clients will no longer see 307 redirections to the leader from etcd.
- Expiration time is in UTC instead of local time.

View File

@ -30,18 +30,21 @@ On each machine you would start etcd with these flags:
```
$ etcd -name infra0 -initial-advertise-peer-urls https://10.0.1.10:2380 \
-listen-peer-urls https://10.0.1.10:2380 \
-initial-cluster-token etcd-cluster-1 \
-initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
-initial-cluster-state new
```
```
$ etcd -name infra1 -initial-advertise-peer-urls https://10.0.1.11:2380 \
-listen-peer-urls https://10.0.1.11:2380 \
-initial-cluster-token etcd-cluster-1 \
-initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
-initial-cluster-state new
```
```
$ etcd -name infra2 -initial-advertise-peer-urls https://10.0.1.12:2380 \
-listen-peer-urls https://10.0.1.12:2380 \
-initial-cluster-token etcd-cluster-1 \
-initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
-initial-cluster-state new
@ -55,6 +58,7 @@ In the following example, we have not included our new host in the list of enume
```
$ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \
-listen-peer-urls https://10.0.1.11:2380 \
-initial-cluster infra0=http://10.0.1.10:2380 \
-initial-cluster-state new
etcd: infra1 not listed in the initial cluster config
@ -65,6 +69,7 @@ In this example, we are attempting to map a node (infra0) on a different address
```
$ etcd -name infra0 -initial-advertise-peer-urls http://127.0.0.1:2380 \
-listen-peer-urls http://10.0.1.10:2380 \
-initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
-initial-cluster-state=new
etcd: error setting up initial cluster: infra0 has different advertised URLs in the cluster and advertised peer URLs list
@ -75,6 +80,7 @@ If you configure a peer with a different set of configuration and attempt to joi
```
$ etcd -name infra3 -initial-advertise-peer-urls http://10.0.1.13:2380 \
-listen-peer-urls http://10.0.1.13:2380 \
-initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra3=http://10.0.1.13:2380 \
-initial-cluster-state=new
etcd: conflicting cluster ID to the target cluster (c6ab534d07e8fcc4 != bc25ea2a74fb18b0). Exiting.
@ -91,7 +97,7 @@ A discovery URL identifies a unique etcd cluster. Instead of reusing a discovery
Moreover, discovery URLs should ONLY be used for the initial bootstrapping of a cluster. To change cluster membership after the cluster is already running, see the [runtime reconfiguration][runtime] guide.
[runtime]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/runtime-configuration.md
[runtime]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/runtime-configuration.md
### Custom etcd discovery service
@ -111,14 +117,17 @@ Now we start etcd with those relevant flags for each member:
```
$ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
-listen-peer-urls http://10.0.1.10:2380 \
-discovery https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83
```
```
$ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \
-listen-peer-urls http://10.0.1.11:2380 \
-discovery https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83
```
```
$ etcd -name infra2 -initial-advertise-peer-urls http://10.0.1.12:2380 \
-listen-peer-urls http://10.0.1.12:2380 \
-discovery https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83
```
@ -152,14 +161,17 @@ Now we start etcd with those relevant flags for each member:
```
$ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
-listen-peer-urls http://10.0.1.10:2380 \
-discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
```
```
$ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \
-listen-peer-urls http://10.0.1.11:2380 \
-discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
```
```
$ etcd -name infra2 -initial-advertise-peer-urls http://10.0.1.12:2380 \
-listen-peer-urls http://10.0.1.12:2380 \
-discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
```
@ -174,6 +186,7 @@ You can use the environment variable `ETCD_DISCOVERY_PROXY` to cause etcd to use
```
$ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
-listen-peer-urls http://10.0.1.10:2380 \
-discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
etcd: error: the cluster doesn't have a size configuration value in https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de/_config
exit 1
@ -185,6 +198,7 @@ This error will occur if the discovery cluster already has the configured number
```
$ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
-listen-peer-urls http://10.0.1.10:2380 \
-discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de \
-discovery-fallback exit
etcd: discovery: cluster is full
@ -198,13 +212,14 @@ ignored on this machine.
```
$ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
-listen-peer-urls http://10.0.1.10:2380 \
-discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at /var/lib/etcd
```
# 0.4 to 0.5+ Migration Guide
# 0.4 to 2.0+ Migration Guide
In etcd 0.5 we introduced the ability to listen on more than one address and to advertise multiple addresses. This makes using etcd easier when you have complex networking, such as private and public networks on various cloud providers.
In etcd 2.0 we introduced the ability to listen on more than one address and to advertise multiple addresses. This makes using etcd easier when you have complex networking, such as private and public networks on various cloud providers.
To make understanding this feature easier, we changed the naming of some flags, but we support the old flags to make the migration from the old to new version easier.

View File

@ -131,9 +131,9 @@ Be CAUTIOUS to use unsafe flags because it will break the guarantee given by con
+ Print the version and exit.
+ default: false
[build-cluster]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/clustering.md#static
[reconfig]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/runtime-configuration.md
[discovery]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/clustering.md#discovery
[proxy]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/proxy.md
[build-cluster]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#static
[reconfig]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/runtime-configuration.md
[discovery]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#discovery
[proxy]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/proxy.md
[security]: https://github.com/coreos/etcd/blob/master/Documentation/security.md
[restore]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/admin_guide.md#restoring-a-backup
[restore]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/admin_guide.md#restoring-a-backup

View File

@ -28,4 +28,4 @@ Client is a caller of the cluster's HTTP API.
### Machine (deprecated)
The alternative of Member in etcd before 0.5
The alternative of Member in etcd before 2.0

View File

@ -59,21 +59,21 @@ If the POST body is malformed an HTTP 400 will be returned. If the member exists
```
POST /v2/members HTTP/1.1
{"peerURLs": ["http://10.0.0.10:2379"]}
{"peerURLs": ["http://10.0.0.10:2380"]}
```
### Example
```sh
curl http://10.0.0.10:2379/v2/members -XPOST \
-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:2379"]}'
-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:2380"]}'
```
```json
{
"id": "3777296169",
"peerURLs": [
"http://10.0.0.10:2379"
"http://10.0.0.10:2380"
]
}
```
@ -108,12 +108,12 @@ If the POST body is malformed an HTTP 400 will be returned. If the member does n
```
PUT /v2/members/<id> HTTP/1.1
{"peerURLs": ["http://10.0.0.10:2379"]}
{"peerURLs": ["http://10.0.0.10:2380"]}
```
#### Example
```sh
curl http://10.0.0.10:2379/v2/members/272e204152 -XPUT \
-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:12379"]}'
-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:2380"]}'
```

View File

@ -29,4 +29,4 @@ etcd -proxy on -client-listen-urls 127.0.0.1:8080 -discovery https://discovery.
#### Fallback to proxy mode with discovery service
If you bootstrap a etcd cluster using [discovery service][discovery-service] with more than the expected number of etcd members, the extra etcd processes will fall back to being `readwrite` proxies by default. They will forward the requests to the cluster as described above. For example, if you create a discovery url with `size=5`, and start ten etcd processes using that same discovery URL, the result will be a cluster with five etcd members and five proxies. Note that this behaviour can be disabled with the `proxy-fallback` flag.
[discovery-service]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/clustering.md#discovery
[discovery-service]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#discovery

View File

@ -14,7 +14,7 @@ If etcd falls below a simple majority of members it can no longer accept writes:
If you want to migrate a running member to another machine, please refer [member migration section][member migration].
[member migration]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/admin_guide.md#member-migration
[member migration]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/admin_guide.md#member-migration
### Increase Cluster Size
@ -53,7 +53,7 @@ To increase from 3 to 5 members you will make two add operations
To decrease from 5 to 3 you will make two remove operations
All of these examples will use the `etcdctl` command line tool that ships with etcd.
If you want to use the member API directly you can find the documentation [here](https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md).
If you want to use the member API directly you can find the documentation [here](https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md).
### Remove a Member
@ -86,7 +86,7 @@ Removal of the leader is safe, but the cluster will be out of progress for a per
Adding a member is a two step process:
* Add the new member to the cluster via the [members API](https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md#post-v2members) or the `etcdctl member add` command.
* Add the new member to the cluster via the [members API](https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md#post-v2members) or the `etcdctl member add` command.
* Start the member with the correct configuration.
Using `etcdctl` let's add the new member to the cluster:

View File

@ -706,35 +706,51 @@ curl -L http://127.0.0.1:4001/v2/keys/?recursive=true
### Deleting a Directory
Now let's try to delete the directory `/foo_dir`.
Now let's try to delete the directory `/dir`
You can remove an empty directory using the `DELETE` verb and the `dir=true` parameter.
You can remove an empty directory using the `DELETE` verb and the `dir=true` parameter. The following will succeed because `/dir` was empty.
```sh
curl -L 'http://127.0.0.1:4001/v2/keys/foo_dir?dir=true' -XDELETE
curl -L 'http://127.0.0.1:4001/v2/keys/dir?dir=true' -XDELETE
```
```json
{
"action": "delete",
"node": {
"createdIndex": 30,
"dir": true,
"key": "/foo_dir",
"key": "/dir",
"modifiedIndex": 31
},
"prevNode": {
"createdIndex": 30,
"key": "/foo_dir",
"key": "/dir",
"dir": true,
"modifiedIndex": 30
}
}
```
However, deleting `/foo_dir` will result in an error because `/foo_dir` is not empty.
```sh
curl -L 'http://127.0.0.1:4001/v2/keys/foo_dir?dir=true' -XDELETE
```
```json
{
"errorCode":108,
"message":"Directory not empty",
"cause":"/foo_dir",
"index":2
}
```
To delete a directory that holds keys, you must add `recursive=true`.
```sh
curl -L http://127.0.0.1:4001/v2/keys/dir?recursive=true -XDELETE
curl -L http://127.0.0.1:4001/v2/keys/foo_dir?recursive=true -XDELETE
```
```json

View File

@ -27,6 +27,7 @@
- [jplana/python-etcd](https://github.com/jplana/python-etcd) - Supports v2
- [russellhaering/txetcd](https://github.com/russellhaering/txetcd) - a Twisted Python library
- [cholcombe973/autodock](https://github.com/cholcombe973/autodock) - A docker deployment automation tool
- [lisael/aioetcd](https://github.com/lisael/aioetcd) - (Python 3.4+) Asyncio coroutines client (Supports v2)
**Node libraries**

View File

@ -3,13 +3,14 @@
[![Build Status](https://travis-ci.org/coreos/etcd.png?branch=master)](https://travis-ci.org/coreos/etcd)
[![Docker Repository on Quay.io](https://quay.io/repository/coreos/etcd-git/status "Docker Repository on Quay.io")](https://quay.io/repository/coreos/etcd-git)
### WARNING ###
### Release Candidate Warning ###
The current `master` branch of etcd is under heavy development in anticipation of the forthcoming 0.5.0 release.
The current `master` branch of etcd is under development in anticipation of the forthcoming 2.0.0 release.
It is strongly recommended that users work with the latest 0.4.x release (0.4.6), which can be found on the [releases](https://github.com/coreos/etcd/releases) page.
Unless otherwise noted, the etcd documentation refers to configuring and running 0.4.x releases.
Documentation related to the 2.0.0 release candidates can be found in the `Documentation/2.0` directory.
## README version 0.4.6

View File

@ -276,7 +276,8 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int, index uint64) (clien
if len(nodes) > size {
nodes = nodes[:size]
}
w := d.c.RecursiveWatch(d.cluster, index)
// watch from the next index
w := d.c.RecursiveWatch(d.cluster, index+1)
all := make(client.Nodes, len(nodes))
copy(all, nodes)
for _, n := range all {

View File

@ -67,7 +67,7 @@ func handleBackup(c *cli.Context) {
}
}
w, err := wal.OpenAtIndex(srcWAL, index)
w, err := wal.OpenNotInUse(srcWAL, index)
if err != nil {
log.Fatal(err)
}

View File

@ -91,10 +91,26 @@ func getEndpoints(c *cli.Context) ([]string, error) {
}
func getTransport(c *cli.Context) (*http.Transport, error) {
cafile := c.GlobalString("ca-file")
certfile := c.GlobalString("cert-file")
keyfile := c.GlobalString("key-file")
// Use an environment variable if nothing was supplied on the
// command line
if cafile == "" {
cafile = os.Getenv("ETCDCTL_CA_FILE")
}
if certfile == "" {
certfile = os.Getenv("ETCDCTL_CERT_FILE")
}
if keyfile == "" {
keyfile = os.Getenv("ETCDCTL_KEY_FILE")
}
tls := transport.TLSInfo{
CAFile: c.GlobalString("ca-file"),
CertFile: c.GlobalString("cert-file"),
KeyFile: c.GlobalString("key-file"),
CAFile: cafile,
CertFile: certfile,
KeyFile: keyfile,
}
return transport.NewTransport(tls)

View File

@ -260,8 +260,13 @@ func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
stats := h.stats.LeaderStats()
if stats == nil {
writeError(w, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(h.stats.LeaderStats())
w.Write(stats)
}
func serveVersion(w http.ResponseWriter, r *http.Request) {

View File

@ -21,6 +21,7 @@ import (
"net/http"
"net/url"
"path"
"sync"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
@ -48,6 +49,7 @@ type sendHub struct {
p rafthttp.Processor
ss *stats.ServerStats
ls *stats.LeaderStats
mu sync.RWMutex // protect the sender map
senders map[types.ID]rafthttp.Sender
shouldstop chan struct{}
}
@ -67,7 +69,11 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *s
}
}
func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }
func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
h.mu.RLock()
defer h.mu.RUnlock()
return h.senders[id]
}
func (h *sendHub) Send(msgs []raftpb.Message) {
for _, m := range msgs {
@ -102,6 +108,8 @@ func (h *sendHub) ShouldStopNotify() <-chan struct{} {
}
func (h *sendHub) Add(m *Member) {
h.mu.Lock()
defer h.mu.Unlock()
if _, ok := h.senders[m.ID]; ok {
return
}
@ -113,16 +121,20 @@ func (h *sendHub) Add(m *Member) {
}
u.Path = path.Join(u.Path, raftPrefix)
fs := h.ls.Follower(m.ID.String())
s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), h.p, fs, h.shouldstop)
s := rafthttp.NewSender(h.tr, u.String(), m.ID, h.cl.ID(), h.p, fs, h.shouldstop)
h.senders[m.ID] = s
}
func (h *sendHub) Remove(id types.ID) {
h.mu.Lock()
defer h.mu.Unlock()
h.senders[id].Stop()
delete(h.senders, id)
}
func (h *sendHub) Update(m *Member) {
h.mu.Lock()
defer h.mu.Unlock()
// TODO: return error or just panic?
if _, ok := h.senders[m.ID]; !ok {
return

View File

@ -90,21 +90,6 @@ type Response struct {
err error
}
type Storage interface {
// Save function saves ents and state to the underlying stable storage.
// Save MUST block until st and ents are on stable storage.
Save(st raftpb.HardState, ents []raftpb.Entry) error
// SaveSnap function saves snapshot to the underlying stable storage.
SaveSnap(snap raftpb.Snapshot) error
// TODO: WAL should be able to control cut itself. After implement self-controlled cut,
// remove it in this interface.
// Cut cuts out a new wal file for saving new state and entries.
Cut() error
// Close closes the Storage and performs finalization.
Close() error
}
type Server interface {
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
@ -295,15 +280,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
storage: struct {
*wal.WAL
*snap.Snapshotter
}{w, ss},
stats: sstats,
lstats: lstats,
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
storage: NewStorage(w, ss),
stats: sstats,
lstats: lstats,
Ticker: time.Tick(100 * time.Millisecond),
SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
}
srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
for _, m := range getOtherMembers(cfg.Cluster, cfg.Name) {
@ -403,6 +385,11 @@ func (s *EtcdServer) run() {
atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
if rd.RaftState == raft.StateLeader {
syncC = s.SyncTicker
// TODO: remove the nil checking
// current test utility does not provide the stats
if s.stats != nil {
s.stats.BecomeLeader()
}
} else {
syncC = nil
}
@ -544,7 +531,10 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
// LeaderStats returns the leader statistics encoded as JSON, or nil
// when this server is not currently the raft leader (as recorded in
// s.raftLead).
func (s *EtcdServer) LeaderStats() []byte {
	// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
	if atomic.LoadUint64(&s.raftLead) != uint64(s.id) {
		return nil
	}
	return s.lstats.JSON()
}
@ -1005,7 +995,7 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (ty
func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
var err error
if w, err = wal.OpenAtIndex(waldir, index); err != nil {
if w, err = wal.Open(waldir, index); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
}
var wmetadata []byte

View File

@ -141,3 +141,11 @@ func (ss *ServerStats) SendAppendReq(reqSize int) {
ss.SendAppendRequestCnt++
}
// BecomeLeader records this server's transition to raft leader in its
// stats: the state becomes StateLeader and the leader info is stamped
// with this server's ID and the transition time. Calling it while
// already leader is a no-op, so the start time is not reset.
func (ss *ServerStats) BecomeLeader() {
	if ss.State == raft.StateLeader {
		return
	}
	ss.State = raft.StateLeader
	ss.LeaderInfo.Name = ss.ID
	ss.LeaderInfo.StartTime = time.Now()
}

45
etcdserver/storage.go Normal file
View File

@ -0,0 +1,45 @@
package etcdserver
import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
)
// Storage is the stable storage used by etcdserver to persist raft
// hard state, log entries, and snapshots.
type Storage interface {
	// Save saves ents and state to the underlying stable storage.
	// Save MUST block until st and ents are on stable storage.
	Save(st raftpb.HardState, ents []raftpb.Entry) error
	// SaveSnap saves the snapshot to the underlying stable storage.
	SaveSnap(snap raftpb.Snapshot) error
	// TODO: the WAL should be able to control cutting itself. After
	// self-controlled cutting is implemented, remove Cut from this
	// interface.
	// Cut cuts out a new wal file for saving new state and entries.
	Cut() error
	// Close closes the Storage and performs finalization.
	Close() error
}
// storage bundles a write-ahead log and a snapshotter into a single
// Storage implementation: Save, Cut, and Close are provided by the
// embedded *wal.WAL, while SaveSnap is overridden below to persist
// snapshots through the embedded *snap.Snapshotter.
type storage struct {
	*wal.WAL
	*snap.Snapshotter
}

// NewStorage returns a Storage backed by the given WAL and Snapshotter.
func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
	return &storage{
		WAL:         w,
		Snapshotter: s,
	}
}
// SaveSnap saves the snapshot to disk and then releases the locked
// wal files up to the snapshot's index, since they will not be used
// again.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
	if err := st.Snapshotter.SaveSnap(snap); err != nil {
		return err
	}
	return st.WAL.ReleaseLockTo(snap.Metadata.Index)
}

View File

@ -145,7 +145,6 @@ func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name s
//snapNodes[p.Name] = uint64(m.ID)
//}
for _, p := range cfg.Peers {
log.Printf(p.Name)
delete(snapNodes, p.Name)
}
if len(snapNodes) == 1 {

View File

@ -490,8 +490,6 @@ func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry,
Data: data,
}
log.Printf("%d: %s -> %s", ent5.Index, ent4.GetCommandName(), ent5.Type)
return &ent5, nil
}

60
pkg/fileutil/lock.go Normal file
View File

@ -0,0 +1,60 @@
package fileutil
import (
"errors"
"os"
"syscall"
)
var (
ErrLocked = errors.New("file already locked")
)
type Lock interface {
Name() string
TryLock() error
Lock() error
Unlock() error
Destroy() error
}
type lock struct {
fd int
file *os.File
}
func (l *lock) Name() string {
return l.file.Name()
}
// TryLock acquires exclusivity on the lock without blocking
func (l *lock) TryLock() error {
err := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil && err == syscall.EWOULDBLOCK {
return ErrLocked
}
return err
}
// Lock acquires exclusivity on the lock without blocking
func (l *lock) Lock() error {
return syscall.Flock(l.fd, syscall.LOCK_EX)
}
// Unlock unlocks the lock
func (l *lock) Unlock() error {
return syscall.Flock(l.fd, syscall.LOCK_UN)
}
func (l *lock) Destroy() error {
return l.file.Close()
}
func NewLock(file string) (Lock, error) {
f, err := os.Open(file)
if err != nil {
return nil, err
}
l := &lock{int(f.Fd()), f}
return l, nil
}

82
pkg/fileutil/lock_test.go Normal file
View File

@ -0,0 +1,82 @@
package fileutil
import (
"io/ioutil"
"os"
"testing"
"time"
)
// TestLockAndUnlock exercises the whole Lock lifecycle: taking a
// lock, failing a non-blocking second lock, handing the lock over,
// and verifying that a blocking Lock waits until release.
func TestLockAndUnlock(t *testing.T) {
	f, err := ioutil.TempFile("", "lock")
	if err != nil {
		t.Fatal(err)
	}
	f.Close()
	defer func() {
		if err := os.Remove(f.Name()); err != nil {
			t.Fatal(err)
		}
	}()

	// Take the lock on the file.
	primary, err := NewLock(f.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer primary.Destroy()
	if err = primary.Lock(); err != nil {
		t.Fatal(err)
	}

	// A second non-blocking lock on the held file must report ErrLocked.
	secondary, err := NewLock(f.Name())
	if err != nil {
		t.Fatal(err)
	}
	if err = secondary.TryLock(); err != ErrLocked {
		t.Errorf("err = %v, want %v", err, ErrLocked)
	}

	// Once released, the second lock can be taken without blocking.
	if err = primary.Unlock(); err != nil {
		t.Fatal(err)
	}
	if err = secondary.TryLock(); err != nil {
		t.Errorf("err = %v, want %v", err, nil)
	}
	defer secondary.Destroy()

	// A blocking Lock on the now-held file must not return yet.
	acquired := make(chan struct{}, 1)
	go func() {
		primary.Lock()
		acquired <- struct{}{}
	}()
	select {
	case <-acquired:
		t.Error("unexpected unblocking")
	case <-time.After(10 * time.Millisecond):
	}

	// Releasing the holder should unblock the waiting Lock.
	if err = secondary.Unlock(); err != nil {
		t.Fatal(err)
	}
	select {
	case <-acquired:
	case <-time.After(10 * time.Millisecond):
		t.Error("unexpected blocking")
	}
}

View File

@ -27,12 +27,29 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration,
sort.Strings(newfnames)
for len(newfnames) > int(max) {
f := path.Join(dirname, newfnames[0])
err := os.Remove(f)
l, err := NewLock(f)
if err != nil {
errC <- err
return
}
log.Printf("filePurge: successfully remvoed file %s", f)
err = l.TryLock()
if err != nil {
break
}
err = os.Remove(f)
if err != nil {
errC <- err
return
}
err = l.Unlock()
if err != nil {
log.Printf("filePurge: unlock %s error %v", l.Name(), err)
}
err = l.Destroy()
if err != nil {
log.Printf("filePurge: destroy lock %s error %v", l.Name(), err)
}
log.Printf("filePurge: successfully removed file %s", f)
newfnames = newfnames[1:]
}
select {

View File

@ -31,7 +31,7 @@ func TestPurgeFile(t *testing.T) {
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond)
time.Sleep(2 * time.Millisecond)
}
fnames, err := ReadDir(dir)
if err != nil {
@ -48,3 +48,71 @@ func TestPurgeFile(t *testing.T) {
}
close(stop)
}
// TestPurgeFileHoldingLock ensures PurgeFile skips files locked by
// someone else and resumes purging once the lock is released.
//
// Fixes over the original: the error returned by NewLock was silently
// overwritten by the subsequent l.Lock() without being checked, and
// the files created in the setup loop were never closed.
func TestPurgeFileHoldingLock(t *testing.T) {
	dir, err := ioutil.TempDir("", "purgefile")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)
	for i := 0; i < 10; i++ {
		f, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
		if err != nil {
			t.Fatal(err)
		}
		f.Close()
	}
	// create a purge barrier at 5
	l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5)))
	if err != nil {
		t.Fatal(err)
	}
	err = l.Lock()
	if err != nil {
		t.Fatal(err)
	}
	stop := make(chan struct{})
	errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
	time.Sleep(5 * time.Millisecond)
	// purging stops at the barrier: files 0-4 removed, 5-9 remain
	fnames, err := ReadDir(dir)
	if err != nil {
		t.Fatal(err)
	}
	wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"}
	if !reflect.DeepEqual(fnames, wnames) {
		t.Errorf("filenames = %v, want %v", fnames, wnames)
	}
	select {
	case err := <-errch:
		t.Errorf("unexpected purge error %v", err)
	case <-time.After(time.Millisecond):
	}
	// remove the purge barrier
	err = l.Unlock()
	if err != nil {
		t.Fatal(err)
	}
	err = l.Destroy()
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(5 * time.Millisecond)
	// with the barrier gone, purging proceeds down to max=3 files
	fnames, err = ReadDir(dir)
	if err != nil {
		t.Fatal(err)
	}
	wnames = []string{"7.test", "8.test", "9.test"}
	if !reflect.DeepEqual(fnames, wnames) {
		t.Errorf("filenames = %v, want %v", fnames, wnames)
	}
	select {
	case err := <-errch:
		t.Errorf("unexpected purge error %v", err)
	case <-time.After(time.Millisecond):
	}
	close(stop)
}

View File

@ -43,7 +43,7 @@ func TestReadWriteTimeoutDialer(t *testing.T) {
defer conn.Close()
// fill the socket buffer
data := make([]byte, 1024*1024)
data := make([]byte, 5*1024*1024)
timer := time.AfterFunc(d.wtimeoutd*5, func() {
t.Fatal("wait timeout")
})

View File

@ -52,7 +52,7 @@ func TestWriteReadTimeoutListener(t *testing.T) {
defer conn.Close()
// fill the socket buffer
data := make([]byte, 1024*1024)
data := make([]byte, 5*1024*1024)
timer := time.AfterFunc(wln.wtimeoutd*5, func() {
t.Fatal("wait timeout")
})

View File

@ -279,7 +279,6 @@ func (r *raft) maybeCommit() bool {
}
sort.Sort(sort.Reverse(mis))
mci := mis[r.q()-1]
return r.raftLog.maybeCommit(mci, r.Term)
}
@ -473,7 +472,7 @@ func stepLeader(r *raft, m pb.Message) {
}
}
case pb.MsgVote:
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
}
@ -533,12 +532,12 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgVote:
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.elapsed = 0
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %x",
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.Vote = m.From
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
} else {
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
}

View File

@ -559,35 +559,34 @@ func TestCommit(t *testing.T) {
w uint64
}{
// single
{[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
{[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
{[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
{[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 1, 1},
{[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 2, 0},
{[]uint64{2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
{[]uint64{1}, []pb.Entry{{Index: 1, Term: 2}}, 2, 1},
// odd
{[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
{[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
{[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
{[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
// even
{[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
{[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
{[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
{[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
{[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
{[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
}
for i, tt := range tests {
prs := make(map[uint64]*progress)
storage := NewMemoryStorage()
storage.Append(tt.logs)
storage.hardState = pb.HardState{Term: tt.smTerm}
sm := newRaft(1, []uint64{1}, 5, 1, storage)
for j := 0; j < len(tt.matches); j++ {
prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
}
sm := &raft{
raftLog: &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: unstable{offset: uint64(len(tt.logs))}},
prs: prs,
HardState: pb.HardState{Term: tt.smTerm},
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
}
sm.maybeCommit()
if g := sm.raftLog.committed; g != tt.w {
@ -675,15 +674,10 @@ func TestHandleMsgApp(t *testing.T) {
}
for i, tt := range tests {
sm := &raft{
state: StateFollower,
HardState: pb.HardState{Term: 2},
raftLog: &raftLog{
committed: 0,
storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
unstable: unstable{offset: 3},
},
}
storage := NewMemoryStorage()
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
sm := newRaft(1, []uint64{1}, 10, 1, storage)
sm.becomeFollower(2, None)
sm.handleAppendEntries(tt.m)
if sm.raftLog.lastIndex() != tt.wIndex {
@ -709,18 +703,15 @@ func TestHandleHeartbeat(t *testing.T) {
m pb.Message
wCommit uint64
}{
{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
}
for i, tt := range tests {
storage := NewMemoryStorage()
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
sm := &raft{
state: StateFollower,
HardState: pb.HardState{Term: 2},
raftLog: newLog(storage),
}
sm := newRaft(1, []uint64{1, 2}, 5, 1, storage)
sm.becomeFollower(2, 2)
sm.raftLog.commitTo(commit)
sm.handleHeartbeat(tt.m)
if sm.raftLog.committed != tt.wCommit {
@ -1345,10 +1336,7 @@ func TestPromotable(t *testing.T) {
{[]uint64{2, 3}, false},
}
for i, tt := range tests {
r := &raft{id: id, prs: make(map[uint64]*progress)}
for _, id := range tt.peers {
r.prs[id] = &progress{}
}
r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage())
if g := r.promotable(); g != tt.wp {
t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
}
@ -1378,17 +1366,11 @@ func TestRaftNodes(t *testing.T) {
}
func ents(terms ...uint64) *raft {
ents := []pb.Entry{{}}
storage := NewMemoryStorage()
for i, term := range terms {
ents = append(ents, pb.Entry{Index: uint64(i + 1), Term: term})
}
sm := &raft{
raftLog: &raftLog{
storage: &MemoryStorage{ents: ents},
unstable: unstable{offset: uint64(len(ents))},
},
storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
}
sm := newRaft(1, []uint64{}, 5, 1, storage)
sm.reset(0)
return sm
}

View File

@ -65,8 +65,10 @@ type Sender interface {
Resume()
}
func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
s := &sender{
id: id,
active: true,
tr: tr,
u: u,
cid: cid,
@ -75,7 +77,7 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
shouldstop: shouldstop,
batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
q: make(chan []byte, senderBufSize),
q: make(chan *raftpb.Message, senderBufSize),
}
s.wg.Add(connPerSender)
for i := 0; i < connPerSender; i++ {
@ -85,9 +87,10 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
}
type sender struct {
id types.ID
cid types.ID
tr http.RoundTripper
u string
cid types.ID
p Processor
fs *stats.FollowerStats
shouldstop chan struct{}
@ -95,13 +98,21 @@ type sender struct {
strmCln *streamClient
batcher *Batcher
propBatcher *ProposalBatcher
strmSrv *streamServer
strmSrvMu sync.Mutex
q chan []byte
q chan *raftpb.Message
paused bool
mu sync.RWMutex
wg sync.WaitGroup
strmSrvMu sync.Mutex
strmSrv *streamServer
// wait for the handling routines
wg sync.WaitGroup
mu sync.RWMutex
u string // the url this sender post to
// active reports whether the last send succeeded: if it did, this
// sender is active; otherwise it is inactive.
active bool
errored error
paused bool
}
func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
@ -114,6 +125,7 @@ func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-cha
}
// stop the existing one
s.strmSrv.stop()
s.strmSrv = nil
}
s.strmSrv = startStreamServer(w, to, term, s.fs)
return s.strmSrv.stopNotify(), nil
@ -172,9 +184,8 @@ func (s *sender) Send(m raftpb.Message) error {
func (s *sender) send(m raftpb.Message) error {
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data := pbutil.MustMarshal(&m)
select {
case s.q <- data:
case s.q <- &m:
return nil
default:
log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
@ -189,6 +200,7 @@ func (s *sender) Stop() {
s.strmSrvMu.Lock()
if s.strmSrv != nil {
s.strmSrv.stop()
s.strmSrv = nil
}
s.strmSrvMu.Unlock()
if s.strmCln != nil {
@ -254,16 +266,35 @@ func (s *sender) tryStream(m raftpb.Message) bool {
func (s *sender) handle() {
defer s.wg.Done()
for d := range s.q {
for m := range s.q {
start := time.Now()
err := s.post(d)
err := s.post(pbutil.MustMarshal(m))
end := time.Now()
s.mu.Lock()
if err != nil {
s.fs.Fail()
log.Printf("sender: %v", err)
continue
if s.errored == nil || s.errored.Error() != err.Error() {
log.Printf("sender: error posting to %s: %v", s.id, err)
s.errored = err
}
if s.active {
log.Printf("sender: the connection with %s becomes inactive", s.id)
s.active = false
}
if m.Type == raftpb.MsgApp {
s.fs.Fail()
}
} else {
if !s.active {
log.Printf("sender: the connection with %s becomes active", s.id)
s.active = true
s.errored = nil
}
if m.Type == raftpb.MsgApp {
s.fs.Succ(end.Sub(start))
}
}
s.fs.Succ(end.Sub(start))
s.mu.Unlock()
}
}
@ -274,13 +305,13 @@ func (s *sender) post(data []byte) error {
req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
s.mu.RUnlock()
if err != nil {
return fmt.Errorf("new request to %s error: %v", s.u, err)
return err
}
req.Header.Set("Content-Type", "application/protobuf")
req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
resp, err := s.tr.RoundTrip(req)
if err != nil {
return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
return err
}
resp.Body.Close()
@ -290,15 +321,15 @@ func (s *sender) post(data []byte) error {
case s.shouldstop <- struct{}{}:
default:
}
log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
return nil
case http.StatusForbidden:
select {
case s.shouldstop <- struct{}{}:
default:
}
log.Println("etcdserver: this member has been permanently removed from the cluster")
log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
log.Println("rafthttp: this member has been permanently removed from the cluster")
log.Println("rafthttp: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
return nil
case http.StatusNoContent:
return nil

View File

@ -34,9 +34,9 @@ import (
func TestSenderSend(t *testing.T) {
tr := &roundTripperRecorder{}
fs := &stats.FollowerStats{}
s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
if err := s.Send(raftpb.Message{}); err != nil {
if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
t.Fatalf("unexpect send error: %v", err)
}
s.Stop()
@ -54,7 +54,7 @@ func TestSenderSend(t *testing.T) {
func TestSenderExceedMaximalServing(t *testing.T) {
tr := newRoundTripperBlocker()
fs := &stats.FollowerStats{}
s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
// keep the sender busy and make the buffer full
// nothing can go out as we block the sender
@ -86,9 +86,9 @@ func TestSenderExceedMaximalServing(t *testing.T) {
// it increases fail count in stats.
func TestSenderSendFailed(t *testing.T) {
fs := &stats.FollowerStats{}
s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
if err := s.Send(raftpb.Message{}); err != nil {
if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
t.Fatalf("unexpect Send error: %v", err)
}
s.Stop()
@ -102,7 +102,7 @@ func TestSenderSendFailed(t *testing.T) {
func TestSenderPost(t *testing.T) {
tr := &roundTripperRecorder{}
s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, nil, nil)
s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
if err := s.post([]byte("some data")); err != nil {
t.Fatalf("unexpect post error: %v", err)
}
@ -145,7 +145,7 @@ func TestSenderPostBad(t *testing.T) {
}
for i, tt := range tests {
shouldstop := make(chan struct{})
s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
err := s.post([]byte("some data"))
s.Stop()
@ -166,7 +166,7 @@ func TestSenderPostShouldStop(t *testing.T) {
}
for i, tt := range tests {
shouldstop := make(chan struct{}, 1)
s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
s.post([]byte("some data"))
s.Stop()
select {

View File

@ -42,7 +42,7 @@ function package {
cp etcd/README.md ${target}/README.md
cp etcd/etcdctl/README.md ${target}/README-etcdctl.md
cp -R etcd/Documentation/0.5 ${target}/Documentation
cp -R etcd/Documentation/2.0 ${target}/Documentation
}
function main {

2
test
View File

@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
source ./build
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/ioutils pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/fileutil pkg/flags pkg/ioutils pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
# user has not provided PKG override

View File

@ -17,5 +17,5 @@
package version
var (
Version = "0.5.0-alpha.5"
Version = "2.0.0-rc.1"
)

View File

@ -48,7 +48,7 @@ Cut issues 0x10 entries with incremental index later then the file will be calle
At a later time a WAL can be opened at a particular raft index:
w, err := wal.OpenAtIndex("/var/lib/etcd", 0)
w, err := wal.Open("/var/lib/etcd", 0)
...
The raft index must have been written to the WAL. When opening without a

View File

@ -21,6 +21,7 @@ import (
"fmt"
"hash/crc32"
"io"
"log"
"os"
"path"
"reflect"
@ -67,6 +68,8 @@ type WAL struct {
seq uint64 // sequence of the wal file currently used for writes
enti uint64 // index of the last entry saved to the wal
encoder *encoder // encoder to encode records
locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
}
// Create creates a WAL ready for appending records. The given metadata is
@ -85,6 +88,15 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
if err != nil {
return nil, err
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return nil, err
}
err = l.Lock()
if err != nil {
return nil, err
}
w := &WAL{
dir: dirpath,
metadata: metadata,
@ -92,6 +104,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
f: f,
encoder: newEncoder(f, 0),
}
w.locks = append(w.locks, l)
if err := w.saveCrc(0); err != nil {
return nil, err
}
@ -104,13 +117,23 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return w, nil
}
// OpenAtIndex opens the WAL at the given index.
// Open opens the WAL at the given index.
// The index SHOULD have been previously committed to the WAL, or the following
// ReadAll will fail.
// The returned WAL is ready to read and the first record will be the given
// index. The WAL cannot be appended to before reading out all of its
// previous records.
func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
func Open(dirpath string, index uint64) (*WAL, error) {
return openAtIndex(dirpath, index, true)
}
// OpenNotInUse only opens the wal files that are not in use.
// Other than that, it is similar to Open.
func OpenNotInUse(dirpath string, index uint64) (*WAL, error) {
return openAtIndex(dirpath, index, false)
}
func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
names, err := fileutil.ReadDir(dirpath)
if err != nil {
return nil, err
@ -129,12 +152,27 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
// open the wal files for reading
rcs := make([]io.ReadCloser, 0)
ls := make([]fileutil.Lock, 0)
for _, name := range names[nameIndex:] {
f, err := os.Open(path.Join(dirpath, name))
if err != nil {
return nil, err
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return nil, err
}
err = l.TryLock()
if err != nil {
if all {
return nil, err
} else {
log.Printf("wal: opened all the files until %s, since it is still in use by an etcd server", name)
break
}
}
rcs = append(rcs, f)
ls = append(ls, l)
}
rc := MultiReadCloser(rcs...)
@ -157,8 +195,9 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
ri: index,
decoder: newDecoder(rc),
f: f,
seq: seq,
f: f,
seq: seq,
locks: ls,
}
return w, nil
}
@ -224,6 +263,15 @@ func (w *WAL) Cut() error {
if err != nil {
return err
}
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
if err = w.sync(); err != nil {
return err
}
@ -255,6 +303,30 @@ func (w *WAL) sync() error {
return w.f.Sync()
}
// ReleaseLockTo releases the file locks w is holding whose wal-file
// index is smaller than or equal to the given index. Locks are
// released strictly from the oldest file forward, stopping at the
// first file newer than index. Note: the range iterates over the
// slice header captured at loop entry while w.locks is trimmed from
// the front inside the body, which is safe because each released
// lock is only visited once.
func (w *WAL) ReleaseLockTo(index uint64) error {
	for _, l := range w.locks {
		// the lock name is the wal file name, which encodes its index
		_, i, err := parseWalName(path.Base(l.Name()))
		if err != nil {
			return err
		}
		// this file (and all later ones) is still needed; stop here
		if i > index {
			return nil
		}
		err = l.Unlock()
		if err != nil {
			return err
		}
		err = l.Destroy()
		if err != nil {
			return err
		}
		// drop the released lock from the front of the list
		w.locks = w.locks[1:]
	}
	return nil
}
func (w *WAL) Close() error {
if w.f != nil {
if err := w.sync(); err != nil {
@ -264,6 +336,11 @@ func (w *WAL) Close() error {
return err
}
}
for _, l := range w.locks {
// TODO: log the error
l.Unlock()
l.Destroy()
}
return nil
}

View File

@ -90,7 +90,7 @@ func TestOpenAtIndex(t *testing.T) {
}
f.Close()
w, err := OpenAtIndex(dir, 0)
w, err := Open(dir, 0)
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
@ -109,7 +109,7 @@ func TestOpenAtIndex(t *testing.T) {
}
f.Close()
w, err = OpenAtIndex(dir, 5)
w, err = Open(dir, 5)
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
@ -126,7 +126,7 @@ func TestOpenAtIndex(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(emptydir)
if _, err = OpenAtIndex(emptydir, 0); err != ErrFileNotFound {
if _, err = Open(emptydir, 0); err != ErrFileNotFound {
t.Errorf("err = %v, want %v", err, ErrFileNotFound)
}
}
@ -219,7 +219,7 @@ func TestRecover(t *testing.T) {
}
w.Close()
if w, err = OpenAtIndex(p, 0); err != nil {
if w, err = Open(p, 0); err != nil {
t.Fatal(err)
}
metadata, state, entries, err := w.ReadAll()
@ -238,6 +238,7 @@ func TestRecover(t *testing.T) {
if !reflect.DeepEqual(state, s) {
t.Errorf("state = %+v, want %+v", state, s)
}
w.Close()
}
func TestSearchIndex(t *testing.T) {
@ -341,7 +342,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
for i := 0; i < 10; i++ {
w, err := OpenAtIndex(p, uint64(i))
w, err := Open(p, uint64(i))
if err != nil {
if i <= 4 {
if err != ErrFileNotFound {
@ -365,6 +366,7 @@ func TestRecoverAfterCut(t *testing.T) {
t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
}
}
w.Close()
}
}
@ -384,7 +386,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
}
w.Close()
w, err = OpenAtIndex(p, 1)
w, err = Open(p, 1)
if err != nil {
t.Fatal(err)
}