Compare commits: v0.5.0-alp...v2.0.0-rc. (44 commits)
Author | SHA1 | Date
---|---|---
 | 221abdcb3b |
 | fa35363f74 |
 | 35a772753c |
 | aea87bc88d |
 | ce6f606766 |
 | a000e97eea |
 | 2d76e5e273 |
 | f0b9ad3863 |
 | 0a14927823 |
 | 910198d117 |
 | 722247a752 |
 | c27c288bef |
 | 04522baeee |
 | 43bb6cf038 |
 | 8ece28d4f7 |
 | 5369fb1c4f |
 | 044e35b814 |
 | e9b06416de |
 | 9dc5b5a7e8 |
 | e3dbfefbe0 |
 | 0ea8c0929e |
 | 502396edd5 |
 | 6b73a72d42 |
 | 53bf7e4b5e |
 | f538cba272 |
 | ea94d19147 |
 | b90693ccae |
 | dcf34c0ab4 |
 | ceb077424d |
 | d07434f99e |
 | cb6983cbb1 |
 | c586d5012c |
 | d86603840d |
 | e40a53b534 |
 | 3f64c677e1 |
 | b8ab2b0b5c |
 | 3cc4cdd363 |
 | c620238257 |
 | f265afa8ac |
 | bee3103931 |
 | fa195dae39 |
 | fe4abc40ce |
 | 1f0d43250f |
 | 97025bf5f1 |
@@ -1,4 +1,5 @@
 language: go
 sudo: false
 go:
 - 1.3
@@ -1,10 +1,10 @@
-## etcd 0.4.x -> 0.5.0 Data Migration Tool
+## etcd 0.4.x -> 2.0.0 Data Migration Tool
 
 ### Upgrading from 0.4.x
 
-Between 0.4.x and 0.5, the on-disk data formats have changed. In order to allow users to convert to 0.5, a migration tool is provided.
+Between 0.4.x and 2.0, the on-disk data formats have changed. In order to allow users to convert to 2.0, a migration tool is provided.
 
-In the early 0.5.0-alpha series, we're providing this tool early to encourage adoption. However, before 0.5.0-release, etcd will autodetect the 0.4.x data dir upon upgrade and automatically update the data too (while leaving a backup, in case of emergency).
+In the early 2.0.0-alpha series, we're providing this tool early to encourage adoption. However, before the 2.0.0 release, etcd will autodetect the 0.4.x data dir upon upgrade and automatically update the data too (while leaving a backup, in case of emergency).
 
 ### Data Migration Tips
 
@@ -18,7 +18,7 @@ The tool can be run via:
 ./bin/etcd-migrate --data-dir=<PATH TO YOUR DATA>
 ```
 
-It should autodetect everything and convert the data-dir to be 0.5 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 0.5 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
+It should autodetect everything and convert the data-dir to be 2.0 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 2.0 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
 
 If, however, it complains about autodetecting the name (which can happen, depending on how the cluster was configured), you need to supply the name of this particular node. This is equivalent to the `--name` flag (or `ETCD_NAME` variable) that etcd was run with, which can also be found by accessing the self api, eg:
 
@@ -38,10 +38,10 @@ And the tool should migrate successfully. If it still has an error at this time,
 
 ### Recovering Disk Space
 
-If the conversion has completed, the entire cluster is running on something 0.5-based, and the disk space is important, the following command will clear 0.4.x data from the data-dir:
+If the conversion has completed, the entire cluster is running on something 2.0-based, and the disk space is important, the following command will clear 0.4.x data from the data-dir:
 
 ```sh
 rm -ri snapshot conf log
 ```
 
-It will ask before every deletion, but these are the 0.4.x files and will not affect the working 0.5 data.
+It will ask before every deletion, but these are the 0.4.x files and will not affect the working 2.0 data.
@@ -15,7 +15,7 @@ Using an out-of-date data directory can lead to inconsistency as the member had
 
 For maximum safety, if an etcd member suffers any sort of data corruption or loss, it must be removed from the cluster.
 Once removed the member can be re-added with an empty data directory.
 
-[members-api]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md#members-api
+[members-api]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md#members-api
 
 #### Contents
 
@@ -43,7 +43,70 @@ The data directory contains all the data to recover a member to its point-in-tim
 
 * Update the peer URLs for that member to reflect the new machine according to the [member api] [change peer url]
 * Start etcd on the new machine, using the same configuration and the copy of the data directory
 
-[change peer url]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md#change-the-peer-urls-of-a-member
+This example will walk you through the process of migrating the infra1 member to a new machine:
+
+|Name|Peer URL|
+|------|--------------|
+|infra0|10.0.1.10:2380|
+|infra1|10.0.1.11:2380|
+|infra2|10.0.1.12:2380|
+
+```
+$ export ETCDCTL_PEERS=http://10.0.1.10:2379,http://10.0.1.11:2379,http://10.0.1.12:2379
+```
+
+```
+$ etcdctl member list
+84194f7c5edd8b37: name=infra0 peerURLs=http://10.0.1.10:2380 clientURLs=http://127.0.0.1:2379,http://10.0.1.10:2379
+b4db3bf5e495e255: name=infra1 peerURLs=http://10.0.1.11:2380 clientURLs=http://127.0.0.1:2379,http://10.0.1.11:2379
+bc1083c870280d44: name=infra2 peerURLs=http://10.0.1.12:2380 clientURLs=http://127.0.0.1:2379,http://10.0.1.12:2379
+```
+
+#### Stop the member etcd process
+
+```
+$ ssh core@10.0.1.11
+```
+
+```
+$ sudo systemctl stop etcd
+```
+
+#### Copy the data directory of the now-idle member to the new machine
+
+```
+$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
+```
+
+```
+$ scp node1.etcd.tar.gz core@10.0.1.13:~/
+```
+
+#### Update the peer URLs for that member to reflect the new machine
+
+```
+$ curl http://10.0.1.10:2379/v2/members/b4db3bf5e495e255 -XPUT \
+-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.1.13:2380"]}'
+```
+
+#### Start etcd on the new machine, using the same configuration and the copy of the data directory
+
+```
+$ ssh core@10.0.1.13
+```
+
+```
+$ tar -xzvf node1.etcd.tar.gz -C /var/lib/etcd
+```
+
+```
+etcd -name node1 \
+-listen-peer-urls http://10.0.1.13:2380 \
+-listen-client-urls http://10.0.1.13:2379,http://127.0.0.1:2379 \
+-advertise-client-urls http://10.0.1.13:2379,http://127.0.0.1:2379
+```
+
+[change peer url]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md#change-the-peer-urls-of-a-member
 
 ### Disaster Recovery
Documentation/2.0/backward_compatibility.md (new file, 53 lines)
@@ -0,0 +1,53 @@
### Backward Compatibility

The main goal of the etcd 2.0 release is to improve cluster safety around bootstrapping and dynamic reconfiguration. To do this, we deprecated the old error-prone APIs and provide a new set of APIs.

The other main focus of this release was a more reliable Raft implementation, but as this change is internal it should not have any notable effects on users.

#### Command Line Flags Changes

The major flag changes are mostly related to bootstrapping. The `initial-*` flags provide an improved way to specify the required criteria to start the cluster. The advertised URLs now support a list of values instead of a single value, which allows etcd users to gracefully migrate to the new set of IANA-assigned ports (2379/client and 2380/peers) while maintaining backward compatibility with the old ports.

- `-addr` is replaced by `-advertise-client-urls`.
- `-bind-addr` is replaced by `-listen-client-urls`.
- `-peer-addr` is replaced by `-initial-advertise-peer-urls`.
- `-peer-bind-addr` is replaced by `-listen-peer-urls`.
- `-peers` is replaced by `-initial-cluster`.
- `-peers-file` is replaced by `-initial-cluster`.

The documentation of new command line flags can be found at
https://github.com/coreos/etcd/blob/master/Documentation/2.0/configuration.md.
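The flag renames can be captured in a small lookup table; a Go sketch for illustration (the map and helper are our own, not part of etcd; `-peer-addr` is the 0.4 flag spelling):

```go
package main

import "fmt"

// deprecatedFlags maps etcd 0.4 flag names to their 2.0 replacements.
var deprecatedFlags = map[string]string{
	"-addr":           "-advertise-client-urls",
	"-bind-addr":      "-listen-client-urls",
	"-peer-addr":      "-initial-advertise-peer-urls",
	"-peer-bind-addr": "-listen-peer-urls",
	"-peers":          "-initial-cluster",
	"-peers-file":     "-initial-cluster",
}

// replacement returns the 2.0 flag for a deprecated 0.4 flag, or the
// flag itself if it was not renamed.
func replacement(flag string) string {
	if newFlag, ok := deprecatedFlags[flag]; ok {
		return newFlag
	}
	return flag
}

func main() {
	fmt.Println(replacement("-peer-addr")) // prints -initial-advertise-peer-urls
	fmt.Println(replacement("-name"))      // prints -name (unchanged)
}
```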

#### Data Dir

- Default data dir location has changed from {$hostname}.etcd to {name}.etcd.

- The disk format within the data dir has changed. etcd 2.0 should be able to auto upgrade the old data format. Instructions on doing so manually are in the [migration tool doc][migrationtooldoc].

[migrationtooldoc]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/0_4_migration_tool.md

#### Standby

etcd 0.4's standby mode has been deprecated by 2.0's [proxy mode][proxymode].

Standby mode was intended for large clusters that had a subset of the members acting in the consensus process. Overall this process was too magical and allowed operators to back themselves into a corner.

Proxy mode in 2.0 will provide similar functionality, with improved control over which machines act as proxies, since the operator configures them explicitly. Proxies also support read-only or read/write modes for increased security and durability.

[proxymode]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/proxy.md

#### Discovery Service

A size key needs to be provided inside a [discovery token][discoverytoken].

[discoverytoken]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#custom-etcd-discovery-service

#### HTTP Admin API

`v2/admin` on the peer URL and `v2/keys/_etcd` are unified under the new [v2/member API][memberapi] to better explain which machines are part of an etcd cluster, and to simplify the keyspace for all your use cases.

[memberapi]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md

#### HTTP Key Value API

- The follower can now transparently proxy write requests to the leader. Clients will no longer see 307 redirections to the leader from etcd.

- Expiration time is in UTC instead of local time.
@@ -30,18 +30,21 @@ On each machine you would start etcd with these flags:
 
 ```
 $ etcd -name infra0 -initial-advertise-peer-urls https://10.0.1.10:2380 \
 -listen-peer-urls https://10.0.1.10:2380 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
 -initial-cluster-state new
 ```
 ```
 $ etcd -name infra1 -initial-advertise-peer-urls https://10.0.1.11:2380 \
 -listen-peer-urls https://10.0.1.11:2380 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
 -initial-cluster-state new
 ```
 ```
 $ etcd -name infra2 -initial-advertise-peer-urls https://10.0.1.12:2380 \
 -listen-peer-urls https://10.0.1.12:2380 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
 -initial-cluster-state new
@@ -55,6 +58,7 @@ In the following example, we have not included our new host in the list of enumerated nodes
 
 ```
 $ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \
 -listen-peer-urls https://10.0.1.11:2380 \
 -initial-cluster infra0=http://10.0.1.10:2380 \
 -initial-cluster-state new
 etcd: infra1 not listed in the initial cluster config
@@ -65,6 +69,7 @@ In this example, we are attempting to map a node (infra0) on a different address
 
 ```
 $ etcd -name infra0 -initial-advertise-peer-urls http://127.0.0.1:2380 \
 -listen-peer-urls http://10.0.1.10:2380 \
 -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
 -initial-cluster-state=new
 etcd: error setting up initial cluster: infra0 has different advertised URLs in the cluster and advertised peer URLs list
@@ -75,6 +80,7 @@ If you configure a peer with a different set of configuration and attempt to join
 
 ```
 $ etcd -name infra3 -initial-advertise-peer-urls http://10.0.1.13:2380 \
 -listen-peer-urls http://10.0.1.13:2380 \
 -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra3=http://10.0.1.13:2380 \
 -initial-cluster-state=new
 etcd: conflicting cluster ID to the target cluster (c6ab534d07e8fcc4 != bc25ea2a74fb18b0). Exiting.
@@ -91,7 +97,7 @@ A discovery URL identifies a unique etcd cluster. Instead of reusing a discovery
 
 Moreover, discovery URLs should ONLY be used for the initial bootstrapping of a cluster. To change cluster membership after the cluster is already running, see the [runtime reconfiguration][runtime] guide.
 
-[runtime]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/runtime-configuration.md
+[runtime]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/runtime-configuration.md
 
 ### Custom etcd discovery service
 
@@ -111,14 +117,17 @@ Now we start etcd with those relevant flags for each member:
 
 ```
 $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
 -listen-peer-urls http://10.0.1.10:2380 \
 -discovery https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83
 ```
 ```
 $ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \
 -listen-peer-urls http://10.0.1.11:2380 \
 -discovery https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83
 ```
 ```
 $ etcd -name infra2 -initial-advertise-peer-urls http://10.0.1.12:2380 \
 -listen-peer-urls http://10.0.1.12:2380 \
 -discovery https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83
 ```
 
@@ -152,14 +161,17 @@ Now we start etcd with those relevant flags for each member:
 
 ```
 $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
 -listen-peer-urls http://10.0.1.10:2380 \
 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
 ```
 ```
 $ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \
 -listen-peer-urls http://10.0.1.11:2380 \
 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
 ```
 ```
 $ etcd -name infra2 -initial-advertise-peer-urls http://10.0.1.12:2380 \
 -listen-peer-urls http://10.0.1.12:2380 \
 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
 ```
 
@@ -174,6 +186,7 @@ You can use the environment variable `ETCD_DISCOVERY_PROXY` to cause etcd to use
 
 ```
 $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
 -listen-peer-urls http://10.0.1.10:2380 \
 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
 etcd: error: the cluster doesn't have a size configuration value in https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de/_config
 exit 1
@@ -185,6 +198,7 @@ This error will occur if the discovery cluster already has the configured number
 
 ```
 $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
 -listen-peer-urls http://10.0.1.10:2380 \
 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de \
 -discovery-fallback exit
 etcd: discovery: cluster is full
@@ -198,13 +212,14 @@ ignored on this machine.
 
 ```
 $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \
 -listen-peer-urls http://10.0.1.10:2380 \
 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
 etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at /var/lib/etcd
 ```
-# 0.4 to 0.5+ Migration Guide
+# 0.4 to 2.0+ Migration Guide
 
-In etcd 0.5 we introduced the ability to listen on more than one address and to advertise multiple addresses. This makes using etcd easier when you have complex networking, such as private and public networks on various cloud providers.
+In etcd 2.0 we introduced the ability to listen on more than one address and to advertise multiple addresses. This makes using etcd easier when you have complex networking, such as private and public networks on various cloud providers.
 
 To make understanding this feature easier, we changed the naming of some flags, but we support the old flags to make the migration from the old to new version easier.
@@ -131,9 +131,9 @@ Be CAUTIOUS to use unsafe flags because it will break the guarantee given by consensus
 
 + Print the version and exit.
 + default: false
 
-[build-cluster]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/clustering.md#static
-[reconfig]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/runtime-configuration.md
-[discovery]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/clustering.md#discovery
-[proxy]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/proxy.md
+[build-cluster]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#static
+[reconfig]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/runtime-configuration.md
+[discovery]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#discovery
+[proxy]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/proxy.md
 [security]: https://github.com/coreos/etcd/blob/master/Documentation/security.md
-[restore]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/admin_guide.md#restoring-a-backup
+[restore]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/admin_guide.md#restoring-a-backup
@@ -28,4 +28,4 @@ Client is a caller of the cluster's HTTP API.
 
 ### Machine (deprecated)
 
-The alternative of Member in etcd before 0.5
+The alternative of Member in etcd before 2.0
@@ -59,21 +59,21 @@ If the POST body is malformed an HTTP 400 will be returned. If the member exists
 
 ```
 POST /v2/members HTTP/1.1
 
-{"peerURLs": ["http://10.0.0.10:2379"]}
+{"peerURLs": ["http://10.0.0.10:2380"]}
 ```
 
 ### Example
 
 ```sh
 curl http://10.0.0.10:2379/v2/members -XPOST \
--H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:2379"]}'
+-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:2380"]}'
 ```
 
 ```json
 {
     "id": "3777296169",
     "peerURLs": [
-        "http://10.0.0.10:2379"
+        "http://10.0.0.10:2380"
     ]
 }
 ```
@@ -108,12 +108,12 @@ If the POST body is malformed an HTTP 400 will be returned. If the member does not exist
 
 ```
 PUT /v2/members/<id> HTTP/1.1
 
-{"peerURLs": ["http://10.0.0.10:2379"]}
+{"peerURLs": ["http://10.0.0.10:2380"]}
 ```
 
 #### Example
 
 ```sh
 curl http://10.0.0.10:2379/v2/members/272e204152 -XPUT \
--H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:12379"]}'
+-H "Content-Type: application/json" -d '{"peerURLs":["http://10.0.0.10:2380"]}'
 ```
@@ -29,4 +29,4 @@ etcd -proxy on -client-listen-urls 127.0.0.1:8080 -discovery https://discovery.
 
 #### Fallback to proxy mode with discovery service
 
 If you bootstrap an etcd cluster using [discovery service][discovery-service] with more than the expected number of etcd members, the extra etcd processes will fall back to being `readwrite` proxies by default. They will forward the requests to the cluster as described above. For example, if you create a discovery URL with `size=5`, and start ten etcd processes using that same discovery URL, the result will be a cluster with five etcd members and five proxies. Note that this behaviour can be disabled with the `proxy-fallback` flag.
 
-[discovery-service]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/clustering.md#discovery
+[discovery-service]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/clustering.md#discovery
@@ -14,7 +14,7 @@ If etcd falls below a simple majority of members it can no longer accept writes:
 
 If you want to migrate a running member to another machine, please refer to the [member migration section][member migration].
 
-[member migration]: https://github.com/coreos/etcd/blob/master/Documentation/0.5/admin_guide.md#member-migration
+[member migration]: https://github.com/coreos/etcd/blob/master/Documentation/2.0/admin_guide.md#member-migration
 
 ### Increase Cluster Size
 
@@ -53,7 +53,7 @@ To increase from 3 to 5 members you will make two add operations
 
 To decrease from 5 to 3 you will make two remove operations
 
 All of these examples will use the `etcdctl` command line tool that ships with etcd.
-If you want to use the member API directly you can find the documentation [here](https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md).
+If you want to use the member API directly you can find the documentation [here](https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md).
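The "simple majority" rule is plain arithmetic, and it is why growing from 3 to 5 members improves fault tolerance. A small Go sketch (helper names are our own illustration):

```go
package main

import "fmt"

// quorum returns how many members must be up for the cluster to
// accept writes: a simple majority.
func quorum(members int) int {
	return members/2 + 1
}

// faultTolerance returns how many members can fail while the
// cluster keeps accepting writes.
func faultTolerance(members int) int {
	return members - quorum(members)
}

func main() {
	for _, n := range []int{1, 3, 4, 5} {
		fmt.Printf("size=%d quorum=%d tolerates=%d failures\n", n, quorum(n), faultTolerance(n))
	}
}
```

Note that an even-sized cluster tolerates no more failures than the odd size below it, which is why odd sizes are the usual recommendation.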
 
 ### Remove a Member
 
@@ -86,7 +86,7 @@ Removal of the leader is safe, but the cluster will be out of progress for a period of
 
 Adding a member is a two step process:
 
-* Add the new member to the cluster via the [members API](https://github.com/coreos/etcd/blob/master/Documentation/0.5/other_apis.md#post-v2members) or the `etcdctl member add` command.
+* Add the new member to the cluster via the [members API](https://github.com/coreos/etcd/blob/master/Documentation/2.0/other_apis.md#post-v2members) or the `etcdctl member add` command.
 * Start the member with the correct configuration.
 
 Using `etcdctl` let's add the new member to the cluster:
@@ -706,35 +706,51 @@ curl -L http://127.0.0.1:4001/v2/keys/?recursive=true
 
 ### Deleting a Directory
 
-Now let's try to delete the directory `/foo_dir`.
+Now let's try to delete the directory `/dir`.
 
-You can remove an empty directory using the `DELETE` verb and the `dir=true` parameter.
+You can remove an empty directory using the `DELETE` verb and the `dir=true` parameter. The following will succeed because `/dir` was empty.
 
 ```sh
-curl -L 'http://127.0.0.1:4001/v2/keys/foo_dir?dir=true' -XDELETE
+curl -L 'http://127.0.0.1:4001/v2/keys/dir?dir=true' -XDELETE
 ```
 
 ```json
 {
     "action": "delete",
     "node": {
         "createdIndex": 30,
         "dir": true,
-        "key": "/foo_dir",
+        "key": "/dir",
         "modifiedIndex": 31
     },
     "prevNode": {
         "createdIndex": 30,
-        "key": "/foo_dir",
+        "key": "/dir",
         "dir": true,
         "modifiedIndex": 30
     }
 }
 ```
 
+However, deleting `/foo_dir` will result in an error because `/foo_dir` is not empty.
+
+```sh
+curl -L 'http://127.0.0.1:4001/v2/keys/foo_dir?dir=true' -XDELETE
+```
+
+```json
+{
+    "errorCode": 108,
+    "message": "Directory not empty",
+    "cause": "/foo_dir",
+    "index": 2
+}
+```
+
 To delete a directory that holds keys, you must add `recursive=true`.
 
 ```sh
-curl -L http://127.0.0.1:4001/v2/keys/dir?recursive=true -XDELETE
+curl -L http://127.0.0.1:4001/v2/keys/foo_dir?recursive=true -XDELETE
 ```
 
 ```json
@@ -27,6 +27,7 @@
 - [jplana/python-etcd](https://github.com/jplana/python-etcd) - Supports v2
 - [russellhaering/txetcd](https://github.com/russellhaering/txetcd) - a Twisted Python library
 - [cholcombe973/autodock](https://github.com/cholcombe973/autodock) - A docker deployment automation tool
+- [lisael/aioetcd](https://github.com/lisael/aioetcd) - (Python 3.4+) Asyncio coroutines client (Supports v2)
 
 **Node libraries**
@@ -3,13 +3,14 @@
 [![Build Status](https://travis-ci.org/coreos/etcd)](https://travis-ci.org/coreos/etcd)
 [![Docker Repository on Quay.io](https://quay.io/repository/coreos/etcd-git)](https://quay.io/repository/coreos/etcd-git)
 
-### WARNING ###
+### Release Candidate Warning ###
 
-The current `master` branch of etcd is under heavy development in anticipation of the forthcoming 0.5.0 release.
+The current `master` branch of etcd is under development in anticipation of the forthcoming 2.0.0 release.
 
 It is strongly recommended that users work with the latest 0.4.x release (0.4.6), which can be found on the [releases](https://github.com/coreos/etcd/releases) page.
 
 Unless otherwise noted, the etcd documentation refers to configuring and running 0.4.x releases.
+Documentation related to the 2.0.0 release candidates can be found in the `Documentation/2.0` directory.
 
 ## README version 0.4.6
@@ -276,7 +276,8 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int, index uint64) (clien
 	if len(nodes) > size {
 		nodes = nodes[:size]
 	}
-	w := d.c.RecursiveWatch(d.cluster, index)
+	// watch from the next index
+	w := d.c.RecursiveWatch(d.cluster, index+1)
 	all := make(client.Nodes, len(nodes))
 	copy(all, nodes)
 	for _, n := range all {
@@ -67,7 +67,7 @@ func handleBackup(c *cli.Context) {
 		}
 	}
 
-	w, err := wal.OpenAtIndex(srcWAL, index)
+	w, err := wal.OpenNotInUse(srcWAL, index)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -91,10 +91,26 @@ func getEndpoints(c *cli.Context) ([]string, error) {
 }
 
 func getTransport(c *cli.Context) (*http.Transport, error) {
+	cafile := c.GlobalString("ca-file")
+	certfile := c.GlobalString("cert-file")
+	keyfile := c.GlobalString("key-file")
+
+	// Use an environment variable if nothing was supplied on the
+	// command line
+	if cafile == "" {
+		cafile = os.Getenv("ETCDCTL_CA_FILE")
+	}
+	if certfile == "" {
+		certfile = os.Getenv("ETCDCTL_CERT_FILE")
+	}
+	if keyfile == "" {
+		keyfile = os.Getenv("ETCDCTL_KEY_FILE")
+	}
+
 	tls := transport.TLSInfo{
-		CAFile:   c.GlobalString("ca-file"),
-		CertFile: c.GlobalString("cert-file"),
-		KeyFile:  c.GlobalString("key-file"),
+		CAFile:   cafile,
+		CertFile: certfile,
+		KeyFile:  keyfile,
 	}
 	return transport.NewTransport(tls)
@@ -260,8 +260,13 @@ func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r.Method, "GET") {
 		return
 	}
+	stats := h.stats.LeaderStats()
+	if stats == nil {
+		writeError(w, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
+		return
+	}
 	w.Header().Set("Content-Type", "application/json")
-	w.Write(h.stats.LeaderStats())
+	w.Write(stats)
 }
 
 func serveVersion(w http.ResponseWriter, r *http.Request) {
@@ -21,6 +21,7 @@ import (
 	"net/http"
 	"net/url"
 	"path"
+	"sync"
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/types"
@@ -48,6 +49,7 @@ type sendHub struct {
 	p          rafthttp.Processor
 	ss         *stats.ServerStats
 	ls         *stats.LeaderStats
+	mu         sync.RWMutex // protect the sender map
 	senders    map[types.ID]rafthttp.Sender
 	shouldstop chan struct{}
 }
@@ -67,7 +69,11 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *s
 	}
 }
 
-func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }
+func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.senders[id]
+}
 
 func (h *sendHub) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
@@ -102,6 +108,8 @@ func (h *sendHub) ShouldStopNotify() <-chan struct{} {
 }
 
 func (h *sendHub) Add(m *Member) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
 	if _, ok := h.senders[m.ID]; ok {
 		return
 	}
@@ -113,16 +121,20 @@ func (h *sendHub) Add(m *Member) {
 	}
 	u.Path = path.Join(u.Path, raftPrefix)
 	fs := h.ls.Follower(m.ID.String())
-	s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), h.p, fs, h.shouldstop)
+	s := rafthttp.NewSender(h.tr, u.String(), m.ID, h.cl.ID(), h.p, fs, h.shouldstop)
 	h.senders[m.ID] = s
 }
 
 func (h *sendHub) Remove(id types.ID) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
 	h.senders[id].Stop()
 	delete(h.senders, id)
 }
 
 func (h *sendHub) Update(m *Member) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
 	// TODO: return error or just panic?
 	if _, ok := h.senders[m.ID]; !ok {
 		return
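The locking added above is the standard read-write-mutex-around-a-map pattern: readers take `RLock`, mutators take `Lock`, and without it concurrent access to the `senders` map would be a data race. A self-contained sketch of the same pattern (types and names our own, not the actual sendHub):

```go
package main

import (
	"fmt"
	"sync"
)

// registry guards a map that is read and mutated from multiple
// goroutines, like sendHub's senders map.
type registry struct {
	mu      sync.RWMutex
	senders map[string]string
}

// Get takes the read lock, so many readers can proceed in parallel.
func (r *registry) Get(id string) string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.senders[id]
}

// Add takes the write lock and mirrors sendHub.Add's early return
// when the entry already exists.
func (r *registry) Add(id, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.senders[id]; ok {
		return
	}
	r.senders[id] = addr
}

func main() {
	r := &registry{senders: map[string]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.Add("infra0", "http://10.0.1.10:2380")
			_ = r.Get("infra0")
		}()
	}
	wg.Wait()
	fmt.Println(r.Get("infra0"))
}
```

Running this under `go run -race` is a quick way to confirm the locking is sufficient.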
@ -90,21 +90,6 @@ type Response struct {
|
||||
err error
|
||||
}
|
||||
|
||||
type Storage interface {
|
||||
// Save function saves ents and state to the underlying stable storage.
|
||||
// Save MUST block until st and ents are on stable storage.
|
||||
Save(st raftpb.HardState, ents []raftpb.Entry) error
|
||||
// SaveSnap function saves snapshot to the underlying stable storage.
|
||||
SaveSnap(snap raftpb.Snapshot) error
|
||||
|
||||
// TODO: WAL should be able to control cut itself. After implement self-controlled cut,
|
||||
// remove it in this interface.
|
||||
// Cut cuts out a new wal file for saving new state and entries.
|
||||
Cut() error
|
||||
// Close closes the Storage and performs finalization.
|
||||
Close() error
|
||||
}
|
||||
|
||||
type Server interface {
|
||||
// Start performs any initialization of the Server necessary for it to
|
||||
// begin serving requests. It must be called before Do or Process.
|
||||
@ -295,15 +280,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
id: id,
|
||||
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||
Cluster: cfg.Cluster,
|
||||
storage: struct {
|
||||
*wal.WAL
|
||||
*snap.Snapshotter
|
||||
}{w, ss},
|
||||
stats: sstats,
|
||||
lstats: lstats,
|
||||
Ticker: time.Tick(100 * time.Millisecond),
|
||||
SyncTicker: time.Tick(500 * time.Millisecond),
|
||||
snapCount: cfg.SnapCount,
|
||||
storage: NewStorage(w, ss),
|
||||
stats: sstats,
|
||||
lstats: lstats,
|
||||
Ticker: time.Tick(100 * time.Millisecond),
|
||||
SyncTicker: time.Tick(500 * time.Millisecond),
|
||||
snapCount: cfg.SnapCount,
|
||||
}
|
||||
srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
|
||||
for _, m := range getOtherMembers(cfg.Cluster, cfg.Name) {
|
||||
@@ -403,6 +385,11 @@ func (s *EtcdServer) run() {
 			atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
 			if rd.RaftState == raft.StateLeader {
 				syncC = s.SyncTicker
+				// TODO: remove the nil checking
+				// current test utility does not provide the stats
+				if s.stats != nil {
+					s.stats.BecomeLeader()
+				}
 			} else {
 				syncC = nil
 			}
@@ -544,7 +531,10 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
 
 func (s *EtcdServer) LeaderStats() []byte {
+	// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
+	lead := atomic.LoadUint64(&s.raftLead)
+	if lead != uint64(s.id) {
+		return nil
+	}
 	return s.lstats.JSON()
 }
 
@@ -1005,7 +995,7 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (ty
 
 func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
 	var err error
-	if w, err = wal.OpenAtIndex(waldir, index); err != nil {
+	if w, err = wal.Open(waldir, index); err != nil {
 		log.Fatalf("etcdserver: open wal error: %v", err)
 	}
 	var wmetadata []byte
@@ -141,3 +141,11 @@ func (ss *ServerStats) SendAppendReq(reqSize int) {
 
 	ss.SendAppendRequestCnt++
 }
+
+func (ss *ServerStats) BecomeLeader() {
+	if ss.State != raft.StateLeader {
+		ss.State = raft.StateLeader
+		ss.LeaderInfo.Name = ss.ID
+		ss.LeaderInfo.StartTime = time.Now()
+	}
+}
45  etcdserver/storage.go  Normal file
@@ -0,0 +1,45 @@
+package etcdserver
+
+import (
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+	"github.com/coreos/etcd/wal"
+)
+
+type Storage interface {
+	// Save function saves ents and state to the underlying stable storage.
+	// Save MUST block until st and ents are on stable storage.
+	Save(st raftpb.HardState, ents []raftpb.Entry) error
+	// SaveSnap function saves snapshot to the underlying stable storage.
+	SaveSnap(snap raftpb.Snapshot) error
+
+	// TODO: WAL should be able to control cut itself. After implement self-controlled cut,
+	// remove it in this interface.
+	// Cut cuts out a new wal file for saving new state and entries.
+	Cut() error
+	// Close closes the Storage and performs finalization.
+	Close() error
+}
+
+type storage struct {
+	*wal.WAL
+	*snap.Snapshotter
+}
+
+func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
+	return &storage{w, s}
+}
+
+// SaveSnap saves the snapshot to disk and release the locked
+// wal files since they will not be used.
+func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
+	err := st.Snapshotter.SaveSnap(snap)
+	if err != nil {
+		return err
+	}
+	err = st.WAL.ReleaseLockTo(snap.Metadata.Index)
+	if err != nil {
+		return err
+	}
+	return nil
+}
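The new `storage` type satisfies the `Storage` interface mostly by embedding `*wal.WAL` and `*snap.Snapshotter`, overriding only `SaveSnap`. A minimal standalone sketch of this embedding pattern, with hypothetical `WAL`/`Snapshotter` stand-ins rather than the real etcd types:

```go
package main

import "fmt"

// WAL and Snapshotter are hypothetical stand-ins for *wal.WAL and *snap.Snapshotter.
type WAL struct{}

func (w *WAL) Save(data string) error       { return nil }
func (w *WAL) ReleaseLockTo(i uint64) error { fmt.Println("released locks up to", i); return nil }

type Snapshotter struct{}

func (s *Snapshotter) SaveSnap(i uint64) error { fmt.Println("snapshot saved at", i); return nil }

type Storage interface {
	Save(data string) error
	SaveSnap(i uint64) error
}

// storage gets Save for free from the embedded *WAL; SaveSnap is
// overridden so that snapshotting also releases obsolete WAL locks.
type storage struct {
	*WAL
	*Snapshotter
}

func (st *storage) SaveSnap(i uint64) error {
	if err := st.Snapshotter.SaveSnap(i); err != nil {
		return err
	}
	return st.WAL.ReleaseLockTo(i)
}

func main() {
	var s Storage = &storage{&WAL{}, &Snapshotter{}}
	s.SaveSnap(42)
}
```

The override is resolved at compile time: a method defined on the outer struct shadows the promoted method of the same name.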
@@ -145,7 +145,6 @@ func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name s
 	//snapNodes[p.Name] = uint64(m.ID)
 	//}
 	for _, p := range cfg.Peers {
-		log.Printf(p.Name)
 		delete(snapNodes, p.Name)
 	}
 	if len(snapNodes) == 1 {
@@ -490,8 +490,6 @@ func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry,
 		Data: data,
 	}
 
-	log.Printf("%d: %s -> %s", ent5.Index, ent4.GetCommandName(), ent5.Type)
-
 	return &ent5, nil
 }
 
60  pkg/fileutil/lock.go  Normal file
@@ -0,0 +1,60 @@
+package fileutil
+
+import (
+	"errors"
+	"os"
+	"syscall"
+)
+
+var (
+	ErrLocked = errors.New("file already locked")
+)
+
+type Lock interface {
+	Name() string
+	TryLock() error
+	Lock() error
+	Unlock() error
+	Destroy() error
+}
+
+type lock struct {
+	fd   int
+	file *os.File
+}
+
+func (l *lock) Name() string {
+	return l.file.Name()
+}
+
+// TryLock acquires exclusivity on the lock without blocking
+func (l *lock) TryLock() error {
+	err := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB)
+	if err != nil && err == syscall.EWOULDBLOCK {
+		return ErrLocked
+	}
+	return err
+}
+
+// Lock acquires exclusivity on the lock, blocking until it is acquired
+func (l *lock) Lock() error {
+	return syscall.Flock(l.fd, syscall.LOCK_EX)
+}
+
+// Unlock unlocks the lock
+func (l *lock) Unlock() error {
+	return syscall.Flock(l.fd, syscall.LOCK_UN)
+}
+
+func (l *lock) Destroy() error {
+	return l.file.Close()
+}
+
+func NewLock(file string) (Lock, error) {
+	f, err := os.Open(file)
+	if err != nil {
+		return nil, err
+	}
+	l := &lock{int(f.Fd()), f}
+	return l, nil
+}
82  pkg/fileutil/lock_test.go  Normal file
@@ -0,0 +1,82 @@
+package fileutil
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+)
+
+func TestLockAndUnlock(t *testing.T) {
+	f, err := ioutil.TempFile("", "lock")
+	if err != nil {
+		t.Fatal(err)
+	}
+	f.Close()
+	defer func() {
+		err := os.Remove(f.Name())
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	// lock the file
+	l, err := NewLock(f.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Destroy()
+	err = l.Lock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// try lock a locked file
+	dupl, err := NewLock(f.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = dupl.TryLock()
+	if err != ErrLocked {
+		t.Errorf("err = %v, want %v", err, ErrLocked)
+	}
+
+	// unlock the file
+	err = l.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// try lock the unlocked file
+	err = dupl.TryLock()
+	if err != nil {
+		t.Errorf("err = %v, want %v", err, nil)
+	}
+	defer dupl.Destroy()
+
+	// blocking on locked file
+	locked := make(chan struct{}, 1)
+	go func() {
+		l.Lock()
+		locked <- struct{}{}
+	}()
+
+	select {
+	case <-locked:
+		t.Error("unexpected unblocking")
+	case <-time.After(10 * time.Millisecond):
+	}
+
+	// unlock
+	err = dupl.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// the previously blocked routine should be unblocked
+	select {
+	case <-locked:
+	case <-time.After(10 * time.Millisecond):
+		t.Error("unexpected blocking")
+	}
+}
@@ -27,12 +27,29 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration,
 		sort.Strings(newfnames)
 		for len(newfnames) > int(max) {
 			f := path.Join(dirname, newfnames[0])
-			err := os.Remove(f)
+			l, err := NewLock(f)
 			if err != nil {
 				errC <- err
 				return
 			}
-			log.Printf("filePurge: successfully remvoed file %s", f)
+			err = l.TryLock()
+			if err != nil {
+				break
+			}
+			err = os.Remove(f)
+			if err != nil {
+				errC <- err
+				return
+			}
+			err = l.Unlock()
+			if err != nil {
+				log.Printf("filePurge: unlock %s error %v", l.Name(), err)
+			}
+			err = l.Destroy()
+			if err != nil {
+				log.Printf("filePurge: destroy lock %s error %v", l.Name(), err)
+			}
+			log.Printf("filePurge: successfully removed file %s", f)
 			newfnames = newfnames[1:]
 		}
 		select {
@@ -31,7 +31,7 @@ func TestPurgeFile(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		time.Sleep(time.Millisecond)
+		time.Sleep(2 * time.Millisecond)
 	}
 	fnames, err := ReadDir(dir)
 	if err != nil {
@@ -48,3 +48,71 @@ func TestPurgeFile(t *testing.T) {
 	}
 	close(stop)
 }
+
+func TestPurgeFileHoldingLock(t *testing.T) {
+	dir, err := ioutil.TempDir("", "purgefile")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	for i := 0; i < 10; i++ {
+		_, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// create a purge barrier at 5
+	l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5)))
+	err = l.Lock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	stop := make(chan struct{})
+	errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
+	time.Sleep(5 * time.Millisecond)
+
+	fnames, err := ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"}
+	if !reflect.DeepEqual(fnames, wnames) {
+		t.Errorf("filenames = %v, want %v", fnames, wnames)
+	}
+	select {
+	case err := <-errch:
+		t.Errorf("unexpected purge error %v", err)
+	case <-time.After(time.Millisecond):
+	}
+
+	// remove the purge barrier
+	err = l.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = l.Destroy()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(5 * time.Millisecond)
+
+	fnames, err = ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wnames = []string{"7.test", "8.test", "9.test"}
+	if !reflect.DeepEqual(fnames, wnames) {
+		t.Errorf("filenames = %v, want %v", fnames, wnames)
+	}
+	select {
+	case err := <-errch:
+		t.Errorf("unexpected purge error %v", err)
+	case <-time.After(time.Millisecond):
+	}
+
+	close(stop)
+}
@@ -43,7 +43,7 @@ func TestReadWriteTimeoutDialer(t *testing.T) {
 	defer conn.Close()
 
 	// fill the socket buffer
-	data := make([]byte, 1024*1024)
+	data := make([]byte, 5*1024*1024)
 	timer := time.AfterFunc(d.wtimeoutd*5, func() {
 		t.Fatal("wait timeout")
 	})
@@ -52,7 +52,7 @@ func TestWriteReadTimeoutListener(t *testing.T) {
 	defer conn.Close()
 
 	// fill the socket buffer
-	data := make([]byte, 1024*1024)
+	data := make([]byte, 5*1024*1024)
 	timer := time.AfterFunc(wln.wtimeoutd*5, func() {
 		t.Fatal("wait timeout")
 	})
@@ -279,7 +279,6 @@ func (r *raft) maybeCommit() bool {
 	}
 	sort.Sort(sort.Reverse(mis))
 	mci := mis[r.q()-1]
-
 	return r.raftLog.maybeCommit(mci, r.Term)
 }
 
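In `maybeCommit`, sorting the match indexes in descending order and taking the element at position `quorum-1` yields the highest index known to be replicated on a quorum. A standalone sketch of that calculation (the helper name is hypothetical, not the raft package API):

```go
package main

import (
	"fmt"
	"sort"
)

// quorumCommit returns the largest log index stored on a majority of
// peers, given each peer's match index.
func quorumCommit(matches []uint64) uint64 {
	mis := make([]uint64, len(matches))
	copy(mis, matches)
	// sort descending, as sort.Sort(sort.Reverse(mis)) does in raft
	sort.Slice(mis, func(i, j int) bool { return mis[i] > mis[j] })
	q := len(mis)/2 + 1 // quorum size, as r.q() computes it
	return mis[q-1]     // at least q peers have stored this index
}

func main() {
	fmt.Println(quorumCommit([]uint64{2, 1, 2})) // 2: two of three peers hold index 2
	fmt.Println(quorumCommit([]uint64{2, 1, 1})) // 1: only one peer holds index 2
}
```

After the descending sort, every element up to position `q-1` is held by at least `q` peers, so that element is the safe commit candidate (it still must match the current term, which `raftLog.maybeCommit` checks).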
@@ -473,7 +472,7 @@ func stepLeader(r *raft, m pb.Message) {
 			}
 		}
 	case pb.MsgVote:
-		log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
+		log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
 			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 	}
@@ -533,12 +532,12 @@ func stepFollower(r *raft, m pb.Message) {
 	case pb.MsgVote:
 		if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
 			r.elapsed = 0
-			log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %x",
+			log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
 				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 			r.Vote = m.From
 			r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
 		} else {
-			log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
+			log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
 				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 			r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		}
@@ -559,35 +559,34 @@ func TestCommit(t *testing.T) {
 		w uint64
 	}{
 		// single
-		{[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
-		{[]uint64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
-		{[]uint64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
-		{[]uint64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
+		{[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 1, 1},
+		{[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 2, 0},
+		{[]uint64{2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
+		{[]uint64{1}, []pb.Entry{{Index: 1, Term: 2}}, 2, 1},
 
 		// odd
-		{[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
-		{[]uint64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
-		{[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
-		{[]uint64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
+		{[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
+		{[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
+		{[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
+		{[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
 
 		// even
-		{[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
-		{[]uint64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
-		{[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
-		{[]uint64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
-		{[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
-		{[]uint64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
+		{[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
+		{[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
+		{[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
+		{[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
+		{[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
+		{[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
 	}
 
 	for i, tt := range tests {
-		prs := make(map[uint64]*progress)
+		storage := NewMemoryStorage()
+		storage.Append(tt.logs)
+		storage.hardState = pb.HardState{Term: tt.smTerm}
+
+		sm := newRaft(1, []uint64{1}, 5, 1, storage)
 		for j := 0; j < len(tt.matches); j++ {
-			prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
-		}
-		sm := &raft{
-			raftLog:   &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: unstable{offset: uint64(len(tt.logs))}},
-			prs:       prs,
-			HardState: pb.HardState{Term: tt.smTerm},
+			sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
 		}
 		sm.maybeCommit()
 		if g := sm.raftLog.committed; g != tt.w {
@@ -675,15 +674,10 @@ func TestHandleMsgApp(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := &raft{
-			state:     StateFollower,
-			HardState: pb.HardState{Term: 2},
-			raftLog: &raftLog{
-				committed: 0,
-				storage:   &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
-				unstable:  unstable{offset: 3},
-			},
-		}
+		storage := NewMemoryStorage()
+		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
+		sm := newRaft(1, []uint64{1}, 10, 1, storage)
+		sm.becomeFollower(2, None)
 
 		sm.handleAppendEntries(tt.m)
 		if sm.raftLog.lastIndex() != tt.wIndex {
@@ -709,18 +703,15 @@ func TestHandleHeartbeat(t *testing.T) {
 		m       pb.Message
 		wCommit uint64
 	}{
-		{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
-		{pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
+		{pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1},
+		{pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
 	}
 
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
-		sm := &raft{
-			state:     StateFollower,
-			HardState: pb.HardState{Term: 2},
-			raftLog:   newLog(storage),
-		}
+		sm := newRaft(1, []uint64{1, 2}, 5, 1, storage)
+		sm.becomeFollower(2, 2)
 		sm.raftLog.commitTo(commit)
 		sm.handleHeartbeat(tt.m)
 		if sm.raftLog.committed != tt.wCommit {
@@ -1345,10 +1336,7 @@ func TestPromotable(t *testing.T) {
 		{[]uint64{2, 3}, false},
 	}
 	for i, tt := range tests {
-		r := &raft{id: id, prs: make(map[uint64]*progress)}
-		for _, id := range tt.peers {
-			r.prs[id] = &progress{}
-		}
+		r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage())
 		if g := r.promotable(); g != tt.wp {
 			t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
 		}
@@ -1378,17 +1366,11 @@ func TestRaftNodes(t *testing.T) {
 }
 
 func ents(terms ...uint64) *raft {
-	ents := []pb.Entry{{}}
+	storage := NewMemoryStorage()
 	for i, term := range terms {
-		ents = append(ents, pb.Entry{Index: uint64(i + 1), Term: term})
-	}
-
-	sm := &raft{
-		raftLog: &raftLog{
-			storage:  &MemoryStorage{ents: ents},
-			unstable: unstable{offset: uint64(len(ents))},
-		},
+		storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
 	}
+	sm := newRaft(1, []uint64{}, 5, 1, storage)
 	sm.reset(0)
 	return sm
 }
@@ -65,8 +65,10 @@ type Sender interface {
 	Resume()
 }
 
-func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
+func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
 	s := &sender{
+		id:     id,
+		active: true,
 		tr:     tr,
 		u:      u,
 		cid:    cid,
@@ -75,7 +77,7 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
 		shouldstop:  shouldstop,
 		batcher:     NewBatcher(100, appRespBatchMs*time.Millisecond),
 		propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
-		q:           make(chan []byte, senderBufSize),
+		q:           make(chan *raftpb.Message, senderBufSize),
 	}
 	s.wg.Add(connPerSender)
 	for i := 0; i < connPerSender; i++ {
@@ -85,9 +87,10 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
 }
 
 type sender struct {
+	id  types.ID
+	cid types.ID
+
 	tr         http.RoundTripper
-	u          string
-	cid        types.ID
 	p          Processor
 	fs         *stats.FollowerStats
 	shouldstop chan struct{}
@@ -95,13 +98,21 @@ type sender struct {
 	strmCln     *streamClient
 	batcher     *Batcher
 	propBatcher *ProposalBatcher
-	strmSrv     *streamServer
-	strmSrvMu   sync.Mutex
-	q           chan []byte
+	q           chan *raftpb.Message
 
-	paused bool
-	mu     sync.RWMutex
-	wg     sync.WaitGroup
+	strmSrvMu sync.Mutex
+	strmSrv   *streamServer
+
+	// wait for the handling routines
+	wg sync.WaitGroup
+
+	mu sync.RWMutex
+	u  string // the url this sender post to
+	// if the last send was successful, thi sender is active.
+	// Or it is inactive
+	active  bool
+	errored error
+	paused  bool
 }
 
 func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
@@ -114,6 +125,7 @@ func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-cha
 		}
 		// stop the existing one
 		s.strmSrv.stop()
+		s.strmSrv = nil
 	}
 	s.strmSrv = startStreamServer(w, to, term, s.fs)
 	return s.strmSrv.stopNotify(), nil
@@ -172,9 +184,8 @@ func (s *sender) Send(m raftpb.Message) error {
 func (s *sender) send(m raftpb.Message) error {
 	// TODO: don't block. we should be able to have 1000s
 	// of messages out at a time.
-	data := pbutil.MustMarshal(&m)
 	select {
-	case s.q <- data:
+	case s.q <- &m:
 		return nil
 	default:
 		log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
@@ -189,6 +200,7 @@ func (s *sender) Stop() {
 	s.strmSrvMu.Lock()
 	if s.strmSrv != nil {
 		s.strmSrv.stop()
+		s.strmSrv = nil
 	}
 	s.strmSrvMu.Unlock()
 	if s.strmCln != nil {
@@ -254,16 +266,35 @@ func (s *sender) tryStream(m raftpb.Message) bool {
 
 func (s *sender) handle() {
 	defer s.wg.Done()
-	for d := range s.q {
+	for m := range s.q {
 		start := time.Now()
-		err := s.post(d)
+		err := s.post(pbutil.MustMarshal(m))
 		end := time.Now()
 
+		s.mu.Lock()
 		if err != nil {
-			s.fs.Fail()
-			log.Printf("sender: %v", err)
-			continue
+			if s.errored == nil || s.errored.Error() != err.Error() {
+				log.Printf("sender: error posting to %s: %v", s.id, err)
+				s.errored = err
+			}
+			if s.active {
+				log.Printf("sender: the connection with %s becomes inactive", s.id)
+				s.active = false
+			}
+			if m.Type == raftpb.MsgApp {
+				s.fs.Fail()
+			}
+		} else {
+			if !s.active {
+				log.Printf("sender: the connection with %s becomes active", s.id)
+				s.active = true
+				s.errored = nil
+			}
+			if m.Type == raftpb.MsgApp {
+				s.fs.Succ(end.Sub(start))
+			}
 		}
-		s.fs.Succ(end.Sub(start))
+		s.mu.Unlock()
 	}
 }
 
@@ -274,13 +305,13 @@ func (s *sender) post(data []byte) error {
 	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
 	s.mu.RUnlock()
 	if err != nil {
-		return fmt.Errorf("new request to %s error: %v", s.u, err)
+		return err
 	}
 	req.Header.Set("Content-Type", "application/protobuf")
 	req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
 	resp, err := s.tr.RoundTrip(req)
 	if err != nil {
-		return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
+		return err
 	}
 	resp.Body.Close()
 
@@ -290,15 +321,15 @@ func (s *sender) post(data []byte) error {
 		case s.shouldstop <- struct{}{}:
 		default:
 		}
-		log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+		log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
 		return nil
 	case http.StatusForbidden:
 		select {
 		case s.shouldstop <- struct{}{}:
 		default:
 		}
-		log.Println("etcdserver: this member has been permanently removed from the cluster")
-		log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+		log.Println("rafthttp: this member has been permanently removed from the cluster")
+		log.Println("rafthttp: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
 		return nil
 	case http.StatusNoContent:
 		return nil
@@ -34,9 +34,9 @@ import (
 func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
-	if err := s.Send(raftpb.Message{}); err != nil {
+	if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 	}
 	s.Stop()
@@ -54,7 +54,7 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -86,9 +86,9 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
-	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), &nopProcessor{}, fs, nil)
+	s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
 
-	if err := s.Send(raftpb.Message{}); err != nil {
+	if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
 		t.Fatalf("unexpect Send error: %v", err)
 	}
 	s.Stop()
@@ -102,7 +102,7 @@ func TestSenderSendFailed(t *testing.T) {
 
 func TestSenderPost(t *testing.T) {
 	tr := &roundTripperRecorder{}
-	s := NewSender(tr, "http://10.0.0.1", types.ID(1), &nopProcessor{}, nil, nil)
+	s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
 	if err := s.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpect post error: %v", err)
 	}
@@ -145,7 +145,7 @@ func TestSenderPostBad(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{})
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
 		err := s.post([]byte("some data"))
 		s.Stop()
 
@@ -166,7 +166,7 @@ func TestSenderPostShouldStop(t *testing.T) {
 	}
 	for i, tt := range tests {
 		shouldstop := make(chan struct{}, 1)
-		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), &nopProcessor{}, nil, shouldstop)
+		s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop)
 		s.post([]byte("some data"))
 		s.Stop()
 		select {
@@ -42,7 +42,7 @@ function package {
 	cp etcd/README.md ${target}/README.md
 	cp etcd/etcdctl/README.md ${target}/README-etcdctl.md
 
-	cp -R etcd/Documentation/0.5 ${target}/Documentation
+	cp -R etcd/Documentation/2.0 ${target}/Documentation
 }
 
 function main {
2  test
@@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
 source ./build
 
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
-TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/ioutils pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
+TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/fileutil pkg/flags pkg/ioutils pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
 FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
 
 # user has not provided PKG override
@@ -17,5 +17,5 @@
 package version
 
 var (
-	Version = "0.5.0-alpha.5"
+	Version = "2.0.0-rc.1"
 )
@@ -48,7 +48,7 @@ Cut issues 0x10 entries with incremental index later then the file will be called:
 
 At a later time a WAL can be opened at a particular raft index:
 
-	w, err := wal.OpenAtIndex("/var/lib/etcd", 0)
+	w, err := wal.Open("/var/lib/etcd", 0)
 	...
 
 The raft index must have been written to the WAL. When opening without a
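The doc comment above refers to wal file names derived from a sequence number and the index of the first record in the file. A small sketch of that naming scheme (the `%016x-%016x.wal` layout is an assumption about the convention, and `walName` is a hypothetical helper, not the package API):

```go
package main

import "fmt"

// walName builds a wal file name from a file sequence number and the
// raft index of the first record stored in the file (assumed format:
// two 16-digit hex fields joined by a dash).
func walName(seq, index uint64) string {
	return fmt.Sprintf("%016x-%016x.wal", seq, index)
}

func main() {
	// after Cut issues 0x10 entries, the next file starts at that index
	fmt.Println(walName(1, 0x10)) // 0000000000000001-0000000000000010.wal
}
```

Fixed-width hex names keep lexicographic order equal to numeric order, which is what lets `fileutil.ReadDir` plus a string sort find the right file for a given index.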
85
wal/wal.go
85
wal/wal.go
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
@ -67,6 +68,8 @@ type WAL struct {
|
||||
seq uint64 // sequence of the wal file currently used for writes
|
||||
enti uint64 // index of the last entry saved to the wal
|
||||
encoder *encoder // encoder to encode records
|
||||
|
||||
locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
|
||||
}
|
||||
|
||||
// Create creates a WAL ready for appending records. The given metadata is
|
||||
@ -85,6 +88,15 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l, err := fileutil.NewLock(f.Name())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = l.Lock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w := &WAL{
|
||||
dir: dirpath,
|
||||
metadata: metadata,
|
||||
@ -92,6 +104,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
f: f,
|
||||
encoder: newEncoder(f, 0),
|
||||
}
|
||||
w.locks = append(w.locks, l)
|
||||
if err := w.saveCrc(0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -104,13 +117,23 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// OpenAtIndex opens the WAL at the given index.
|
||||
// Open opens the WAL at the given index.
|
||||
// The index SHOULD have been previously committed to the WAL, or the following
|
||||
// ReadAll will fail.
|
||||
// The returned WAL is ready to read and the first record will be the given
|
||||
// index. The WAL cannot be appended to before reading out all of its
|
||||
// previous records.
|
||||
func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
|
||||
func Open(dirpath string, index uint64) (*WAL, error) {
|
||||
return openAtIndex(dirpath, index, true)
|
||||
}
|
||||
|
||||
// OpenNotInUse only opens the wal files that are not in use.
|
||||
// Other than that, it is similar to Open.
|
||||
func OpenNotInUse(dirpath string, index uint64) (*WAL, error) {
|
||||
return openAtIndex(dirpath, index, false)
|
||||
}
|
||||
|
||||
func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
|
||||
names, err := fileutil.ReadDir(dirpath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -129,12 +152,27 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
|
||||
|
||||
 	// open the wal files for reading
 	rcs := make([]io.ReadCloser, 0)
+	ls := make([]fileutil.Lock, 0)
 	for _, name := range names[nameIndex:] {
 		f, err := os.Open(path.Join(dirpath, name))
 		if err != nil {
 			return nil, err
 		}
+		l, err := fileutil.NewLock(f.Name())
+		if err != nil {
+			return nil, err
+		}
+		err = l.TryLock()
+		if err != nil {
+			if all {
+				return nil, err
+			} else {
+				log.Printf("wal: opened all the files until %s, since it is still in use by an etcd server", name)
+				break
+			}
+		}
 		rcs = append(rcs, f)
+		ls = append(ls, l)
 	}
 	rc := MultiReadCloser(rcs...)

@@ -157,8 +195,9 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
 		ri:      index,
 		decoder: newDecoder(rc),

-		f:   f,
-		seq: seq,
+		f:     f,
+		seq:   seq,
+		locks: ls,
 	}
 	return w, nil
 }
@@ -224,6 +263,15 @@ func (w *WAL) Cut() error {
 	if err != nil {
 		return err
 	}
+	l, err := fileutil.NewLock(f.Name())
+	if err != nil {
+		return err
+	}
+	err = l.Lock()
+	if err != nil {
+		return err
+	}
+	w.locks = append(w.locks, l)
 	if err = w.sync(); err != nil {
 		return err
 	}
@@ -255,6 +303,30 @@ func (w *WAL) sync() error {
 	return w.f.Sync()
 }

+// ReleaseLockTo releases the locks w is holding, which
+// have index smaller or equal to the given index.
+func (w *WAL) ReleaseLockTo(index uint64) error {
+	for _, l := range w.locks {
+		_, i, err := parseWalName(path.Base(l.Name()))
+		if err != nil {
+			return err
+		}
+		if i > index {
+			return nil
+		}
+		err = l.Unlock()
+		if err != nil {
+			return err
+		}
+		err = l.Destroy()
+		if err != nil {
+			return err
+		}
+		w.locks = w.locks[1:]
+	}
+	return nil
+}
+
 func (w *WAL) Close() error {
 	if w.f != nil {
 		if err := w.sync(); err != nil {
@@ -264,6 +336,11 @@ func (w *WAL) Close() error {
 			return err
 		}
 	}
+	for _, l := range w.locks {
+		// TODO: log the error
+		l.Unlock()
+		l.Destroy()
+	}
 	return nil
 }
@@ -90,7 +90,7 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 	f.Close()

-	w, err := OpenAtIndex(dir, 0)
+	w, err := Open(dir, 0)
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
@@ -109,7 +109,7 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 	f.Close()

-	w, err = OpenAtIndex(dir, 5)
+	w, err = Open(dir, 5)
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
@@ -126,7 +126,7 @@ func TestOpenAtIndex(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer os.RemoveAll(emptydir)
-	if _, err = OpenAtIndex(emptydir, 0); err != ErrFileNotFound {
+	if _, err = Open(emptydir, 0); err != ErrFileNotFound {
 		t.Errorf("err = %v, want %v", err, ErrFileNotFound)
 	}
 }
@@ -219,7 +219,7 @@ func TestRecover(t *testing.T) {
 	}
 	w.Close()

-	if w, err = OpenAtIndex(p, 0); err != nil {
+	if w, err = Open(p, 0); err != nil {
 		t.Fatal(err)
 	}
 	metadata, state, entries, err := w.ReadAll()
@@ -238,6 +238,7 @@ func TestRecover(t *testing.T) {
 	if !reflect.DeepEqual(state, s) {
 		t.Errorf("state = %+v, want %+v", state, s)
 	}
+	w.Close()
 }

 func TestSearchIndex(t *testing.T) {
@@ -341,7 +342,7 @@ func TestRecoverAfterCut(t *testing.T) {
 	}

 	for i := 0; i < 10; i++ {
-		w, err := OpenAtIndex(p, uint64(i))
+		w, err := Open(p, uint64(i))
 		if err != nil {
 			if i <= 4 {
 				if err != ErrFileNotFound {
@@ -365,6 +366,7 @@ func TestRecoverAfterCut(t *testing.T) {
 				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
 			}
 		}
+		w.Close()
 	}
 }
@@ -384,7 +386,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
 	}
 	w.Close()

-	w, err = Open(p, 1)
+	w, err = Open(p, 1)
 	if err != nil {
 		t.Fatal(err)
 	}
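The `ReleaseLockTo` loop in the wal.go changes above keys off the WAL file names, which appear to encode a hex sequence number and index (the `seq-index.wal` naming this package uses elsewhere). A stdlib-only sketch of that selection logic over plain name strings; `parseWalIndex` and `releaseTo` are hypothetical helpers standing in for `parseWalName` and the lock bookkeeping:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseWalIndex extracts the index from a name like
// "0000000000000001-0000000000000004.wal" (seq-index, hex).
func parseWalIndex(name string) (uint64, error) {
	name = strings.TrimSuffix(name, ".wal")
	parts := strings.SplitN(name, "-", 2)
	if len(parts) != 2 {
		return 0, fmt.Errorf("bad wal name %q", name)
	}
	return strconv.ParseUint(parts[1], 16, 64)
}

// releaseTo mirrors ReleaseLockTo: drop every leading lock whose index
// is smaller than or equal to the given index, keep the rest.
func releaseTo(locks []string, index uint64) ([]string, error) {
	for len(locks) > 0 {
		i, err := parseWalIndex(locks[0])
		if err != nil {
			return locks, err
		}
		if i > index {
			break
		}
		locks = locks[1:] // the real code also unlocks and destroys the lock
	}
	return locks, nil
}

func main() {
	locks := []string{
		"0000000000000000-0000000000000000.wal",
		"0000000000000001-0000000000000004.wal",
		"0000000000000002-000000000000000a.wal",
	}
	rest, err := releaseTo(locks, 4)
	fmt.Println(rest, err)
}
```

With index 4, the first two names (indexes 0 and 4) are released and the file at hex index `a` (10) is kept, matching the "smaller or equal" contract in the doc comment.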