Compare commits
81 Commits
Author | SHA1 | Date | |
---|---|---|---|
1a2c6d3f2f | |||
ecf7c27697 | |||
05ecdbc617 | |||
6648b7e302 | |||
194105e02c | |||
31bfffaa48 | |||
1fbaf9dbb7 | |||
3fd9136740 | |||
a560c52815 | |||
53d20a8a29 | |||
4b72095bd3 | |||
28e150e50e | |||
4d0472029a | |||
e54fdfd9cc | |||
ca390560f9 | |||
cff005777a | |||
d57e07dcde | |||
79bc3f4774 | |||
d2b0dd2419 | |||
b53dc0826e | |||
0ea2173a7e | |||
7ae94f2bf0 | |||
4228c703a7 | |||
10629c40e1 | |||
e2928cd97a | |||
40365c4f8d | |||
88994f9ec8 | |||
d6f8a30f7c | |||
7c65857283 | |||
92dca0af0f | |||
0a5707420b | |||
90b06f874d | |||
66199afb25 | |||
217a1f0730 | |||
def62071f0 | |||
beb44ef6ba | |||
d1ed54b734 | |||
518eb9fa2f | |||
73e67628d9 | |||
04bd06d20b | |||
29f05bb217 | |||
c5ca1218f3 | |||
f7540912d6 | |||
0fcbadc10b | |||
e44dc0f3fe | |||
4d728cc8c4 | |||
f7998bb2db | |||
cfa7ab6074 | |||
b59390c9c3 | |||
fdebf2b109 | |||
e9f4be498d | |||
6d9d7b4497 | |||
163ea3f5c5 | |||
ea1e54b2a1 | |||
b31109cfd7 | |||
7a909c3950 | |||
c16cc3a6a3 | |||
d7840b75c3 | |||
aed2c82e44 | |||
39ee85470f | |||
fbc4c8efb5 | |||
12999ba083 | |||
a0e3bc9cbd | |||
b06e43b803 | |||
8bf795dc3c | |||
02c52f175f | |||
daf1a913bb | |||
317e57a8a8 | |||
5c0d3889f8 | |||
a71184424a | |||
409daceb73 | |||
c6cc276ef0 | |||
cd50f0e058 | |||
fade9b6065 | |||
590205b8c0 | |||
163f0f09f6 | |||
20497f1f85 | |||
4a0887ef7a | |||
161b1d2e2e | |||
71bed48916 | |||
fd90ec6c26 |
@ -6,6 +6,7 @@ go:
|
||||
install:
|
||||
- go get golang.org/x/tools/cmd/cover
|
||||
- go get golang.org/x/tools/cmd/vet
|
||||
- go get github.com/barakmich/go-nyet
|
||||
|
||||
script:
|
||||
- INTEGRATION=y ./test
|
||||
|
88
Documentation/docker_guide.md
Normal file
88
Documentation/docker_guide.md
Normal file
@ -0,0 +1,88 @@
|
||||
# Running etcd under Docker
|
||||
|
||||
The following guide will show you how to run etcd under Docker using the [static bootstrap process](clustering.md#static).
|
||||
|
||||
## Running etcd in standalone mode
|
||||
|
||||
In order to expose the etcd API to clients outside of the Docker host you'll need use the host IP address when configuring etcd.
|
||||
|
||||
```
|
||||
export HostIP="192.168.12.50"
|
||||
```
|
||||
|
||||
The following `docker run` command will expose the etcd client API over ports 4001 and 2379, and expose the peer port over 2380.
|
||||
|
||||
```
|
||||
docker run -d -p 4001:4001 -p 2380:2380 -p 2379:2379 --name etcd quay.io/coreos/etcd:v2.0.3 \
|
||||
-name etcd0 \
|
||||
-advertise-client-urls http://${HostIP}:2379,http://${HostIP}:4001 \
|
||||
-listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
|
||||
-initial-advertise-peer-urls http://${HostIP}:2380 \
|
||||
-listen-peer-urls http://0.0.0.0:2380 \
|
||||
-initial-cluster-token etcd-cluster-1 \
|
||||
-initial-cluster etcd0=http://${HostIP}:2380 \
|
||||
-initial-cluster-state new
|
||||
```
|
||||
|
||||
Configure etcd clients to use the Docker host IP and one of the listening ports from above.
|
||||
|
||||
```
|
||||
etcdctl -C http://192.168.12.50:2379 member list
|
||||
```
|
||||
|
||||
```
|
||||
etcdctl -C http://192.168.12.50:4001 member list
|
||||
```
|
||||
|
||||
## Running a 3 node etcd cluster
|
||||
|
||||
Using Docker to setup a multi-node cluster is very similar to the standalone mode configuration.
|
||||
The main difference being the value used for the `-initial-cluster` flag, which must contain the peer urls for each etcd member in the cluster.
|
||||
|
||||
### etcd0
|
||||
|
||||
```
|
||||
docker run -d -p 4001:4001 -p 2380:2380 -p 2379:2379 --name etcd quay.io/coreos/etcd:v2.0.3 \
|
||||
-name etcd0 \
|
||||
-advertise-client-urls http://192.168.12.50:2379,http://192.168.12.50:4001 \
|
||||
-listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
|
||||
-initial-advertise-peer-urls http://192.168.12.50:2380 \
|
||||
-listen-peer-urls http://0.0.0.0:2380 \
|
||||
-initial-cluster-token etcd-cluster-1 \
|
||||
-initial-cluster etcd0=http://192.168.12.50:2380,etcd1=http://192.168.12.51:2380,etcd2=http://192.168.12.52:2380 \
|
||||
-initial-cluster-state new
|
||||
```
|
||||
|
||||
### etcd1
|
||||
|
||||
```
|
||||
docker run -d -p 4001:4001 -p 2380:2380 -p 2379:2379 --name etcd quay.io/coreos/etcd:v2.0.3 \
|
||||
-name etcd1 \
|
||||
-advertise-client-urls http://192.168.12.51:2379,http://192.168.12.51:4001 \
|
||||
-listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
|
||||
-initial-advertise-peer-urls http://192.168.12.51:2380 \
|
||||
-listen-peer-urls http://0.0.0.0:2380 \
|
||||
-initial-cluster-token etcd-cluster-1 \
|
||||
-initial-cluster etcd0=http://192.168.12.50:2380,etcd1=http://192.168.12.51:2380,etcd2=http://192.168.12.52:2380 \
|
||||
-initial-cluster-state new
|
||||
```
|
||||
|
||||
### etcd2
|
||||
|
||||
```
|
||||
docker run -d -p 4001:4001 -p 2380:2380 -p 2379:2379 --name etcd quay.io/coreos/etcd:v2.0.3 \
|
||||
-name etcd2 \
|
||||
-advertise-client-urls http://192.168.12.52:2379,http://192.168.12.52:4001 \
|
||||
-listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:4001 \
|
||||
-initial-advertise-peer-urls http://192.168.12.52:2380 \
|
||||
-listen-peer-urls http://0.0.0.0:2380 \
|
||||
-initial-cluster-token etcd-cluster-1 \
|
||||
-initial-cluster etcd0=http://192.168.12.50:2380,etcd1=http://192.168.12.51:2380,etcd2=http://192.168.12.52:2380 \
|
||||
-initial-cluster-state new
|
||||
```
|
||||
|
||||
Once the cluster has been bootstrapped etcd clients can be configured with a list of etcd members:
|
||||
|
||||
```
|
||||
etcdctl -C http://192.168.12.50:2379,http://192.168.12.51:2379,http://192.168.12.52:2379 member list
|
||||
```
|
@ -8,6 +8,7 @@
|
||||
- [etcd-fs](https://github.com/xetorthio/etcd-fs) - FUSE filesystem for etcd
|
||||
- [etcd-browser](https://github.com/henszey/etcd-browser) - A web-based key/value editor for etcd using AngularJS
|
||||
- [etcd-lock](https://github.com/datawisesystems/etcd-lock) - A lock implementation for etcd
|
||||
- [etcd-console](https://github.com/matishsiao/etcd-console) - A web-base key/value editor for etcd using PHP
|
||||
|
||||
**Go libraries**
|
||||
|
||||
|
@ -99,7 +99,7 @@ curl http://10.0.0.10:2379/v2/members/272e204152 -XDELETE
|
||||
|
||||
## Change the peer urls of a member
|
||||
|
||||
Change the peer urls of a given mamber. The member ID must be a hex-encoded uint64. Returns 204 with empty content when successful. Returns a string describing the failure condition when unsuccessful.
|
||||
Change the peer urls of a given member. The member ID must be a hex-encoded uint64. Returns 204 with empty content when successful. Returns a string describing the failure condition when unsuccessful.
|
||||
|
||||
If the POST body is malformed an HTTP 400 will be returned. If the member does not exist in the cluster an HTTP 404 will be returned. If any of the given peerURLs exists in the cluster an HTTP 409 will be returned. If the cluster fails to process the request within timeout an HTTP 500 will be returned, though the request may be processed later.
|
||||
|
||||
|
@ -5,28 +5,28 @@ etcd can now run as a transparent proxy. Running etcd as a proxy allows for easi
|
||||
etcd currently supports two proxy modes: `readwrite` and `readonly`. The default mode is `readwrite`, which forwards both read and write requests to the etcd cluster. A `readonly` etcd proxy only forwards read requests to the etcd cluster, and returns `HTTP 501` to all write requests.
|
||||
|
||||
### Using an etcd proxy
|
||||
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery-url`).
|
||||
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).
|
||||
|
||||
To start a readwrite proxy, set `-proxy on`; To start a readonly proxy, set `-proxy readonly`.
|
||||
|
||||
The proxy will be listening on `listen-client-urls` and forward requests to the etcd cluster discovered from in `initial-cluster` or `discovery url`.
|
||||
The proxy will be listening on `listen-client-urls` and forward requests to the etcd cluster discovered from in `initial-cluster` or `discovery` url.
|
||||
|
||||
#### Start an etcd proxy with a static configuration
|
||||
To start a proxy that will connect to a statically defined etcd cluster, specify the `initial-cluster` flag:
|
||||
```
|
||||
etcd -proxy on -client-listen-urls 127.0.0.1:8080 -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380
|
||||
etcd -proxy on -listen-client-urls 127.0.0.1:8080 -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380
|
||||
```
|
||||
|
||||
#### Start an etcd proxy with the discovery service
|
||||
If you bootstrap an etcd cluster using the [discovery service][discovery-service], you can also start the proxy with the same `discovery-url`.
|
||||
If you bootstrap an etcd cluster using the [discovery service][discovery-service], you can also start the proxy with the same `discovery`.
|
||||
|
||||
To start a proxy using the discovery service, specify the `discovery-url` flag. The proxy will wait until the etcd cluster defined at the `discovery-url` finishes bootstrapping, and then start to forward the requests.
|
||||
To start a proxy using the discovery service, specify the `discovery` flag. The proxy will wait until the etcd cluster defined at the `discovery` url finishes bootstrapping, and then start to forward the requests.
|
||||
|
||||
```
|
||||
etcd -proxy on -client-listen-urls 127.0.0.1:8080 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
|
||||
etcd -proxy on -listen-client-urls 127.0.0.1:8080 -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de
|
||||
```
|
||||
|
||||
#### Fallback to proxy mode with discovery service
|
||||
If you bootstrap a etcd cluster using [discovery service][discovery-service] with more than the expected number of etcd members, the extra etcd processes will fall back to being `readwrite` proxies by default. They will forward the requests to the cluster as described above. For example, if you create a discovery url with `size=5`, and start ten etcd processes using that same discovery URL, the result will be a cluster with five etcd members and five proxies. Note that this behaviour can be disabled with the `proxy-fallback` flag.
|
||||
If you bootstrap a etcd cluster using [discovery service][discovery-service] with more than the expected number of etcd members, the extra etcd processes will fall back to being `readwrite` proxies by default. They will forward the requests to the cluster as described above. For example, if you create a discovery url with `size=5`, and start ten etcd processes using that same discovery url, the result will be a cluster with five etcd members and five proxies. Note that this behaviour can be disabled with the `proxy-fallback` flag.
|
||||
|
||||
[discovery-service]: https://github.com/coreos/etcd/blob/master/Documentation/clustering.md#discovery
|
||||
|
470
Documentation/rfc/api_security.md
Normal file
470
Documentation/rfc/api_security.md
Normal file
@ -0,0 +1,470 @@
|
||||
# v2 Auth and Security
|
||||
|
||||
## etcd Resources
|
||||
There are three types of resources in etcd
|
||||
|
||||
1. user resources: users and roles in the user store
|
||||
2. key-value resources: key-value pairs in the key-value store
|
||||
3. settings resources: security settings, auth settings, and dynamic etcd cluster settings (election/heartbeat)
|
||||
|
||||
### User Resources
|
||||
|
||||
#### Users
|
||||
A user is an identity to be authenticated. Each user can have multiple roles. The user has a capability on the resource if one of the roles has that capability.
|
||||
|
||||
The special static `root` user has a ROOT role. (Caps for visual aid throughout)
|
||||
|
||||
#### Role
|
||||
Each role has exact one associated Permission List. An permission list exists for each permission on key-value resources. A role with `manage` permission of a key-value resource can grant/revoke capability of that key-value to other roles.
|
||||
|
||||
The special static ROOT role has a full permissions on all key-value resources, the permission to manage user resources and settings resources. Only the ROOT role has the permission to manage user resources and modify settings resources.
|
||||
|
||||
#### Permissions
|
||||
|
||||
There are two types of permissions, `read` and `write`. All management stems from the ROOT user.
|
||||
|
||||
A Permission List is a list of allowed patterns for that particular permission (read or write). Only ALLOW prefixes (incidentally, this is what Amazon S3 does). DENY becomes more complicated and is TBD.
|
||||
|
||||
### Key-Value Resources
|
||||
A key-value resource is a key-value pairs in the store. Given a list of matching patterns, permission for any given key in a request is granted if any of the patterns in the list match.
|
||||
|
||||
The glob match rules are as follows:
|
||||
|
||||
* `*` and `\` are special characters, representing "greedy match" and "escape" respectively.
|
||||
* As a corrolary, `\*` and `\\` are the corresponding literal matches.
|
||||
* All other bytes match exactly their bytes, starting always from the *first byte*. (For regex fans, `re.match` in Python)
|
||||
* Examples:
|
||||
* `/foo` matches only the single key/directory of `/foo`
|
||||
* `/foo*` matches the prefix `/foo`, and all subdirectories/keys
|
||||
* `/foo/*/bar` matches the keys bar in any (recursive) subdirectory of `/foo`.
|
||||
|
||||
### Settings Resources
|
||||
|
||||
Specific settings for the cluster as a whole. This can include adding and removing cluster members, enabling or disabling security, replacing certificates, and any other dynamic configuration by the administrator.
|
||||
|
||||
## v2 Auth
|
||||
|
||||
### Basic Auth
|
||||
We only support [Basic Auth](http://en.wikipedia.org/wiki/Basic_access_authentication) for the first version. Client needs to attach the basic auth to the HTTP Authorization Header.
|
||||
|
||||
### Authorization field for operations
|
||||
Added to requests to /v2/keys, /v2/security
|
||||
Add code 403 Forbidden to the set of responses from the v2 API
|
||||
Authorization: Basic {encoded string}
|
||||
|
||||
### Future Work
|
||||
Other types of auth can be considered for the future (eg, signed certs, public keys) but the `Authorization:` header allows for other such types
|
||||
|
||||
### Things out of Scope for etcd Permissions
|
||||
|
||||
* Pluggable AUTH backends like LDAP (other Authorization tokens generated by LDAP et al may be a possiblity)
|
||||
* Very fine-grained access controls (eg: users modifying keys outside work hours)
|
||||
|
||||
|
||||
|
||||
## API endpoints
|
||||
|
||||
An Error JSON corresponds to:
|
||||
{
|
||||
"name": "ErrErrorName",
|
||||
"description" : "The longer helpful description of the error."
|
||||
}
|
||||
|
||||
#### Users
|
||||
|
||||
The User JSON object is formed as follows:
|
||||
|
||||
```
|
||||
{
|
||||
"user": "userName"
|
||||
"password": "password"
|
||||
"roles": [
|
||||
"role1",
|
||||
"role2"
|
||||
],
|
||||
"grant": [],
|
||||
"revoke": [],
|
||||
"lastModified": "2006-01-02Z04:05:07"
|
||||
}
|
||||
```
|
||||
|
||||
Password is only passed when necessary. Last Modified is set by the server and ignored in all client posts.
|
||||
|
||||
**Get a list of users**
|
||||
|
||||
GET/HEAD /v2/security/user
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
200 Headers:
|
||||
ETag: "<hash of list of users>"
|
||||
Content-type: application/json
|
||||
200 Body:
|
||||
{
|
||||
"users": ["alice", "bob", "eve"]
|
||||
}
|
||||
|
||||
**Get User Details**
|
||||
|
||||
GET/HEAD /v2/security/users/alice
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
200 Headers:
|
||||
ETag: "users/alice:<lastModified>"
|
||||
Content-type: application/json
|
||||
200 Body:
|
||||
{
|
||||
"user" : "alice"
|
||||
"roles" : ["fleet", "etcd"]
|
||||
"lastModified": "2015-02-05Z18:00:00"
|
||||
}
|
||||
|
||||
**Create A User**
|
||||
|
||||
A user can be created with initial roles, if filled in. However, no roles are required; only the username and password fields
|
||||
|
||||
PUT /v2/security/users/charlie
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Put Body:
|
||||
JSON struct, above, matching the appropriate name and with starting roles.
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
409 Conflict (if exists)
|
||||
200 Headers:
|
||||
ETag: "users/charlie:<tzNow>"
|
||||
200 Body: (empty)
|
||||
|
||||
**Remove A User**
|
||||
|
||||
DELETE /v2/security/users/charlie
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
200 Headers:
|
||||
200 Body: (empty)
|
||||
|
||||
**Grant a Role(s) to a User**
|
||||
|
||||
PUT /v2/security/users/charlie/grant
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Put Body:
|
||||
{ "grantRoles" : ["fleet", "etcd"], (extra JSON data for checking OK) }
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
409 Conflict
|
||||
200 Headers:
|
||||
ETag: "users/charlie:<tzNow>"
|
||||
200 Body:
|
||||
JSON user struct, updated. "roles" now contains the grants, and "grantRoles" is empty. If there is an error in the set of roles to be added, for example, a non-existent role, then 409 is returned, with an error JSON stating why.
|
||||
|
||||
**Revoke a Role(s) from a User**
|
||||
|
||||
PUT /v2/security/users/charlie/revoke
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Put Body:
|
||||
{ "revokeRoles" : ["fleet"], (extra JSON data for checking OK) }
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
409 Conflict
|
||||
200 Headers:
|
||||
ETag: "users/charlie:<tzNow>"
|
||||
200 Body:
|
||||
JSON user struct, updated. "roles" now doesn't contain the roles, and "revokeRoles" is empty. If there is an error in the set of roles to be removed, for example, a non-existent role, then 409 is returned, with an error JSON stating why.
|
||||
|
||||
**Change password**
|
||||
|
||||
PUT /v2/security/users/charlie/password
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Put Body:
|
||||
{"user": "charlie", "password": "newCharliePassword"}
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
200 Headers:
|
||||
ETag: "users/charlie:<tzNow>"
|
||||
200 Body:
|
||||
JSON user struct, updated
|
||||
|
||||
#### Roles
|
||||
|
||||
A full role structure may look like this. A Permission List structure is used for the "permissions", "grant", and "revoke" keys.
|
||||
```
|
||||
{
|
||||
"role" : "fleet",
|
||||
"permissions" : {
|
||||
"kv" {
|
||||
"read" : [ "/fleet/" ],
|
||||
"write": [ "/fleet/" ],
|
||||
}
|
||||
}
|
||||
"grant" : {"kv": {...}},
|
||||
"revoke": {"kv": {...}},
|
||||
"members" : ["alice", "bob"],
|
||||
"lastModified": "2015-02-05Z18:00:00"
|
||||
}
|
||||
```
|
||||
|
||||
**Get a list of Roles**
|
||||
|
||||
GET/HEAD /v2/security/roles
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
200 Headers:
|
||||
ETag: "<hash of list of roles>"
|
||||
Content-type: application/json
|
||||
200 Body:
|
||||
{
|
||||
"roles": ["fleet", "etcd", "quay"]
|
||||
}
|
||||
|
||||
**Get Role Details**
|
||||
|
||||
GET/HEAD /v2/security/roles/fleet
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
200 Headers:
|
||||
ETag: "roles/fleet:<lastModified>"
|
||||
Content-type: application/json
|
||||
200 Body:
|
||||
{
|
||||
"role" : "fleet",
|
||||
"read": {
|
||||
"prefixesAllowed": ["/fleet/"],
|
||||
},
|
||||
"write": {
|
||||
"prefixesAllowed": ["/fleet/"],
|
||||
},
|
||||
"members" : ["alice", "bob"] // Reverse map optional?
|
||||
"lastModified": "2015-02-05Z18:00:00"
|
||||
}
|
||||
|
||||
**Create A Role**
|
||||
|
||||
PUT /v2/security/roles/rocket
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Put Body:
|
||||
Initial desired JSON state, complete with prefixes and
|
||||
Possible Status Codes:
|
||||
201 Created
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
409 Conflict (if exists)
|
||||
200 Headers:
|
||||
ETag: "roles/rocket:<tzNow>"
|
||||
200 Body:
|
||||
JSON state of the role
|
||||
|
||||
**Remove A Role**
|
||||
|
||||
DELETE /v2/security/roles/rocket
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
200 Headers:
|
||||
200 Body: (empty)
|
||||
|
||||
**Update a Role’s Permission List for {read,write}ing**
|
||||
|
||||
PUT /v2/security/roles/rocket/update
|
||||
|
||||
Sent Headers:
|
||||
Authorization: Basic <BasicAuthString>
|
||||
Put Body:
|
||||
{
|
||||
"role" : "rocket",
|
||||
"grant": {
|
||||
"kv": {
|
||||
"read" : [ "/rocket/"]
|
||||
}
|
||||
},
|
||||
"revoke": {
|
||||
"kv": {
|
||||
"read" : [ "/fleet/"]
|
||||
}
|
||||
}
|
||||
}
|
||||
Possible Status Codes:
|
||||
200 OK
|
||||
403 Forbidden
|
||||
404 Not Found
|
||||
200 Headers:
|
||||
ETag: "roles/rocket:<tzNow>"
|
||||
200 Body:
|
||||
JSON state of the role, with change containing empty lists and the deltas applied appropriately.
|
||||
|
||||
|
||||
#### TBD Management modification
|
||||
|
||||
|
||||
## Example Workflow
|
||||
|
||||
Let's walk through an example to show two tenants (applications, in our case) using etcd permissions.
|
||||
|
||||
### Enable security
|
||||
|
||||
//TODO(barakmich): Maybe this is dynamic? I don't like the idea of rebooting when we don't have to.
|
||||
|
||||
#### Default ROOT
|
||||
|
||||
etcd always has a ROOT when started with security enabled. The default username is `root`, and the password is `root`.
|
||||
|
||||
// TODO(barakmich): if the enabling is dynamic, perhaps that'd be a good time to set a password? Thus obviating the next section.
|
||||
|
||||
|
||||
### Change root's password
|
||||
|
||||
```
|
||||
PUT /v2/security/users/root/password
|
||||
Headers:
|
||||
Authorization: Basic <root:root>
|
||||
Put Body:
|
||||
{"user" : "root", "password": "betterRootPW!"}
|
||||
```
|
||||
|
||||
//TODO(barakmich): How do you recover the root password? *This* may require a flag and a restart. `--disable-permissions`
|
||||
|
||||
### Create Roles for the Applications
|
||||
|
||||
Create the rocket role fully specified:
|
||||
|
||||
```
|
||||
PUT /v2/security/roles/rocket
|
||||
Headers:
|
||||
Authorization: Basic <root:betterRootPW!>
|
||||
Body:
|
||||
{
|
||||
"role" : "rocket",
|
||||
"permissions" : {
|
||||
"kv": {
|
||||
"read": [
|
||||
"/rocket/"
|
||||
],
|
||||
"write": [
|
||||
"/rocket/"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
But let's make fleet just a basic role for now:
|
||||
|
||||
```
|
||||
PUT /v2/security/roles/fleet
|
||||
Headers:
|
||||
Authorization: Basic <root:betterRootPW!>
|
||||
Body:
|
||||
{
|
||||
"role" : "fleet",
|
||||
}
|
||||
```
|
||||
|
||||
### Optional: Add some permissions to the roles
|
||||
|
||||
Well, we finally figured out where we want fleet to live. Let's fix it.
|
||||
(Note that we avoided this in the rocket case. So this step is optional.)
|
||||
|
||||
|
||||
```
|
||||
PUT /v2/security/roles/fleet/update
|
||||
Headers:
|
||||
Authorization: Basic <root:betterRootPW!>
|
||||
Put Body:
|
||||
{
|
||||
"role" : "fleet",
|
||||
"grant" : {
|
||||
"kv" : {
|
||||
"read": [
|
||||
"/fleet/"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Create Users
|
||||
|
||||
Same as before, let's use rocket all at once and fleet separately
|
||||
|
||||
```
|
||||
PUT /v2/security/users/rocketuser
|
||||
Headers:
|
||||
Authorization: Basic <root:betterRootPW!>
|
||||
Body:
|
||||
{"user" : "rocketuser", "password" : "rocketpw", "roles" : ["rocket"]}
|
||||
```
|
||||
|
||||
```
|
||||
PUT /v2/security/users/fleetuser
|
||||
Headers:
|
||||
Authorization: Basic <root:betterRootPW!>
|
||||
Body:
|
||||
{"user" : "fleetuser", "password" : "fleetpw"}
|
||||
```
|
||||
|
||||
### Optional: Grant Roles to Users
|
||||
|
||||
Likewise, let's explicitly grant fleetuser access.
|
||||
|
||||
```
|
||||
PUT /v2/security/users/fleetuser/grant
|
||||
Headers:
|
||||
Authorization: Basic <root:betterRootPW!>
|
||||
Body:
|
||||
{"user": "fleetuser", "grant": ["fleet"]}
|
||||
```
|
||||
|
||||
#### Start to use fleetuser and rocketuser
|
||||
|
||||
|
||||
For example:
|
||||
|
||||
```
|
||||
PUT /v2/keys/rocket/RocketData
|
||||
Headers:
|
||||
Authorization: Basic <rocketuser:rocketpw>
|
||||
```
|
||||
|
||||
Reads and writes outside the prefixes granted will fail with a 403 Forbidden.
|
||||
|
4
build
4
build
@ -12,7 +12,7 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
|
||||
eval $(go env)
|
||||
|
||||
# Static compilation is useful when etcd is run in a container
|
||||
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
||||
CGO_ENABLED=0 go build -a -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||
go build -o bin/etcd-migrate ${REPO_PATH}/tools/etcd-migrate
|
||||
go build -o bin/etcd-dump-logs ${REPO_PATH}/tools/etcd-dump-logs
|
||||
|
@ -89,7 +89,7 @@ func TestV2KeysURLHelper(t *testing.T) {
|
||||
|
||||
func TestGetAction(t *testing.T) {
|
||||
ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
|
||||
wantURL := &url.URL{
|
||||
baseWantURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "example.com",
|
||||
Path: "/v2/keys/foo/bar",
|
||||
@ -117,7 +117,7 @@ func TestGetAction(t *testing.T) {
|
||||
}
|
||||
got := *f.HTTPRequest(ep)
|
||||
|
||||
wantURL := wantURL
|
||||
wantURL := baseWantURL
|
||||
wantURL.RawQuery = tt.wantQuery
|
||||
|
||||
err := assertResponse(got, wantURL, wantHeader, nil)
|
||||
@ -129,7 +129,7 @@ func TestGetAction(t *testing.T) {
|
||||
|
||||
func TestWaitAction(t *testing.T) {
|
||||
ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
|
||||
wantURL := &url.URL{
|
||||
baseWantURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "example.com",
|
||||
Path: "/v2/keys/foo/bar",
|
||||
@ -166,7 +166,7 @@ func TestWaitAction(t *testing.T) {
|
||||
}
|
||||
got := *f.HTTPRequest(ep)
|
||||
|
||||
wantURL := wantURL
|
||||
wantURL := baseWantURL
|
||||
wantURL.RawQuery = tt.wantQuery
|
||||
|
||||
err := assertResponse(got, wantURL, wantHeader, nil)
|
||||
|
@ -193,14 +193,14 @@ func TestCheckCluster(t *testing.T) {
|
||||
})
|
||||
}
|
||||
c := &clientWithResp{rs: rs}
|
||||
d := discovery{cluster: cluster, id: 1, c: c}
|
||||
dBase := discovery{cluster: cluster, id: 1, c: c}
|
||||
|
||||
cRetry := &clientWithRetry{failTimes: 3}
|
||||
cRetry.rs = rs
|
||||
fc := clockwork.NewFakeClock()
|
||||
dRetry := discovery{cluster: cluster, id: 1, c: cRetry, clock: fc}
|
||||
|
||||
for _, d := range []discovery{d, dRetry} {
|
||||
for _, d := range []discovery{dBase, dRetry} {
|
||||
go func() {
|
||||
for i := uint(1); i <= maxRetryInTest; i++ {
|
||||
fc.BlockUntil(1)
|
||||
@ -263,7 +263,7 @@ func TestWaitNodes(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
// Basic case
|
||||
c := &clientWithResp{nil, &watcherWithResp{tt.rs}}
|
||||
d := &discovery{cluster: "1000", c: c}
|
||||
dBase := &discovery{cluster: "1000", c: c}
|
||||
|
||||
// Retry case
|
||||
retryScanResp := make([]*client.Response, 0)
|
||||
@ -291,7 +291,7 @@ func TestWaitNodes(t *testing.T) {
|
||||
clock: fc,
|
||||
}
|
||||
|
||||
for _, d := range []*discovery{d, dRetry} {
|
||||
for _, d := range []*discovery{dBase, dRetry} {
|
||||
go func() {
|
||||
for i := uint(1); i <= maxRetryInTest; i++ {
|
||||
fc.BlockUntil(1)
|
||||
|
@ -54,6 +54,7 @@ func handleClusterHealth(c *cli.Context) {
|
||||
|
||||
// is raft stable and making progress?
|
||||
client = etcd.NewClient([]string{ep})
|
||||
client.SetTransport(tr)
|
||||
resp, err := client.Get("/", false, false)
|
||||
if err != nil {
|
||||
fmt.Println("cluster is unhealthy")
|
||||
|
@ -134,10 +134,10 @@ func actionMemberAdd(c *cli.Context) {
|
||||
}
|
||||
|
||||
conf := []string{}
|
||||
for _, m := range members {
|
||||
for _, u := range m.PeerURLs {
|
||||
n := m.Name
|
||||
if m.ID == newID {
|
||||
for _, memb := range members {
|
||||
for _, u := range memb.PeerURLs {
|
||||
n := memb.Name
|
||||
if memb.ID == newID {
|
||||
n = newName
|
||||
}
|
||||
conf = append(conf, fmt.Sprintf("%s=%s", n, u))
|
||||
@ -160,8 +160,9 @@ func actionMemberRemove(c *cli.Context) {
|
||||
|
||||
mAPI := mustNewMembersAPI(c)
|
||||
// Get the list of members.
|
||||
listctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
|
||||
listctx, listCancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
|
||||
members, err := mAPI.List(listctx)
|
||||
listCancel()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "Error while verifying ID against known members:", err.Error())
|
||||
os.Exit(1)
|
||||
@ -184,9 +185,9 @@ func actionMemberRemove(c *cli.Context) {
|
||||
}
|
||||
|
||||
// Actually attempt to remove the member.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
|
||||
ctx, removeCancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
|
||||
err = mAPI.Remove(ctx, removalID)
|
||||
cancel()
|
||||
removeCancel()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Recieved an error trying to remove member %s: %s", removalID, err.Error())
|
||||
os.Exit(1)
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/osutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/proxy"
|
||||
@ -74,7 +74,10 @@ func Main() {
|
||||
}
|
||||
}
|
||||
|
||||
osutil.HandleInterrupts()
|
||||
|
||||
<-stopped
|
||||
osutil.Exit(0)
|
||||
}
|
||||
|
||||
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
|
||||
@ -88,13 +91,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
|
||||
log.Printf("no data-dir provided, using default data-dir ./%s", cfg.dir)
|
||||
}
|
||||
if err := makeMemberDir(cfg.dir); err != nil {
|
||||
return nil, fmt.Errorf("cannot use /member sub-directory: %v", err)
|
||||
}
|
||||
membdir := path.Join(cfg.dir, "member")
|
||||
if err := fileutil.IsDirWriteable(membdir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
@ -149,7 +145,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
Name: cfg.name,
|
||||
ClientURLs: cfg.acurls,
|
||||
PeerURLs: cfg.apurls,
|
||||
DataDir: membdir,
|
||||
DataDir: cfg.dir,
|
||||
SnapCount: cfg.snapCount,
|
||||
MaxSnapFiles: cfg.maxSnapFiles,
|
||||
MaxWALFiles: cfg.maxWalFiles,
|
||||
@ -168,6 +164,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
return nil, err
|
||||
}
|
||||
s.Start()
|
||||
osutil.RegisterInterruptHandler(s.Stop)
|
||||
|
||||
if cfg.corsInfo.String() != "" {
|
||||
log.Printf("etcd: cors = %s", cfg.corsInfo)
|
||||
@ -176,7 +173,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
Handler: etcdhttp.NewClientHandler(s),
|
||||
Info: cfg.corsInfo,
|
||||
}
|
||||
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
|
||||
ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
|
||||
// Start the peer server in a goroutine
|
||||
for _, l := range plns {
|
||||
go func(l net.Listener) {
|
||||
@ -252,7 +249,7 @@ func startProxy(cfg *config) error {
|
||||
}
|
||||
|
||||
uf := func() []string {
|
||||
gcls, err := etcdserver.GetClusterFromPeers(peerURLs, tr)
|
||||
gcls, err := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
|
||||
// TODO: remove the 2nd check when we fix GetClusterFromPeers
|
||||
// GetClusterFromPeers should not return nil error with an invaild empty cluster
|
||||
if err != nil {
|
||||
@ -336,42 +333,6 @@ func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
|
||||
return cls, err
|
||||
}
|
||||
|
||||
func makeMemberDir(dir string) error {
|
||||
membdir := path.Join(dir, "member")
|
||||
_, err := os.Stat(membdir)
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case !os.IsNotExist(err):
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(membdir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
v1Files := types.NewUnsafeSet("conf", "log", "snapshot")
|
||||
v2Files := types.NewUnsafeSet("wal", "snap")
|
||||
names, err := fileutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, name := range names {
|
||||
switch {
|
||||
case v1Files.Contains(name):
|
||||
// Link it to the subdir and keep the v1 file at the original
|
||||
// location, so v0.4 etcd can still bootstrap if the upgrade
|
||||
// failed.
|
||||
if err := os.Symlink(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
case v2Files.Contains(name):
|
||||
if err := os.Rename(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func genClusterString(name string, urls types.URLs) string {
|
||||
addrs := make([]string, 0)
|
||||
for _, u := range urls {
|
||||
|
@ -56,14 +56,21 @@ type ClusterInfo interface {
|
||||
|
||||
// Cluster is a list of Members that belong to the same raft cluster
|
||||
type Cluster struct {
|
||||
id types.ID
|
||||
token string
|
||||
members map[types.ID]*Member
|
||||
id types.ID
|
||||
token string
|
||||
store store.Store
|
||||
// index is the raft index that cluster is updated at bootstrap
|
||||
// from remote cluster info.
|
||||
// It may have a higher value than local raft index, because it
|
||||
// displays a further view of the cluster.
|
||||
// TODO: upgrade it as last modified index
|
||||
index uint64
|
||||
|
||||
sync.Mutex // guards members and removed map
|
||||
members map[types.ID]*Member
|
||||
// removed contains the ids of removed members in the cluster.
|
||||
// removed id cannot be reused.
|
||||
removed map[types.ID]bool
|
||||
store store.Store
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// NewClusterFromString returns a Cluster instantiated from the given cluster token
|
||||
@ -229,6 +236,8 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
|
||||
|
||||
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
||||
|
||||
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
|
||||
|
||||
func (c *Cluster) Recover() {
|
||||
c.members, c.removed = membersFromStore(c.store)
|
||||
}
|
||||
@ -346,6 +355,20 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
|
||||
c.members[id].RaftAttributes = raftAttr
|
||||
}
|
||||
|
||||
// Validate ensures that there is no identical urls in the cluster peer list
|
||||
func (c *Cluster) Validate() error {
|
||||
urlMap := make(map[string]bool)
|
||||
for _, m := range c.Members() {
|
||||
for _, url := range m.PeerURLs {
|
||||
if urlMap[url] {
|
||||
return fmt.Errorf("duplicate url %v in cluster config", url)
|
||||
}
|
||||
urlMap[url] = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
|
||||
members := make(map[types.ID]*Member)
|
||||
removed := make(map[types.ID]bool)
|
||||
|
123
etcdserver/cluster_util.go
Normal file
123
etcdserver/cluster_util.go
Normal file
@ -0,0 +1,123 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
|
||||
// isMemberBootstrapped tries to check if the given member has been bootstrapped
|
||||
// in the given cluster.
|
||||
func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool {
|
||||
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), false, tr)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
id := cl.MemberByName(member).ID
|
||||
m := rcl.Member(id)
|
||||
if m == nil {
|
||||
return false
|
||||
}
|
||||
if len(m.ClientURLs) > 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetClusterFromRemotePeers takes a set of URLs representing etcd peers, and
|
||||
// attempts to construct a Cluster by accessing the members endpoint on one of
|
||||
// these URLs. The first URL to provide a response is used. If no URLs provide
|
||||
// a response, or a Cluster cannot be successfully created from a received
|
||||
// response, an error is returned.
|
||||
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*Cluster, error) {
|
||||
return getClusterFromRemotePeers(urls, true, tr)
|
||||
}
|
||||
|
||||
// If logerr is true, it prints out more error messages.
|
||||
func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) {
|
||||
cc := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: time.Second,
|
||||
}
|
||||
for _, u := range urls {
|
||||
resp, err := cc.Get(u + "/members")
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not get cluster response from %s: %v", u, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not read the body of cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
var membs []*Member
|
||||
if err := json.Unmarshal(b, &membs); err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not unmarshal cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
var index uint64
|
||||
// The header at or before v2.0.3 doesn't have this field. For backward
|
||||
// compatibility, it checks whether the field exists.
|
||||
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
|
||||
index, err = strconv.ParseUint(indexStr, 10, 64)
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse raft index: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
cl := NewClusterFromMembers("", id, membs)
|
||||
cl.UpdateIndex(index)
|
||||
return cl, nil
|
||||
}
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
||||
// getRemotePeerURLs returns peer urls of remote members in the cluster. The
|
||||
// returned list is sorted in ascending lexicographical order.
|
||||
func getRemotePeerURLs(cl ClusterInfo, local string) []string {
|
||||
us := make([]string, 0)
|
||||
for _, m := range cl.Members() {
|
||||
if m.Name == local {
|
||||
continue
|
||||
}
|
||||
us = append(us, m.PeerURLs...)
|
||||
}
|
||||
sort.Strings(us)
|
||||
return us
|
||||
}
|
@ -62,15 +62,8 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
|
||||
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
|
||||
}
|
||||
|
||||
// No identical IPs in the cluster peer list
|
||||
urlMap := make(map[string]bool)
|
||||
for _, m := range c.Cluster.Members() {
|
||||
for _, url := range m.PeerURLs {
|
||||
if urlMap[url] {
|
||||
return fmt.Errorf("duplicate url %v in cluster config", url)
|
||||
}
|
||||
urlMap[url] = true
|
||||
}
|
||||
if err := c.Cluster.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Advertised peer URLs must match those in the cluster peer list
|
||||
@ -83,9 +76,11 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ServerConfig) WALDir() string { return path.Join(c.DataDir, "wal") }
|
||||
func (c *ServerConfig) MemberDir() string { return path.Join(c.DataDir, "member") }
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.DataDir, "snap") }
|
||||
func (c *ServerConfig) WALDir() string { return path.Join(c.MemberDir(), "wal") }
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
|
||||
|
||||
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
|
||||
|
||||
@ -99,6 +94,7 @@ func (c *ServerConfig) print(initial bool) {
|
||||
log.Println("etcdserver: force new cluster")
|
||||
}
|
||||
log.Printf("etcdserver: data dir = %s", c.DataDir)
|
||||
log.Printf("etcdserver: member dir = %s", c.MemberDir())
|
||||
log.Printf("etcdserver: heartbeat = %dms", c.TickMs)
|
||||
log.Printf("etcdserver: election = %dms", c.ElectionTicks*int(c.TickMs))
|
||||
log.Printf("etcdserver: snapshot count = %d", c.SnapCount)
|
||||
|
@ -129,8 +129,8 @@ func TestBootstrapConfigVerify(t *testing.T) {
|
||||
|
||||
func TestSnapDir(t *testing.T) {
|
||||
tests := map[string]string{
|
||||
"/": "/snap",
|
||||
"/var/lib/etc": "/var/lib/etc/snap",
|
||||
"/": "/member/snap",
|
||||
"/var/lib/etc": "/var/lib/etc/member/snap",
|
||||
}
|
||||
for dd, w := range tests {
|
||||
cfg := ServerConfig{
|
||||
@ -144,8 +144,8 @@ func TestSnapDir(t *testing.T) {
|
||||
|
||||
func TestWALDir(t *testing.T) {
|
||||
tests := map[string]string{
|
||||
"/": "/wal",
|
||||
"/var/lib/etc": "/var/lib/etc/wal",
|
||||
"/": "/member/wal",
|
||||
"/var/lib/etc": "/var/lib/etc/member/wal",
|
||||
}
|
||||
for dd, w := range tests {
|
||||
cfg := ServerConfig{
|
||||
|
@ -1064,13 +1064,13 @@ func TestServeMembersFail(t *testing.T) {
|
||||
|
||||
func TestWriteEvent(t *testing.T) {
|
||||
// nil event should not panic
|
||||
rw := httptest.NewRecorder()
|
||||
writeKeyEvent(rw, nil, dummyRaftTimer{})
|
||||
h := rw.Header()
|
||||
rec := httptest.NewRecorder()
|
||||
writeKeyEvent(rec, nil, dummyRaftTimer{})
|
||||
h := rec.Header()
|
||||
if len(h) > 0 {
|
||||
t.Fatalf("unexpected non-empty headers: %#v", h)
|
||||
}
|
||||
b := rw.Body.String()
|
||||
b := rec.Body.String()
|
||||
if len(b) > 0 {
|
||||
t.Fatalf("unexpected non-empty body: %q", b)
|
||||
}
|
||||
|
@ -76,13 +76,13 @@ func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) erro
|
||||
|
||||
func TestWriteError(t *testing.T) {
|
||||
// nil error should not panic
|
||||
rw := httptest.NewRecorder()
|
||||
writeError(rw, nil)
|
||||
h := rw.Header()
|
||||
rec := httptest.NewRecorder()
|
||||
writeError(rec, nil)
|
||||
h := rec.Header()
|
||||
if len(h) > 0 {
|
||||
t.Fatalf("unexpected non-empty headers: %#v", h)
|
||||
}
|
||||
b := rw.Body.String()
|
||||
b := rec.Body.String()
|
||||
if len(b) > 0 {
|
||||
t.Fatalf("unexpected non-empty body: %q", b)
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
@ -28,9 +29,10 @@ const (
|
||||
)
|
||||
|
||||
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
|
||||
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
|
||||
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
|
||||
mh := &peerMembersHandler{
|
||||
clusterInfo: clusterInfo,
|
||||
timer: timer,
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@ -43,6 +45,7 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler
|
||||
|
||||
type peerMembersHandler struct {
|
||||
clusterInfo etcdserver.ClusterInfo
|
||||
timer etcdserver.RaftTimer
|
||||
}
|
||||
|
||||
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
@ -50,6 +53,7 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
|
||||
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
|
||||
|
||||
if r.URL.Path != peerMembersPrefix {
|
||||
http.Error(w, "bad path", http.StatusBadRequest)
|
||||
|
@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
|
||||
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("test data"))
|
||||
})
|
||||
ph := NewPeerHandler(&fakeCluster{}, h)
|
||||
ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
|
||||
srv := httptest.NewServer(ph)
|
||||
defer srv.Close()
|
||||
|
||||
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
|
||||
id: 1,
|
||||
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
|
||||
}
|
||||
h := &peerMembersHandler{clusterInfo: cluster}
|
||||
h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
|
||||
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -18,13 +18,11 @@ import (
|
||||
"encoding/json"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -146,6 +144,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
var n raft.Node
|
||||
var s *raft.MemoryStorage
|
||||
var id types.ID
|
||||
|
||||
walVersion, err := wal.DetectVersion(cfg.DataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -154,18 +153,18 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
|
||||
}
|
||||
haveWAL := walVersion != wal.WALNotExist
|
||||
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
|
||||
switch {
|
||||
case !haveWAL && !cfg.NewCluster:
|
||||
us := getOtherPeerURLs(cfg.Cluster, cfg.Name)
|
||||
existingCluster, err := GetClusterFromPeers(us, cfg.Transport)
|
||||
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cfg.Cluster, cfg.Name), cfg.Transport)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
|
||||
}
|
||||
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
|
||||
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
||||
}
|
||||
cfg.Cluster.UpdateIndex(existingCluster.index)
|
||||
cfg.Cluster.SetID(existingCluster.id)
|
||||
cfg.Cluster.SetStore(st)
|
||||
cfg.Print()
|
||||
@ -175,26 +174,36 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
return nil, err
|
||||
}
|
||||
m := cfg.Cluster.MemberByName(cfg.Name)
|
||||
if isBootstrapped(cfg) {
|
||||
if isMemberBootstrapped(cfg.Cluster, cfg.Name, cfg.Transport) {
|
||||
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
|
||||
}
|
||||
if cfg.ShouldDiscover() {
|
||||
s, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String())
|
||||
str, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil {
|
||||
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, str); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := cfg.Cluster.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("bad discovery cluster: %v", err)
|
||||
}
|
||||
}
|
||||
cfg.Cluster.SetStore(st)
|
||||
cfg.PrintWithInitial()
|
||||
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
||||
case haveWAL:
|
||||
if walVersion != wal.WALv0_5 {
|
||||
if err := upgradeWAL(cfg, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Run the migrations.
|
||||
if err := upgradeWAL(cfg.DataDir, cfg.Name, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
||||
}
|
||||
|
||||
if cfg.ShouldDiscover() {
|
||||
@ -385,7 +394,21 @@ func (s *EtcdServer) run() {
|
||||
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
|
||||
log.Panicf("recovery store error: %v", err)
|
||||
}
|
||||
s.Cluster.Recover()
|
||||
|
||||
// It avoids snapshot recovery overwriting newer cluster and
|
||||
// transport setting, which may block the communication.
|
||||
if s.Cluster.index < rd.Snapshot.Metadata.Index {
|
||||
s.Cluster.Recover()
|
||||
// recover raft transport
|
||||
s.r.transport.RemoveAllPeers()
|
||||
for _, m := range s.Cluster.Members() {
|
||||
if m.ID == s.ID() {
|
||||
continue
|
||||
}
|
||||
s.r.transport.AddPeer(m.ID, m.PeerURLs)
|
||||
}
|
||||
}
|
||||
|
||||
appliedi = rd.Snapshot.Metadata.Index
|
||||
confState = rd.Snapshot.Metadata.ConfState
|
||||
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
|
||||
@ -821,88 +844,3 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
|
||||
func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
|
||||
|
||||
func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
|
||||
|
||||
// isBootstrapped tries to check if the given member has been bootstrapped
|
||||
// in the given cluster.
|
||||
func isBootstrapped(cfg *ServerConfig) bool {
|
||||
cl := cfg.Cluster
|
||||
member := cfg.Name
|
||||
|
||||
us := getOtherPeerURLs(cl, member)
|
||||
rcl, err := getClusterFromPeers(us, false, cfg.Transport)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
id := cl.MemberByName(member).ID
|
||||
m := rcl.Member(id)
|
||||
if m == nil {
|
||||
return false
|
||||
}
|
||||
if len(m.ClientURLs) > 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetClusterFromPeers takes a set of URLs representing etcd peers, and
|
||||
// attempts to construct a Cluster by accessing the members endpoint on one of
|
||||
// these URLs. The first URL to provide a response is used. If no URLs provide
|
||||
// a response, or a Cluster cannot be successfully created from a received
|
||||
// response, an error is returned.
|
||||
func GetClusterFromPeers(urls []string, tr *http.Transport) (*Cluster, error) {
|
||||
return getClusterFromPeers(urls, true, tr)
|
||||
}
|
||||
|
||||
// If logerr is true, it prints out more error messages.
|
||||
func getClusterFromPeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) {
|
||||
cc := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: time.Second,
|
||||
}
|
||||
for _, u := range urls {
|
||||
resp, err := cc.Get(u + "/members")
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not get cluster response from %s: %v", u, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not read the body of cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
var membs []*Member
|
||||
if err := json.Unmarshal(b, &membs); err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not unmarshal cluster response: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse the cluster ID from cluster res: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return NewClusterFromMembers("", id, membs), nil
|
||||
}
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
||||
// getOtherPeerURLs returns peer urls of other members in the cluster. The
|
||||
// returned list is sorted in ascending lexicographical order.
|
||||
func getOtherPeerURLs(cl ClusterInfo, self string) []string {
|
||||
us := make([]string, 0)
|
||||
for _, m := range cl.Members() {
|
||||
if m.Name == self {
|
||||
continue
|
||||
}
|
||||
us = append(us, m.PeerURLs...)
|
||||
}
|
||||
sort.Strings(us)
|
||||
return us
|
||||
}
|
||||
|
@ -1027,8 +1027,8 @@ func TestPublish(t *testing.T) {
|
||||
t.Errorf("method = %s, want PUT", r.Method)
|
||||
}
|
||||
wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
|
||||
if w := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != w {
|
||||
t.Errorf("path = %s, want %s", r.Path, w)
|
||||
if wpath := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != wpath {
|
||||
t.Errorf("path = %s, want %s", r.Path, wpath)
|
||||
}
|
||||
var gattr Attributes
|
||||
if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
|
||||
@ -1072,8 +1072,8 @@ func TestPublishRetry(t *testing.T) {
|
||||
|
||||
action := n.Action()
|
||||
// multiple Proposes
|
||||
if n := len(action); n < 2 {
|
||||
t.Errorf("len(action) = %d, want >= 2", n)
|
||||
if cnt := len(action); cnt < 2 {
|
||||
t.Errorf("len(action) = %d, want >= 2", cnt)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1135,7 +1135,7 @@ func TestGetOtherPeerURLs(t *testing.T) {
|
||||
}
|
||||
for i, tt := range tests {
|
||||
cl := NewClusterFromMembers("", types.ID(0), tt.membs)
|
||||
urls := getOtherPeerURLs(cl, tt.self)
|
||||
urls := getRemotePeerURLs(cl, tt.self)
|
||||
if !reflect.DeepEqual(urls, tt.wurls) {
|
||||
t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
|
||||
}
|
||||
@ -1393,6 +1393,7 @@ func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) Stop() {}
|
||||
func (s *nopTransporter) Pause() {}
|
||||
|
@ -16,6 +16,8 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/migrate"
|
||||
@ -91,14 +93,47 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
|
||||
|
||||
// upgradeWAL converts an older version of the etcdServer data to the newest version.
|
||||
// It must ensure that, after upgrading, the most recent version is present.
|
||||
func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
|
||||
if ver == wal.WALv0_4 {
|
||||
func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
|
||||
switch ver {
|
||||
case wal.WALv0_4:
|
||||
log.Print("etcdserver: converting v0.4 log to v2.0")
|
||||
err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
|
||||
err := migrate.Migrate4To2(baseDataDir, name)
|
||||
if err != nil {
|
||||
log.Fatalf("etcdserver: failed migrating data-dir: %v", err)
|
||||
return err
|
||||
}
|
||||
fallthrough
|
||||
case wal.WALv2_0:
|
||||
err := makeMemberDir(baseDataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fallthrough
|
||||
case wal.WALv2_0_1:
|
||||
fallthrough
|
||||
default:
|
||||
log.Printf("datadir is valid for the 2.0.1 format")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeMemberDir(dir string) error {
|
||||
membdir := path.Join(dir, "member")
|
||||
_, err := os.Stat(membdir)
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case !os.IsNotExist(err):
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(membdir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
names := []string{"snap", "wal"}
|
||||
for _, name := range names {
|
||||
if err := os.Rename(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -186,13 +186,13 @@ func clusterMustProgress(t *testing.T, membs []*member) {
|
||||
|
||||
for i, m := range membs {
|
||||
u := m.URL()
|
||||
cc := mustNewHTTPClient(t, []string{u})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := kapi.Watch(key, resp.Node.ModifiedIndex).Next(ctx); err != nil {
|
||||
mcc := mustNewHTTPClient(t, []string{u})
|
||||
mkapi := client.NewKeysAPI(mcc)
|
||||
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
if _, err := mkapi.Watch(key, resp.Node.ModifiedIndex).Next(mctx); err != nil {
|
||||
t.Fatalf("#%d: watch on %s error: %v", i, u, err)
|
||||
}
|
||||
cancel()
|
||||
mcancel()
|
||||
}
|
||||
}
|
||||
|
||||
@ -526,7 +526,7 @@ func (m *member) Launch() error {
|
||||
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
|
||||
m.s.Start()
|
||||
|
||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
|
||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
|
||||
|
||||
for _, ln := range m.PeerListeners {
|
||||
hs := &httptest.Server{
|
||||
@ -547,6 +547,24 @@ func (m *member) Launch() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *member) WaitOK(t *testing.T) {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
_, err := kapi.Get(ctx, "/")
|
||||
if err != nil {
|
||||
time.Sleep(tickDuration)
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
for m.s.Leader() == 0 {
|
||||
time.Sleep(tickDuration)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *member) URL() string { return m.ClientURLs[0].String() }
|
||||
|
||||
func (m *member) Pause() {
|
||||
|
@ -15,9 +15,14 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
"github.com/coreos/etcd/client"
|
||||
)
|
||||
|
||||
func TestPauseMember(t *testing.T) {
|
||||
@ -74,3 +79,44 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
|
||||
t.Errorf("unexpect successful launch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotAndRestartMember(t *testing.T) {
|
||||
defer afterTest(t)
|
||||
m := mustNewMember(t, "snapAndRestartTest")
|
||||
m.SnapCount = 100
|
||||
m.Launch()
|
||||
defer m.Terminate(t)
|
||||
m.WaitOK(t)
|
||||
|
||||
resps := make([]*client.Response, 120)
|
||||
var err error
|
||||
for i := 0; i < 120; i++ {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
key := fmt.Sprintf("foo%d", i)
|
||||
resps[i], err = kapi.Create(ctx, "/"+key, "bar", -1)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: create on %s error: %v", i, m.URL(), err)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
m.Stop(t)
|
||||
m.Restart(t)
|
||||
|
||||
for i := 0; i < 120; i++ {
|
||||
cc := mustNewHTTPClient(t, []string{m.URL()})
|
||||
kapi := client.NewKeysAPI(cc)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
key := fmt.Sprintf("foo%d", i)
|
||||
resp, err := kapi.Get(ctx, "/"+key)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: get on %s error: %v", i, m.URL(), err)
|
||||
}
|
||||
cancel()
|
||||
|
||||
if !reflect.DeepEqual(resp.Node, resps[i].Node) {
|
||||
t.Errorf("#%d: node = %v, want %v", i, resp.Node, resps[i].Node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -327,21 +327,21 @@ func TestV2Delete(t *testing.T) {
|
||||
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v)
|
||||
r, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
resp, err = tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/emptydir?dir=true"), v)
|
||||
r.Body.Close()
|
||||
r, err = tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/emptydir?dir=true"), v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
resp, err = tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foodir/bar?dir=true"), v)
|
||||
r.Body.Close()
|
||||
r, err = tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foodir/bar?dir=true"), v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
r.Body.Close()
|
||||
|
||||
tests := []struct {
|
||||
relativeURL string
|
||||
@ -423,17 +423,17 @@ func TestV2CAD(t *testing.T) {
|
||||
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v)
|
||||
r, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
r.Body.Close()
|
||||
|
||||
resp, err = tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foovalue"), v)
|
||||
r, err = tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foovalue"), v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
r.Body.Close()
|
||||
|
||||
tests := []struct {
|
||||
relativeURL string
|
||||
@ -582,11 +582,11 @@ func TestV2Get(t *testing.T) {
|
||||
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar/zar"), v)
|
||||
r, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar/zar"), v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
r.Body.Close()
|
||||
|
||||
tests := []struct {
|
||||
relativeURL string
|
||||
@ -676,11 +676,11 @@ func TestV2QuorumGet(t *testing.T) {
|
||||
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar/zar?quorum=true"), v)
|
||||
r, err := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar/zar?quorum=true"), v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
r.Body.Close()
|
||||
|
||||
tests := []struct {
|
||||
relativeURL string
|
||||
|
@ -175,8 +175,8 @@ func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name s
|
||||
delete(snapNodes, p.Name)
|
||||
}
|
||||
if len(snapNodes) == 1 {
|
||||
for name, id := range nodes {
|
||||
log.Printf("Autodetected from snapshot: name %s", name)
|
||||
for nodename, id := range nodes {
|
||||
log.Printf("Autodetected from snapshot: name %s", nodename)
|
||||
return id
|
||||
}
|
||||
}
|
||||
@ -186,8 +186,8 @@ func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name s
|
||||
delete(nodes, p.Name)
|
||||
}
|
||||
if len(nodes) == 1 {
|
||||
for name, id := range nodes {
|
||||
log.Printf("Autodetected name %s", name)
|
||||
for nodename, id := range nodes {
|
||||
log.Printf("Autodetected name %s", nodename)
|
||||
return id
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var binDir = ".versions"
|
||||
|
||||
type Proc struct {
|
||||
*exec.Cmd
|
||||
Name string
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
binDir = ".versions"
|
||||
v1BinPath = path.Join(binDir, "1")
|
||||
v2BinPath = path.Join(binDir, "2")
|
||||
etcdctlBinPath string
|
||||
|
@ -63,6 +63,24 @@ type node struct {
|
||||
Children map[string]*node // for directory
|
||||
}
|
||||
|
||||
func deepCopyNode(n *node, parent *node) *node {
|
||||
out := &node{
|
||||
Path: n.Path,
|
||||
CreatedIndex: n.CreatedIndex,
|
||||
ModifiedIndex: n.ModifiedIndex,
|
||||
Parent: parent,
|
||||
ExpireTime: n.ExpireTime,
|
||||
ACL: n.ACL,
|
||||
Value: n.Value,
|
||||
Children: make(map[string]*node),
|
||||
}
|
||||
for k, v := range n.Children {
|
||||
out.Children[k] = deepCopyNode(v, out)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func replacePathNames(n *node, s1, s2 string) {
|
||||
n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
|
||||
for _, c := range n.Children {
|
||||
@ -87,9 +105,23 @@ func pullNodesFromEtcd(n *node) map[string]uint64 {
|
||||
return out
|
||||
}
|
||||
|
||||
func fixEtcd(n *node) {
|
||||
n.Path = "/0"
|
||||
machines := n.Children["machines"]
|
||||
func fixEtcd(etcdref *node) *node {
|
||||
n := &node{
|
||||
Path: "/0",
|
||||
CreatedIndex: etcdref.CreatedIndex,
|
||||
ModifiedIndex: etcdref.ModifiedIndex,
|
||||
ExpireTime: etcdref.ExpireTime,
|
||||
ACL: etcdref.ACL,
|
||||
Children: make(map[string]*node),
|
||||
}
|
||||
|
||||
var machines *node
|
||||
if machineOrig, ok := etcdref.Children["machines"]; ok {
|
||||
machines = deepCopyNode(machineOrig, n)
|
||||
}
|
||||
if machines == nil {
|
||||
return n
|
||||
}
|
||||
n.Children["members"] = &node{
|
||||
Path: "/0/members",
|
||||
CreatedIndex: machines.CreatedIndex,
|
||||
@ -97,6 +129,7 @@ func fixEtcd(n *node) {
|
||||
ExpireTime: machines.ExpireTime,
|
||||
ACL: machines.ACL,
|
||||
Children: make(map[string]*node),
|
||||
Parent: n,
|
||||
}
|
||||
for name, c := range machines.Children {
|
||||
q, err := url.ParseQuery(c.Value)
|
||||
@ -121,29 +154,32 @@ func fixEtcd(n *node) {
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Children: map[string]*node{
|
||||
"attributes": &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "attributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(attrBytes),
|
||||
},
|
||||
"raftAttributes": &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(raftBytes),
|
||||
},
|
||||
},
|
||||
Children: make(map[string]*node),
|
||||
Parent: n.Children["members"],
|
||||
}
|
||||
attrs := &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "attributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(attrBytes),
|
||||
Parent: newNode,
|
||||
}
|
||||
newNode.Children["attributes"] = attrs
|
||||
raftAttrs := &node{
|
||||
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
|
||||
CreatedIndex: c.CreatedIndex,
|
||||
ModifiedIndex: c.ModifiedIndex,
|
||||
ExpireTime: c.ExpireTime,
|
||||
ACL: c.ACL,
|
||||
Value: string(raftBytes),
|
||||
Parent: newNode,
|
||||
}
|
||||
newNode.Children["raftAttributes"] = raftAttrs
|
||||
n.Children["members"].Children[m.ID.String()] = newNode
|
||||
}
|
||||
delete(n.Children, "machines")
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func mangleRoot(n *node) *node {
|
||||
@ -157,10 +193,10 @@ func mangleRoot(n *node) *node {
|
||||
}
|
||||
newRoot.Children["1"] = n
|
||||
etcd := n.Children["_etcd"]
|
||||
delete(n.Children, "_etcd")
|
||||
replacePathNames(n, "/", "/1/")
|
||||
fixEtcd(etcd)
|
||||
newRoot.Children["0"] = etcd
|
||||
newZero := fixEtcd(etcd)
|
||||
newZero.Parent = newRoot
|
||||
newRoot.Children["0"] = newZero
|
||||
return newRoot
|
||||
}
|
||||
|
||||
|
@ -30,11 +30,10 @@ import (
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdmain"
|
||||
"github.com/coreos/etcd/migrate"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/osutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
etcdversion "github.com/coreos/etcd/version"
|
||||
"github.com/coreos/etcd/wal"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
)
|
||||
@ -106,17 +105,17 @@ func checkInternalVersion(fs *flag.FlagSet) version {
|
||||
log.Fatalf("starter: please set --data-dir or ETCD_DATA_DIR for etcd")
|
||||
}
|
||||
// check the data directory
|
||||
ver, err := checkVersion(dataDir)
|
||||
dataver, err := wal.DetectVersion(dataDir)
|
||||
if err != nil {
|
||||
log.Fatalf("starter: failed to detect etcd version in %v: %v", dataDir, err)
|
||||
}
|
||||
log.Printf("starter: detect etcd version %s in %s", ver, dataDir)
|
||||
switch ver {
|
||||
case v2_0:
|
||||
log.Printf("starter: detect etcd version %s in %s", dataver, dataDir)
|
||||
switch dataver {
|
||||
case wal.WALv2_0:
|
||||
return internalV2
|
||||
case v2_0Proxy:
|
||||
case wal.WALv2_0Proxy:
|
||||
return internalV2Proxy
|
||||
case v0_4:
|
||||
case wal.WALv0_4:
|
||||
standbyInfo, err := migrate.DecodeStandbyInfo4FromFile(standbyInfo4(dataDir))
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
log.Fatalf("starter: failed to decode standbyInfo in %v: %v", dataDir, err)
|
||||
@ -140,7 +139,7 @@ func checkInternalVersion(fs *flag.FlagSet) version {
|
||||
log.Fatalf("starter: failed to check start version in %v: %v", dataDir, err)
|
||||
}
|
||||
return ver
|
||||
case empty:
|
||||
case wal.WALNotExist:
|
||||
discovery := fs.Lookup("discovery").Value.String()
|
||||
dpeers, err := getPeersFromDiscoveryURL(discovery)
|
||||
if err != nil {
|
||||
@ -162,33 +161,6 @@ func checkInternalVersion(fs *flag.FlagSet) version {
|
||||
return internalUnknown
|
||||
}
|
||||
|
||||
func checkVersion(dataDir string) (version, error) {
|
||||
names, err := fileutil.ReadDir(dataDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
return empty, err
|
||||
}
|
||||
if len(names) == 0 {
|
||||
return empty, nil
|
||||
}
|
||||
nameSet := types.NewUnsafeSet(names...)
|
||||
if nameSet.ContainsAll([]string{"member"}) {
|
||||
return v2_0, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"proxy"}) {
|
||||
return v2_0Proxy, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
|
||||
return v0_4, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"standby_info"}) {
|
||||
return v0_4, nil
|
||||
}
|
||||
return unknown, fmt.Errorf("failed to check version")
|
||||
}
|
||||
|
||||
func checkInternalVersionByDataDir4(dataDir string) (version, error) {
|
||||
// check v0.4 snapshot
|
||||
snap4, err := migrate.DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
|
||||
@ -353,7 +325,8 @@ func newDefaultClient(tls *TLSInfo) (*http.Client, error) {
|
||||
}
|
||||
|
||||
type value struct {
|
||||
s string
|
||||
isBoolFlag bool
|
||||
s string
|
||||
}
|
||||
|
||||
func (v *value) String() string { return v.s }
|
||||
@ -363,14 +336,20 @@ func (v *value) Set(s string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *value) IsBoolFlag() bool { return true }
|
||||
func (v *value) IsBoolFlag() bool { return v.isBoolFlag }
|
||||
|
||||
type boolFlag interface {
|
||||
flag.Value
|
||||
IsBoolFlag() bool
|
||||
}
|
||||
|
||||
// parseConfig parses out the input config from cmdline arguments and
|
||||
// environment variables.
|
||||
func parseConfig(args []string) (*flag.FlagSet, error) {
|
||||
fs := flag.NewFlagSet("full flagset", flag.ContinueOnError)
|
||||
etcdmain.NewConfig().VisitAll(func(f *flag.Flag) {
|
||||
fs.Var(&value{}, f.Name, "")
|
||||
_, isBoolFlag := f.Value.(boolFlag)
|
||||
fs.Var(&value{isBoolFlag: isBoolFlag}, f.Name, "")
|
||||
})
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return nil, err
|
||||
|
70
migrate/starter/starter_test.go
Normal file
70
migrate/starter/starter_test.go
Normal file
@ -0,0 +1,70 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package starter
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
args []string
|
||||
wvals map[string]string
|
||||
}{
|
||||
{
|
||||
[]string{"--name", "etcd", "--data-dir", "dir"},
|
||||
map[string]string{
|
||||
"name": "etcd",
|
||||
"data-dir": "dir",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"--name=etcd", "--data-dir=dir"},
|
||||
map[string]string{
|
||||
"name": "etcd",
|
||||
"data-dir": "dir",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"--version", "--name", "etcd"},
|
||||
map[string]string{
|
||||
"version": "true",
|
||||
"name": "etcd",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"--version=true", "--name", "etcd"},
|
||||
map[string]string{
|
||||
"version": "true",
|
||||
"name": "etcd",
|
||||
},
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
fs, err := parseConfig(tt.args)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: unexpected parseConfig error: %v", i, err)
|
||||
}
|
||||
vals := make(map[string]string)
|
||||
fs.Visit(func(f *flag.Flag) {
|
||||
vals[f.Name] = f.Value.String()
|
||||
})
|
||||
if !reflect.DeepEqual(vals, tt.wvals) {
|
||||
t.Errorf("#%d: vals = %+v, want %+v", i, vals, tt.wvals)
|
||||
}
|
||||
}
|
||||
}
|
81
pkg/osutil/interrupt_unix.go
Normal file
81
pkg/osutil/interrupt_unix.go
Normal file
@ -0,0 +1,81 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !windows,!plan9
|
||||
|
||||
// InterruptHandler is a function that is called on receiving a
|
||||
// SIGTERM or SIGINT signal.
|
||||
|
||||
package osutil
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type InterruptHandler func()
|
||||
|
||||
var (
|
||||
interruptRegisterMu, interruptExitMu sync.Mutex
|
||||
// interruptHandlers holds all registered InterruptHandlers in order
|
||||
// they will be executed.
|
||||
interruptHandlers = []InterruptHandler{}
|
||||
)
|
||||
|
||||
// RegisterInterruptHandler registers a new InterruptHandler. Handlers registered
|
||||
// after interrupt handing was initiated will not be executed.
|
||||
func RegisterInterruptHandler(h InterruptHandler) {
|
||||
interruptRegisterMu.Lock()
|
||||
defer interruptRegisterMu.Unlock()
|
||||
interruptHandlers = append(interruptHandlers, h)
|
||||
}
|
||||
|
||||
// HandleInterrupts calls the handler functions on receiving a SIGINT or SIGTERM.
|
||||
func HandleInterrupts() {
|
||||
notifier := make(chan os.Signal, 1)
|
||||
signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
sig := <-notifier
|
||||
|
||||
interruptRegisterMu.Lock()
|
||||
ihs := make([]InterruptHandler, len(interruptHandlers))
|
||||
copy(ihs, interruptHandlers)
|
||||
interruptRegisterMu.Unlock()
|
||||
|
||||
interruptExitMu.Lock()
|
||||
|
||||
log.Printf("received %v signal, shutting down...", sig)
|
||||
|
||||
for _, h := range ihs {
|
||||
h()
|
||||
}
|
||||
signal.Stop(notifier)
|
||||
pid := syscall.Getpid()
|
||||
// exit directly if it is the "init" process, since the kernel will not help to kill pid 1.
|
||||
if pid == 1 {
|
||||
os.Exit(0)
|
||||
}
|
||||
syscall.Kill(pid, sig.(syscall.Signal))
|
||||
}()
|
||||
}
|
||||
|
||||
// Exit relays to os.Exit if no interrupt handlers are running, blocks otherwise.
|
||||
func Exit(code int) {
|
||||
interruptExitMu.Lock()
|
||||
os.Exit(code)
|
||||
}
|
32
pkg/osutil/interrupt_windows.go
Normal file
32
pkg/osutil/interrupt_windows.go
Normal file
@ -0,0 +1,32 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build windows
|
||||
|
||||
package osutil
|
||||
|
||||
import "os"
|
||||
|
||||
type InterruptHandler func()
|
||||
|
||||
// RegisterInterruptHandler is a no-op on windows
|
||||
func RegisterInterruptHandler(h InterruptHandler) {}
|
||||
|
||||
// HandleInterrupts is a no-op on windows
|
||||
func HandleInterrupts() {}
|
||||
|
||||
// Exit calls os.Exit
|
||||
func Exit(code int) {
|
||||
os.Exit(code)
|
||||
}
|
@ -16,8 +16,11 @@ package osutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestUnsetenv(t *testing.T) {
|
||||
@ -43,3 +46,43 @@ func TestUnsetenv(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitSig(t *testing.T, c <-chan os.Signal, sig os.Signal) {
|
||||
select {
|
||||
case s := <-c:
|
||||
if s != sig {
|
||||
t.Fatalf("signal was %v, want %v", s, sig)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timeout waiting for %v", sig)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleInterrupts(t *testing.T) {
|
||||
for _, sig := range []syscall.Signal{syscall.SIGINT, syscall.SIGTERM} {
|
||||
n := 1
|
||||
RegisterInterruptHandler(func() { n++ })
|
||||
RegisterInterruptHandler(func() { n *= 2 })
|
||||
|
||||
c := make(chan os.Signal, 2)
|
||||
signal.Notify(c, sig)
|
||||
|
||||
HandleInterrupts()
|
||||
syscall.Kill(syscall.Getpid(), sig)
|
||||
|
||||
// we should receive the signal once from our own kill and
|
||||
// a second time from HandleInterrupts
|
||||
waitSig(t, c, sig)
|
||||
waitSig(t, c, sig)
|
||||
|
||||
if n == 3 {
|
||||
t.Fatalf("interrupt handlers were called in wrong order")
|
||||
}
|
||||
if n != 4 {
|
||||
t.Fatalf("interrupt handlers were not called properly")
|
||||
}
|
||||
// reset interrupt handlers
|
||||
interruptHandlers = interruptHandlers[:0]
|
||||
interruptExitMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
62
pkg/wait/wait_time.go
Normal file
62
pkg/wait/wait_time.go
Normal file
@ -0,0 +1,62 @@
|
||||
/*
|
||||
Copyright 2015 CoreOS, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wait
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type WaitTime interface {
|
||||
// Wait returns a chan that waits on the given deadline.
|
||||
// The chan will be triggered when Trigger is called with a
|
||||
// deadline that is later than the one it is waiting for.
|
||||
// The given deadline MUST be unique. The deadline should be
|
||||
// retrived by calling time.Now() in most cases.
|
||||
Wait(deadline time.Time) <-chan struct{}
|
||||
// Trigger triggers all the waiting chans with an earlier deadline.
|
||||
Trigger(deadline time.Time)
|
||||
}
|
||||
|
||||
type timeList struct {
|
||||
l sync.Mutex
|
||||
m map[int64]chan struct{}
|
||||
}
|
||||
|
||||
func NewTimeList() *timeList {
|
||||
return &timeList{m: make(map[int64]chan struct{})}
|
||||
}
|
||||
|
||||
func (tl *timeList) Wait(deadline time.Time) <-chan struct{} {
|
||||
tl.l.Lock()
|
||||
defer tl.l.Unlock()
|
||||
ch := make(chan struct{}, 1)
|
||||
// The given deadline SHOULD be unique.
|
||||
tl.m[deadline.UnixNano()] = ch
|
||||
return ch
|
||||
}
|
||||
|
||||
func (tl *timeList) Trigger(deadline time.Time) {
|
||||
tl.l.Lock()
|
||||
defer tl.l.Unlock()
|
||||
for t, ch := range tl.m {
|
||||
if t < deadline.UnixNano() {
|
||||
delete(tl.m, t)
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
}
|
85
pkg/wait/wait_time_test.go
Normal file
85
pkg/wait/wait_time_test.go
Normal file
@ -0,0 +1,85 @@
|
||||
/*
|
||||
|
||||
Copyright 2015 CoreOS, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
package wait
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestWaitTime(t *testing.T) {
|
||||
wt := NewTimeList()
|
||||
ch1 := wt.Wait(time.Now())
|
||||
t1 := time.Now()
|
||||
wt.Trigger(t1)
|
||||
select {
|
||||
case <-ch1:
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
t.Fatalf("cannot receive from ch as expected")
|
||||
}
|
||||
|
||||
ch2 := wt.Wait(time.Now())
|
||||
t2 := time.Now()
|
||||
wt.Trigger(t1)
|
||||
select {
|
||||
case <-ch2:
|
||||
t.Fatalf("unexpected to receive from ch")
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
wt.Trigger(t2)
|
||||
select {
|
||||
case <-ch2:
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
t.Fatalf("cannot receive from ch as expected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitTestStress(t *testing.T) {
|
||||
chs := make([]<-chan struct{}, 0)
|
||||
wt := NewTimeList()
|
||||
for i := 0; i < 10000; i++ {
|
||||
chs = append(chs, wt.Wait(time.Now()))
|
||||
}
|
||||
wt.Trigger(time.Now())
|
||||
|
||||
for _, ch := range chs {
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
t.Fatalf("cannot receive from ch as expected")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWaitTime(b *testing.B) {
|
||||
t := time.Now()
|
||||
wt := NewTimeList()
|
||||
for i := 0; i < b.N; i++ {
|
||||
wt.Wait(t)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkTriggerAnd10KWaitTime(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
t := time.Now()
|
||||
wt := NewTimeList()
|
||||
for j := 0; j < 10000; j++ {
|
||||
wt.Wait(t)
|
||||
}
|
||||
wt.Trigger(time.Now())
|
||||
}
|
||||
}
|
@ -296,15 +296,15 @@ func TestCompactionSideEffects(t *testing.T) {
|
||||
t.Errorf("lastIndex = %d, want %d", raftLog.lastIndex(), lastIndex)
|
||||
}
|
||||
|
||||
for i := offset; i <= raftLog.lastIndex(); i++ {
|
||||
if raftLog.term(i) != i {
|
||||
t.Errorf("term(%d) = %d, want %d", i, raftLog.term(i), i)
|
||||
for j := offset; j <= raftLog.lastIndex(); j++ {
|
||||
if raftLog.term(j) != j {
|
||||
t.Errorf("term(%d) = %d, want %d", j, raftLog.term(j), j)
|
||||
}
|
||||
}
|
||||
|
||||
for i := offset; i <= raftLog.lastIndex(); i++ {
|
||||
if !raftLog.matchTerm(i, i) {
|
||||
t.Errorf("matchTerm(%d) = false, want true", i)
|
||||
for j := offset; j <= raftLog.lastIndex(); j++ {
|
||||
if !raftLog.matchTerm(j, j) {
|
||||
t.Errorf("matchTerm(%d) = false, want true", j)
|
||||
}
|
||||
}
|
||||
|
||||
@ -354,9 +354,9 @@ func TestNextEnts(t *testing.T) {
|
||||
raftLog.maybeCommit(5, 1)
|
||||
raftLog.appliedTo(tt.applied)
|
||||
|
||||
ents := raftLog.nextEnts()
|
||||
if !reflect.DeepEqual(ents, tt.wents) {
|
||||
t.Errorf("#%d: ents = %+v, want %+v", i, ents, tt.wents)
|
||||
nents := raftLog.nextEnts()
|
||||
if !reflect.DeepEqual(nents, tt.wents) {
|
||||
t.Errorf("#%d: nents = %+v, want %+v", i, nents, tt.wents)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -649,10 +649,10 @@ func TestTerm(t *testing.T) {
|
||||
{offset + num, 0},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
for j, tt := range tests {
|
||||
term := l.term(tt.index)
|
||||
if !reflect.DeepEqual(term, tt.w) {
|
||||
t.Errorf("#%d: at = %d, want %d", i, term, tt.w)
|
||||
t.Errorf("#%d: at = %d, want %d", j, term, tt.w)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -712,18 +712,18 @@ func TestSlice(t *testing.T) {
|
||||
{offset + num, offset + num + 1, nil, true},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
for j, tt := range tests {
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if !tt.wpanic {
|
||||
t.Errorf("%d: panic = %v, want %v: %v", i, true, false, r)
|
||||
t.Errorf("%d: panic = %v, want %v: %v", j, true, false, r)
|
||||
}
|
||||
}
|
||||
}()
|
||||
g := l.slice(tt.from, tt.to)
|
||||
if !reflect.DeepEqual(g, tt.w) {
|
||||
t.Errorf("#%d: from %d to %d = %v, want %v", i, tt.from, tt.to, g, tt.w)
|
||||
t.Errorf("#%d: from %d to %d = %v, want %v", j, tt.from, tt.to, g, tt.w)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -332,10 +332,10 @@ func TestNodeStart(t *testing.T) {
|
||||
}
|
||||
|
||||
n.Propose(ctx, []byte("foo"))
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) {
|
||||
t.Errorf("#%d: g = %+v,\n w %+v", 2, g, wants[1])
|
||||
if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
|
||||
t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1])
|
||||
} else {
|
||||
storage.Append(g.Entries)
|
||||
storage.Append(g2.Entries)
|
||||
n.Advance()
|
||||
}
|
||||
|
||||
|
@ -774,7 +774,7 @@ func TestVoteRequest(t *testing.T) {
|
||||
{[]pb.Entry{{Term: 1, Index: 1}}, 2},
|
||||
{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
for j, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.Step(pb.Message{
|
||||
From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
|
||||
@ -788,7 +788,7 @@ func TestVoteRequest(t *testing.T) {
|
||||
msgs := r.readMessages()
|
||||
sort.Sort(messageSlice(msgs))
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("#%d: len(msg) = %d, want %d", i, len(msgs), 2)
|
||||
t.Fatalf("#%d: len(msg) = %d, want %d", j, len(msgs), 2)
|
||||
}
|
||||
for i, m := range msgs {
|
||||
if m.Type != pb.MsgVote {
|
||||
|
@ -510,7 +510,7 @@ func TestOldMessages(t *testing.T) {
|
||||
// commit a new entry
|
||||
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||
|
||||
l := &raftLog{
|
||||
ilog := &raftLog{
|
||||
storage: &MemoryStorage{
|
||||
ents: []pb.Entry{
|
||||
{}, {Data: nil, Term: 1, Index: 1},
|
||||
@ -521,7 +521,7 @@ func TestOldMessages(t *testing.T) {
|
||||
unstable: unstable{offset: 5},
|
||||
committed: 4,
|
||||
}
|
||||
base := ltoa(l)
|
||||
base := ltoa(ilog)
|
||||
for i, p := range tt.peers {
|
||||
if sm, ok := p.(*raft); ok {
|
||||
l := ltoa(sm.raftLog)
|
||||
@ -548,7 +548,7 @@ func TestProposal(t *testing.T) {
|
||||
{newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
for j, tt := range tests {
|
||||
send := func(m pb.Message) {
|
||||
defer func() {
|
||||
// only recover is we expect it to panic so
|
||||
@ -556,7 +556,7 @@ func TestProposal(t *testing.T) {
|
||||
if !tt.success {
|
||||
e := recover()
|
||||
if e != nil {
|
||||
t.Logf("#%d: err: %s", i, e)
|
||||
t.Logf("#%d: err: %s", j, e)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -591,7 +591,7 @@ func TestProposal(t *testing.T) {
|
||||
}
|
||||
sm := tt.network.peers[1].(*raft)
|
||||
if g := sm.Term; g != 1 {
|
||||
t.Errorf("#%d: term = %d, want %d", i, g, 1)
|
||||
t.Errorf("#%d: term = %d, want %d", j, g, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -603,7 +603,7 @@ func TestProposalByProxy(t *testing.T) {
|
||||
newNetwork(nil, nil, nopStepper),
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
for j, tt := range tests {
|
||||
// promote 0 the leader
|
||||
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||
|
||||
@ -629,7 +629,7 @@ func TestProposalByProxy(t *testing.T) {
|
||||
}
|
||||
sm := tt.peers[1].(*raft)
|
||||
if g := sm.Term; g != 1 {
|
||||
t.Errorf("#%d: term = %d, want %d", i, g, 1)
|
||||
t.Errorf("#%d: term = %d, want %d", j, g, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1601,8 +1601,8 @@ func newNetwork(peers ...Interface) *network {
|
||||
npeers := make(map[uint64]Interface, size)
|
||||
nstorage := make(map[uint64]*MemoryStorage, size)
|
||||
|
||||
for i, p := range peers {
|
||||
id := peerAddrs[i]
|
||||
for j, p := range peers {
|
||||
id := peerAddrs[j]
|
||||
switch v := p.(type) {
|
||||
case nil:
|
||||
nstorage[id] = NewMemoryStorage()
|
||||
|
@ -84,10 +84,10 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
|
||||
// Entry for debugging.
|
||||
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
|
||||
var formatted string
|
||||
if f == nil {
|
||||
formatted = fmt.Sprintf("%q", e.Data)
|
||||
} else {
|
||||
if e.Type == pb.EntryNormal && f != nil {
|
||||
formatted = f(e.Data)
|
||||
} else {
|
||||
formatted = fmt.Sprintf("%q", e.Data)
|
||||
}
|
||||
return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
|
||||
}
|
||||
|
@ -54,7 +54,9 @@ func (er *entryReader) readEntries() ([]raftpb.Entry, error) {
|
||||
}
|
||||
er.ents.Add()
|
||||
}
|
||||
er.lastIndex.Set(int64(ents[l-1].Index))
|
||||
if l > 0 {
|
||||
er.lastIndex.Set(int64(ents[l-1].Index))
|
||||
}
|
||||
return ents, nil
|
||||
}
|
||||
|
||||
|
@ -295,6 +295,9 @@ func (s *streamReader) handle(r io.Reader) {
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(ents) == 0 {
|
||||
continue
|
||||
}
|
||||
// The commit index field in appendEntry message is not recovered.
|
||||
// The follower updates its commit index through heartbeat.
|
||||
msg := raftpb.Message{
|
||||
|
@ -37,6 +37,7 @@ type Transporter interface {
|
||||
Send(m []raftpb.Message)
|
||||
AddPeer(id types.ID, urls []string)
|
||||
RemovePeer(id types.ID)
|
||||
RemoveAllPeers()
|
||||
UpdatePeer(id types.ID, urls []string)
|
||||
Stop()
|
||||
}
|
||||
@ -132,7 +133,24 @@ func (t *transport) AddPeer(id types.ID, urls []string) {
|
||||
func (t *transport) RemovePeer(id types.ID) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.peers[id].Stop()
|
||||
t.removePeer(id)
|
||||
}
|
||||
|
||||
func (t *transport) RemoveAllPeers() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
for id, _ := range t.peers {
|
||||
t.removePeer(id)
|
||||
}
|
||||
}
|
||||
|
||||
// the caller of this function must have the peers mutex.
|
||||
func (t *transport) removePeer(id types.ID) {
|
||||
if peer, ok := t.peers[id]; ok {
|
||||
peer.Stop()
|
||||
} else {
|
||||
log.Panicf("rafthttp: unexpected removal of unknown peer '%d'", id)
|
||||
}
|
||||
delete(t.peers, id)
|
||||
delete(t.leaderStats.Followers, id.String())
|
||||
}
|
||||
|
@ -86,25 +86,25 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
}
|
||||
|
||||
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
||||
var err error
|
||||
var b []byte
|
||||
|
||||
fpath := path.Join(dir, name)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
renameBroken(fpath)
|
||||
}
|
||||
}()
|
||||
|
||||
b, err = ioutil.ReadFile(fpath)
|
||||
snap, err := Read(fpath)
|
||||
if err != nil {
|
||||
log.Printf("snap: snapshotter cannot read file %v: %v", name, err)
|
||||
renameBroken(fpath)
|
||||
}
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Read reads the snapshot named by snapname and returns the snapshot.
|
||||
func Read(snapname string) (*raftpb.Snapshot, error) {
|
||||
b, err := ioutil.ReadFile(snapname)
|
||||
if err != nil {
|
||||
log.Printf("snap: snapshotter cannot read file %v: %v", snapname, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var serializedSnap snappb.Snapshot
|
||||
if err = serializedSnap.Unmarshal(b); err != nil {
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", name, err)
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", snapname, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -115,13 +115,13 @@ func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
||||
|
||||
crc := crc32.Update(0, crcTable, serializedSnap.Data)
|
||||
if crc != serializedSnap.Crc {
|
||||
log.Printf("snap: corrupted snapshot file %v: crc mismatch", name)
|
||||
log.Printf("snap: corrupted snapshot file %v: crc mismatch", snapname)
|
||||
return nil, ErrCRCMismatch
|
||||
}
|
||||
|
||||
var snap raftpb.Snapshot
|
||||
if err = snap.Unmarshal(serializedSnap.Data); err != nil {
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", name, err)
|
||||
log.Printf("snap: corrupted snapshot file %v: %v", snapname, err)
|
||||
return nil, err
|
||||
}
|
||||
return &snap, nil
|
||||
|
@ -88,8 +88,8 @@ func TestFullEventQueue(t *testing.T) {
|
||||
|
||||
// Add
|
||||
for i := 0; i < 1000; i++ {
|
||||
e := newEvent(Create, "/foo", uint64(i), uint64(i))
|
||||
eh.addEvent(e)
|
||||
ce := newEvent(Create, "/foo", uint64(i), uint64(i))
|
||||
eh.addEvent(ce)
|
||||
e, err := eh.scan("/foo", true, uint64(i-1))
|
||||
if i > 0 {
|
||||
if e == nil || err != nil {
|
||||
|
@ -51,10 +51,10 @@ func TestHeapUpdate(t *testing.T) {
|
||||
|
||||
// add from older expire time to earlier expire time
|
||||
// the path is equal to ttl from now
|
||||
for i, n := range kvs {
|
||||
for i := range kvs {
|
||||
path := fmt.Sprintf("%v", 10-i)
|
||||
m := time.Duration(10 - i)
|
||||
n = newKV(nil, path, path, 0, nil, "", time.Now().Add(time.Second*m))
|
||||
n := newKV(nil, path, path, 0, nil, "", time.Now().Add(time.Second*m))
|
||||
kvs[i] = n
|
||||
h.push(n)
|
||||
}
|
||||
|
9
test
9
test
@ -60,4 +60,13 @@ if [ -n "${vetRes}" ]; then
|
||||
exit 255
|
||||
fi
|
||||
|
||||
if command -v go-nyet >/dev/null 2>&1; then
|
||||
echo "Checking go-nyet..."
|
||||
nyetRes=$(go-nyet -exitWith 0 $FMT)
|
||||
if [ -n "${nyetRes}" ]; then
|
||||
echo -e "go-nyet checking failed:\n${nyetRes}"
|
||||
exit 255
|
||||
fi
|
||||
fi
|
||||
|
||||
echo "Success"
|
||||
|
@ -32,24 +32,47 @@ import (
|
||||
|
||||
func main() {
|
||||
from := flag.String("data-dir", "", "")
|
||||
snapfile := flag.String("start-snap", "", "The base name of snapshot file to start dumping")
|
||||
index := flag.Uint64("start-index", 0, "The index to start dumping")
|
||||
flag.Parse()
|
||||
if *from == "" {
|
||||
log.Fatal("Must provide -data-dir flag")
|
||||
log.Fatal("Must provide -data-dir flag.")
|
||||
}
|
||||
if *snapfile != "" && *index != 0 {
|
||||
log.Fatal("start-snap and start-index flags cannot be used together.")
|
||||
}
|
||||
|
||||
ss := snap.New(snapDir(*from))
|
||||
snapshot, err := ss.Load()
|
||||
var walsnap walpb.Snapshot
|
||||
switch err {
|
||||
case nil:
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes)
|
||||
fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
|
||||
walsnap.Term, walsnap.Index, nodes)
|
||||
case snap.ErrNoSnapshot:
|
||||
fmt.Printf("Snapshot:\nempty\n")
|
||||
default:
|
||||
log.Fatalf("Failed loading snapshot: %v", err)
|
||||
var (
|
||||
walsnap walpb.Snapshot
|
||||
snapshot *raftpb.Snapshot
|
||||
err error
|
||||
)
|
||||
|
||||
isIndex := *index != 0
|
||||
|
||||
if isIndex {
|
||||
fmt.Printf("Start dumping log entries from index %d.\n", *index)
|
||||
walsnap.Index = *index
|
||||
} else {
|
||||
if *snapfile == "" {
|
||||
ss := snap.New(snapDir(*from))
|
||||
snapshot, err = ss.Load()
|
||||
} else {
|
||||
snapshot, err = snap.Read(path.Join(snapDir(*from), *snapfile))
|
||||
}
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes)
|
||||
fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
|
||||
walsnap.Term, walsnap.Index, nodes)
|
||||
case snap.ErrNoSnapshot:
|
||||
fmt.Printf("Snapshot:\nempty\n")
|
||||
default:
|
||||
log.Fatalf("Failed loading snapshot: %v", err)
|
||||
}
|
||||
fmt.Println("Start dupmping log entries from snapshot.")
|
||||
}
|
||||
|
||||
w, err := wal.Open(walDir(*from), walsnap)
|
||||
@ -58,7 +81,7 @@ func main() {
|
||||
}
|
||||
wmetadata, state, ents, err := w.ReadAll()
|
||||
w.Close()
|
||||
if err != nil {
|
||||
if err != nil && (!isIndex || err != wal.ErrSnapshotNotFound) {
|
||||
log.Fatalf("Failed reading WAL: %v", err)
|
||||
}
|
||||
id, cid := parseWALMetadata(wmetadata)
|
||||
@ -102,9 +125,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func walDir(dataDir string) string { return path.Join(dataDir, "wal") }
|
||||
func walDir(dataDir string) string { return path.Join(dataDir, "member", "wal") }
|
||||
|
||||
func snapDir(dataDir string) string { return path.Join(dataDir, "snap") }
|
||||
func snapDir(dataDir string) string { return path.Join(dataDir, "member", "snap") }
|
||||
|
||||
func parseWALMetadata(b []byte) (id, cid types.ID) {
|
||||
var metadata etcdserverpb.Metadata
|
||||
|
@ -15,6 +15,6 @@
|
||||
package version
|
||||
|
||||
var (
|
||||
Version = "2.0.1"
|
||||
Version = "2.0.4"
|
||||
InternalVersion = "2"
|
||||
)
|
||||
|
28
wal/util.go
28
wal/util.go
@ -28,10 +28,12 @@ import (
|
||||
type WalVersion string
|
||||
|
||||
const (
|
||||
WALUnknown WalVersion = "Unknown WAL"
|
||||
WALNotExist WalVersion = "No WAL"
|
||||
WALv0_4 WalVersion = "0.4.x"
|
||||
WALv0_5 WalVersion = "0.5.x"
|
||||
WALUnknown WalVersion = "Unknown WAL"
|
||||
WALNotExist WalVersion = "No WAL"
|
||||
WALv0_4 WalVersion = "0.4.x"
|
||||
WALv2_0 WalVersion = "2.0.0"
|
||||
WALv2_0Proxy WalVersion = "2.0 proxy"
|
||||
WALv2_0_1 WalVersion = "2.0.1"
|
||||
)
|
||||
|
||||
func DetectVersion(dirpath string) (WalVersion, error) {
|
||||
@ -48,15 +50,31 @@ func DetectVersion(dirpath string) (WalVersion, error) {
|
||||
return WALNotExist, nil
|
||||
}
|
||||
nameSet := types.NewUnsafeSet(names...)
|
||||
if nameSet.Contains("member") {
|
||||
ver, err := DetectVersion(path.Join(dirpath, "member"))
|
||||
if ver == WALv2_0 {
|
||||
return WALv2_0_1, nil
|
||||
} else if ver == WALv0_4 {
|
||||
// How in the blazes did it get there?
|
||||
return WALUnknown, nil
|
||||
}
|
||||
return ver, err
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snap", "wal"}) {
|
||||
// .../wal cannot be empty to exist.
|
||||
if Exist(path.Join(dirpath, "wal")) {
|
||||
return WALv0_5, nil
|
||||
return WALv2_0, nil
|
||||
}
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"proxy"}) {
|
||||
return WALv2_0Proxy, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
|
||||
return WALv0_4, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"standby_info"}) {
|
||||
return WALv0_4, nil
|
||||
}
|
||||
|
||||
return WALUnknown, nil
|
||||
}
|
||||
|
@ -28,7 +28,8 @@ func TestDetectVersion(t *testing.T) {
|
||||
wver WalVersion
|
||||
}{
|
||||
{[]string{}, WALNotExist},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, WALv0_5},
|
||||
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, WALv2_0_1},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, WALv2_0},
|
||||
{[]string{"snapshot/", "conf", "log"}, WALv0_4},
|
||||
{[]string{"weird"}, WALUnknown},
|
||||
{[]string{"snap/", "wal/"}, WALUnknown},
|
||||
|
@ -323,23 +323,23 @@ func TestRecoverAfterCut(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
w, err := Create(p, []byte("metadata"))
|
||||
md, err := Create(p, []byte("metadata"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
if err = w.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
|
||||
if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
es := []raftpb.Entry{{Index: uint64(i)}}
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
if err = md.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Cut(); err != nil {
|
||||
if err = md.Cut(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
w.Close()
|
||||
md.Close()
|
||||
|
||||
if err := os.Remove(path.Join(p, walName(4, 4))); err != nil {
|
||||
t.Fatal(err)
|
||||
|
Reference in New Issue
Block a user