merge master
This commit is contained in:
@ -4,9 +4,9 @@ Goal: We want to be able to upgrade an individual peer in an etcd cluster to a n
|
|||||||
The process will take the form of individual followers upgrading to the latest version until the entire cluster is on the new version.
|
The process will take the form of individual followers upgrading to the latest version until the entire cluster is on the new version.
|
||||||
|
|
||||||
Immediate need: etcd is moving too fast to version the internal API right now.
|
Immediate need: etcd is moving too fast to version the internal API right now.
|
||||||
But, we need to keep mixed version clusters from being started by a rollowing upgrade process (e.g. the CoreOS developer alpha).
|
But, we need to keep mixed version clusters from being started by a rolling upgrade process (e.g. the CoreOS developer alpha).
|
||||||
|
|
||||||
Longer term need: Having a mixed version cluster where all peers are not be running the exact same version of etcd itself but are able to speak one version of the internal protocol.
|
Longer term need: Having a mixed version cluster where all peers are not running the exact same version of etcd itself but are able to speak one version of the internal protocol.
|
||||||
|
|
||||||
Solution: The internal protocol needs to be versioned just as the client protocol is.
|
Solution: The internal protocol needs to be versioned just as the client protocol is.
|
||||||
Initially during the 0.\*.\* series of etcd releases we won't allow mixed versions at all.
|
Initially during the 0.\*.\* series of etcd releases we won't allow mixed versions at all.
|
||||||
|
80
README.md
80
README.md
@ -56,18 +56,18 @@ go version
|
|||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
### Running a single node
|
### Running a single machine
|
||||||
|
|
||||||
These examples will use a single node cluster to show you the basics of the etcd REST API.
|
These examples will use a single machine cluster to show you the basics of the etcd REST API.
|
||||||
Let's start etcd:
|
Let's start etcd:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
./etcd -data-dir node0 -name node0
|
./etcd -data-dir machine0 -name machine0
|
||||||
```
|
```
|
||||||
|
|
||||||
This will bring up an etcd node listening on port 4001 for client communication and on port 7001 for server-to-server communication.
|
This will bring up etcd listening on port 4001 for client communication and on port 7001 for server-to-server communication.
|
||||||
The `-data-dir node0` argument tells etcd to write node configuration, logs and snapshots to the `./node0/` directory.
|
The `-data-dir machine0` argument tells etcd to write machine configuration, logs and snapshots to the `./machine0/` directory.
|
||||||
The `-name node0` tells the rest of the cluster that this node is named node0.
|
The `-name machine` tells the rest of the cluster that this machine is named machine0.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -75,7 +75,7 @@ The `-name node0` tells the rest of the cluster that this node is named node0.
|
|||||||
|
|
||||||
### Setting the value to a key
|
### Setting the value to a key
|
||||||
|
|
||||||
Let’s set the first key-value pair to the node.
|
Let’s set the first key-value pair to the datastore.
|
||||||
In this case the key is `/message` and the value is `Hello world`.
|
In this case the key is `/message` and the value is `Hello world`.
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
@ -121,7 +121,7 @@ curl -L http://127.0.0.1:4001/v2/keys/message
|
|||||||
You can change the value of `/message` from `Hello world` to `Hello etcd` with another `PUT` request to the key:
|
You can change the value of `/message` from `Hello world` to `Hello etcd` with another `PUT` request to the key:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl -L http://127.0.0.1:4001/v1/keys/message -XPUT -d value="Hello etcd"
|
curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello etcd"
|
||||||
```
|
```
|
||||||
|
|
||||||
```json
|
```json
|
||||||
@ -164,7 +164,7 @@ Note the two new fields in response:
|
|||||||
|
|
||||||
2. The `ttl` is the time to live for the key, in seconds.
|
2. The `ttl` is the time to live for the key, in seconds.
|
||||||
|
|
||||||
_NOTE_: Keys can only be expired by a cluster leader so if a node gets disconnected from the cluster, its keys will not expire until it rejoins.
|
_NOTE_: Keys can only be expired by a cluster leader so if a machine gets disconnected from the cluster, its keys will not expire until it rejoins.
|
||||||
|
|
||||||
Now you can try to get the key by sending a `GET` request:
|
Now you can try to get the key by sending a `GET` request:
|
||||||
|
|
||||||
@ -235,14 +235,26 @@ Here is a simple example.
|
|||||||
Let's create a key-value pair first: `foo=one`.
|
Let's create a key-value pair first: `foo=one`.
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl -L http://127.0.0.1:4001/v1/keys/foo -XPUT -d value=one
|
curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=one
|
||||||
```
|
```
|
||||||
|
|
||||||
Let's try an invalid `CompareAndSwap` command first.
|
Let's try some invalid `CompareAndSwap` commands first.
|
||||||
We can provide the `prevValue` parameter to the set command to make it a `CompareAndSwap` command.
|
|
||||||
|
Trying to set this existing key with `prevExist=false` fails as expected:
|
||||||
|
```sh
|
||||||
|
curl -L http://127.0.0.1:4001/v2/keys/foo?prevExist=false -XPUT -d value=three
|
||||||
|
```
|
||||||
|
|
||||||
|
The error code explains the problem:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{"errorCode":105,"message":"Already exists","cause":"/foo","index":39776}
|
||||||
|
```
|
||||||
|
|
||||||
|
Now lets provide a `prevValue` parameter:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl -L http://127.0.0.1:4001/v1/keys/foo?prevValue=two -XPUT -d value=three
|
curl -L http://127.0.0.1:4001/v2/keys/foo?prevValue=two -XPUT -d value=three
|
||||||
```
|
```
|
||||||
|
|
||||||
This will try to compare the previous value of the key and the previous value we provided. If they are equal, the value of the key will change to three.
|
This will try to compare the previous value of the key and the previous value we provided. If they are equal, the value of the key will change to three.
|
||||||
@ -378,12 +390,12 @@ For testing you can use the certificates in the `fixtures/ca` directory.
|
|||||||
Let's configure etcd to use this keypair:
|
Let's configure etcd to use this keypair:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
./etcd -f -name node0 -data-dir node0 -cert-file=./fixtures/ca/server.crt -key-file=./fixtures/ca/server.key.insecure
|
./etcd -f -name machine0 -data-dir machine0 -cert-file=./fixtures/ca/server.crt -key-file=./fixtures/ca/server.key.insecure
|
||||||
```
|
```
|
||||||
|
|
||||||
There are a few new options we're using:
|
There are a few new options we're using:
|
||||||
|
|
||||||
* `-f` - forces a new node configuration, even if an existing configuration is found. (WARNING: data loss!)
|
* `-f` - forces a new machine configuration, even if an existing configuration is found. (WARNING: data loss!)
|
||||||
* `-cert-file` and `-key-file` specify the location of the cert and key files to be used for for transport layer security between the client and server.
|
* `-cert-file` and `-key-file` specify the location of the cert and key files to be used for for transport layer security between the client and server.
|
||||||
|
|
||||||
You can now test the configuration using HTTPS:
|
You can now test the configuration using HTTPS:
|
||||||
@ -413,7 +425,7 @@ We can also do authentication using CA certs.
|
|||||||
The clients will provide their cert to the server and the server will check whether the cert is signed by the CA and decide whether to serve the request.
|
The clients will provide their cert to the server and the server will check whether the cert is signed by the CA and decide whether to serve the request.
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
./etcd -f -name node0 -data-dir node0 -ca-file=./fixtures/ca/ca.crt -cert-file=./fixtures/ca/server.crt -key-file=./fixtures/ca/server.key.insecure
|
./etcd -f -name machine0 -data-dir machine0 -ca-file=./fixtures/ca/ca.crt -cert-file=./fixtures/ca/server.crt -key-file=./fixtures/ca/server.key.insecure
|
||||||
```
|
```
|
||||||
|
|
||||||
```-ca-file``` is the path to the CA cert.
|
```-ca-file``` is the path to the CA cert.
|
||||||
@ -435,7 +447,7 @@ routines:SSL3_READ_BYTES:sslv3 alert bad certificate
|
|||||||
We need to give the CA signed cert to the server.
|
We need to give the CA signed cert to the server.
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl --key ./fixtures/ca/server2.key.insecure --cert ./fixtures/ca/server2.crt --cacert ./fixtures/ca/server-chain.pem -L https://127.0.0.1:4001/v1/keys/foo -XPUT -d value=bar -v
|
curl --key ./fixtures/ca/server2.key.insecure --cert ./fixtures/ca/server2.crt --cacert ./fixtures/ca/server-chain.pem -L https://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar -v
|
||||||
```
|
```
|
||||||
|
|
||||||
You should able to see:
|
You should able to see:
|
||||||
@ -463,29 +475,29 @@ We use Raft as the underlying distributed protocol which provides consistency an
|
|||||||
|
|
||||||
Let start by creating 3 new etcd instances.
|
Let start by creating 3 new etcd instances.
|
||||||
|
|
||||||
We use `-peer-addr` to specify server port and `-addr` to specify client port and `-data-dir` to specify the directory to store the log and info of the node in the cluster:
|
We use `-peer-addr` to specify server port and `-addr` to specify client port and `-data-dir` to specify the directory to store the log and info of the machine in the cluster:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
./etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir nodes/node1 -name node1
|
./etcd -peer-addr 127.0.0.1:7001 -addr 127.0.0.1:4001 -data-dir machines/machine1 -name machine1
|
||||||
```
|
```
|
||||||
|
|
||||||
**Note:** If you want to run etcd on an external IP address and still have access locally, you'll need to add `-bind-addr 0.0.0.0` so that it will listen on both external and localhost addresses.
|
**Note:** If you want to run etcd on an external IP address and still have access locally, you'll need to add `-bind-addr 0.0.0.0` so that it will listen on both external and localhost addresses.
|
||||||
A similar argument `-peer-bind-addr` is used to setup the listening address for the server port.
|
A similar argument `-peer-bind-addr` is used to setup the listening address for the server port.
|
||||||
|
|
||||||
Let's join two more nodes to this cluster using the `-peers` argument:
|
Let's join two more machines to this cluster using the `-peers` argument:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
./etcd -peer-addr 127.0.0.1:7002 -addr 127.0.0.1:4002 -peers 127.0.0.1:7001 -data-dir nodes/node2 -name node2
|
./etcd -peer-addr 127.0.0.1:7002 -addr 127.0.0.1:4002 -peers 127.0.0.1:7001 -data-dir machines/machine2 -name machine2
|
||||||
./etcd -peer-addr 127.0.0.1:7003 -addr 127.0.0.1:4003 -peers 127.0.0.1:7001 -data-dir nodes/node3 -name node3
|
./etcd -peer-addr 127.0.0.1:7003 -addr 127.0.0.1:4003 -peers 127.0.0.1:7001 -data-dir machines/machine3 -name machine3
|
||||||
```
|
```
|
||||||
|
|
||||||
We can retrieve a list of machines in the cluster using the HTTP API:
|
We can retrieve a list of machines in the cluster using the HTTP API:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl -L http://127.0.0.1:4001/v1/machines
|
curl -L http://127.0.0.1:4001/v2/machines
|
||||||
```
|
```
|
||||||
|
|
||||||
We should see there are three nodes in the cluster
|
We should see there are three machines in the cluster
|
||||||
|
|
||||||
```
|
```
|
||||||
http://127.0.0.1:4001, http://127.0.0.1:4002, http://127.0.0.1:4003
|
http://127.0.0.1:4001, http://127.0.0.1:4002, http://127.0.0.1:4003
|
||||||
@ -494,11 +506,11 @@ http://127.0.0.1:4001, http://127.0.0.1:4002, http://127.0.0.1:4003
|
|||||||
The machine list is also available via the main key API:
|
The machine list is also available via the main key API:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines
|
curl -L http://127.0.0.1:4001/v2/keys/_etcd/machines
|
||||||
```
|
```
|
||||||
|
|
||||||
```json
|
```json
|
||||||
[{"action":"get","key":"/_etcd/machines/node1","value":"raft=http://127.0.0.1:7001\u0026etcd=http://127.0.0.1:4001","index":1},{"action":"get","key":"/_etcd/machines/node2","value":"raft=http://127.0.0.1:7002\u0026etcd=http://127.0.0.1:4002","index":1},{"action":"get","key":"/_etcd/machines/node3","value":"raft=http://127.0.0.1:7003\u0026etcd=http://127.0.0.1:4003","index":1}]
|
[{"action":"get","key":"/_etcd/machines/machine1","value":"raft=http://127.0.0.1:7001\u0026etcd=http://127.0.0.1:4001","index":1},{"action":"get","key":"/_etcd/machines/machine2","value":"raft=http://127.0.0.1:7002\u0026etcd=http://127.0.0.1:4002","index":1},{"action":"get","key":"/_etcd/machines/machine3","value":"raft=http://127.0.0.1:7003\u0026etcd=http://127.0.0.1:4003","index":1}]
|
||||||
```
|
```
|
||||||
|
|
||||||
We can also get the current leader in the cluster:
|
We can also get the current leader in the cluster:
|
||||||
@ -529,13 +541,13 @@ curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar
|
|||||||
Now if we kill the leader of the cluster, we can get the value from one of the other two machines:
|
Now if we kill the leader of the cluster, we can get the value from one of the other two machines:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl -L http://127.0.0.1:4002/v1/keys/foo
|
curl -L http://127.0.0.1:4002/v2/keys/foo
|
||||||
```
|
```
|
||||||
|
|
||||||
We can also see that a new leader has been elected:
|
We can also see that a new leader has been elected:
|
||||||
|
|
||||||
```
|
```
|
||||||
curl -L http://127.0.0.1:4002/v1/leader
|
curl -L http://127.0.0.1:4002/v2/leader
|
||||||
```
|
```
|
||||||
|
|
||||||
```
|
```
|
||||||
@ -551,13 +563,13 @@ http://127.0.0.1:7003
|
|||||||
|
|
||||||
### Testing Persistence
|
### Testing Persistence
|
||||||
|
|
||||||
Next we'll kill all the nodes to test persistence.
|
Next we'll kill all the machines to test persistence.
|
||||||
Type `CTRL-C` on each terminal and then rerun the same command you used to start each node.
|
Type `CTRL-C` on each terminal and then rerun the same command you used to start each machine.
|
||||||
|
|
||||||
Your request for the `foo` key will return the correct value:
|
Your request for the `foo` key will return the correct value:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
curl -L http://127.0.0.1:4002/v1/keys/foo
|
curl -L http://127.0.0.1:4002/v2/keys/foo
|
||||||
```
|
```
|
||||||
|
|
||||||
```json
|
```json
|
||||||
@ -654,8 +666,8 @@ The command is not committed until the majority of the cluster peers receive tha
|
|||||||
Because of this majority voting property, the ideal cluster should be kept small to keep speed up and be made up of an odd number of peers.
|
Because of this majority voting property, the ideal cluster should be kept small to keep speed up and be made up of an odd number of peers.
|
||||||
|
|
||||||
Odd numbers are good because if you have 8 peers the majority will be 5 and if you have 9 peers the majority will still be 5.
|
Odd numbers are good because if you have 8 peers the majority will be 5 and if you have 9 peers the majority will still be 5.
|
||||||
The result is that an 8 peer cluster can tolerate 3 peer failures and a 9 peer cluster can tolerate 4 nodes failures.
|
The result is that an 8 peer cluster can tolerate 3 peer failures and a 9 peer cluster can tolerate 4 machine failures.
|
||||||
And in the best case when all 9 peers are responding the cluster will perform at the speed of the fastest 5 nodes.
|
And in the best case when all 9 peers are responding the cluster will perform at the speed of the fastest 5 machines.
|
||||||
|
|
||||||
|
|
||||||
### Why SSLv3 alert handshake failure when using SSL client auth?
|
### Why SSLv3 alert handshake failure when using SSL client auth?
|
||||||
@ -677,7 +689,7 @@ Add the following section to your openssl.cnf:
|
|||||||
When creating the cert be sure to reference it in the `-extensions` flag:
|
When creating the cert be sure to reference it in the `-extensions` flag:
|
||||||
|
|
||||||
```
|
```
|
||||||
openssl ca -config openssl.cnf -policy policy_anything -extensions ssl_client -out certs/node.crt -infiles node.csr
|
openssl ca -config openssl.cnf -policy policy_anything -extensions ssl_client -out certs/machine.crt -infiles machine.csr
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ const (
|
|||||||
EcodeNotDir = 104
|
EcodeNotDir = 104
|
||||||
EcodeNodeExist = 105
|
EcodeNodeExist = 105
|
||||||
EcodeKeyIsPreserved = 106
|
EcodeKeyIsPreserved = 106
|
||||||
|
EcodeRootROnly = 107
|
||||||
|
|
||||||
EcodeValueRequired = 200
|
EcodeValueRequired = 200
|
||||||
EcodePrevValueRequired = 201
|
EcodePrevValueRequired = 201
|
||||||
@ -56,6 +57,7 @@ func init() {
|
|||||||
errors[EcodeNoMorePeer] = "Reached the max number of peers in the cluster"
|
errors[EcodeNoMorePeer] = "Reached the max number of peers in the cluster"
|
||||||
errors[EcodeNotDir] = "Not A Directory"
|
errors[EcodeNotDir] = "Not A Directory"
|
||||||
errors[EcodeNodeExist] = "Already exists" // create
|
errors[EcodeNodeExist] = "Already exists" // create
|
||||||
|
errors[EcodeRootROnly] = "Root is read only"
|
||||||
errors[EcodeKeyIsPreserved] = "The prefix of given key is a keyword in etcd"
|
errors[EcodeKeyIsPreserved] = "The prefix of given key is a keyword in etcd"
|
||||||
|
|
||||||
// Post form related errors
|
// Post form related errors
|
||||||
|
@ -96,6 +96,11 @@ func (s *Server) PeerURL(name string) (string, bool) {
|
|||||||
return s.registry.PeerURL(name)
|
return s.registry.PeerURL(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClientURL retrieves the Client URL for a given node name.
|
||||||
|
func (s *Server) ClientURL(name string) (string, bool) {
|
||||||
|
return s.registry.ClientURL(name)
|
||||||
|
}
|
||||||
|
|
||||||
// Returns a reference to the Store.
|
// Returns a reference to the Store.
|
||||||
func (s *Server) Store() store.Store {
|
func (s *Server) Store() store.Store {
|
||||||
return s.store
|
return s.store
|
||||||
|
@ -26,8 +26,9 @@ Options:
|
|||||||
-vv Enabled very verbose logging.
|
-vv Enabled very verbose logging.
|
||||||
|
|
||||||
Cluster Configuration Options:
|
Cluster Configuration Options:
|
||||||
-peers=<peers> Comma-separated list of peers (ip + port) in the cluster.
|
|
||||||
-peers-file=<path> Path to a file containing the peer list.
|
-peers-file=<path> Path to a file containing the peer list.
|
||||||
|
-peers=<host:port>,<host:port> Comma-separated list of peers. The members
|
||||||
|
should match the peer's '-peer-addr' flag.
|
||||||
|
|
||||||
Client Communication Options:
|
Client Communication Options:
|
||||||
-addr=<host:port> The public host:port used for client communication.
|
-addr=<host:port> The public host:port used for client communication.
|
||||||
|
@ -23,7 +23,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
|||||||
// Help client to redirect the request to the current leader
|
// Help client to redirect the request to the current leader
|
||||||
if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
|
if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
|
||||||
leader := s.Leader()
|
leader := s.Leader()
|
||||||
hostname, _ := s.PeerURL(leader)
|
hostname, _ := s.ClientURL(leader)
|
||||||
url := hostname + req.URL.Path
|
url := hostname + req.URL.Path
|
||||||
log.Debugf("Redirect consistent get to %s", url)
|
log.Debugf("Redirect consistent get to %s", url)
|
||||||
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
|
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
|
||||||
|
@ -13,6 +13,7 @@ type Server interface {
|
|||||||
CommitIndex() uint64
|
CommitIndex() uint64
|
||||||
Term() uint64
|
Term() uint64
|
||||||
PeerURL(string) (string, bool)
|
PeerURL(string) (string, bool)
|
||||||
|
ClientURL(string) (string, bool)
|
||||||
Store() store.Store
|
Store() store.Store
|
||||||
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
|
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -39,8 +40,8 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// scan function is enumerating events from the index in history and
|
// scan function is enumerating events from the index in history and
|
||||||
// stops till the first point where the key has identified prefix
|
// stops till the first point where the key has identified key
|
||||||
func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
|
func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
|
||||||
eh.rwl.RLock()
|
eh.rwl.RLock()
|
||||||
defer eh.rwl.RUnlock()
|
defer eh.rwl.RUnlock()
|
||||||
|
|
||||||
@ -62,7 +63,19 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Erro
|
|||||||
for {
|
for {
|
||||||
e := eh.Queue.Events[i]
|
e := eh.Queue.Events[i]
|
||||||
|
|
||||||
if strings.HasPrefix(e.Node.Key, prefix) && index <= e.Index() { // make sure we bypass the smaller one
|
ok := (e.Node.Key == key)
|
||||||
|
|
||||||
|
if recursive {
|
||||||
|
// add tailing slash
|
||||||
|
key := path.Clean(key)
|
||||||
|
if key[len(key)-1] != '/' {
|
||||||
|
key = key + "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
ok = ok || strings.HasPrefix(e.Node.Key, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok && index <= e.Index() { // make sure we bypass the smaller one
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,24 +41,24 @@ func TestScanHistory(t *testing.T) {
|
|||||||
eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 4))
|
eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 4))
|
||||||
eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 5))
|
eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 5))
|
||||||
|
|
||||||
e, err := eh.scan("/foo", 1)
|
e, err := eh.scan("/foo", false, 1)
|
||||||
if err != nil || e.Index() != 1 {
|
if err != nil || e.Index() != 1 {
|
||||||
t.Fatalf("scan error [/foo] [1] %v", e.Index)
|
t.Fatalf("scan error [/foo] [1] %v", e.Index)
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err = eh.scan("/foo/bar", 1)
|
e, err = eh.scan("/foo/bar", false, 1)
|
||||||
|
|
||||||
if err != nil || e.Index() != 2 {
|
if err != nil || e.Index() != 2 {
|
||||||
t.Fatalf("scan error [/foo/bar] [2] %v", e.Index)
|
t.Fatalf("scan error [/foo/bar] [2] %v", e.Index)
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err = eh.scan("/foo/bar", 3)
|
e, err = eh.scan("/foo/bar", true, 3)
|
||||||
|
|
||||||
if err != nil || e.Index() != 4 {
|
if err != nil || e.Index() != 4 {
|
||||||
t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index)
|
t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index)
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err = eh.scan("/foo/bar", 6)
|
e, err = eh.scan("/foo/bar", true, 6)
|
||||||
|
|
||||||
if e != nil {
|
if e != nil {
|
||||||
t.Fatalf("bad index shoud reuturn nil")
|
t.Fatalf("bad index shoud reuturn nil")
|
||||||
|
@ -157,8 +157,6 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
|||||||
// If the node has already existed, create will fail.
|
// If the node has already existed, create will fail.
|
||||||
// If any node on the path is a file, create will fail.
|
// If any node on the path is a file, create will fail.
|
||||||
func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
|
func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
|
e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
|
||||||
@ -174,8 +172,6 @@ func (s *store) Create(nodePath string, value string, unique bool, expireTime ti
|
|||||||
|
|
||||||
// Set function creates or replace the node at nodePath.
|
// Set function creates or replace the node at nodePath.
|
||||||
func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
|
func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
|
e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
|
||||||
@ -193,6 +189,10 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
|||||||
value string, expireTime time.Time) (*Event, error) {
|
value string, expireTime time.Time) (*Event, error) {
|
||||||
|
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
|
// we do not allow the user to change "/"
|
||||||
|
if nodePath == "/" {
|
||||||
|
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
|
||||||
|
}
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
@ -241,6 +241,10 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
|||||||
// If the node is a directory, recursive must be true to delete it.
|
// If the node is a directory, recursive must be true to delete it.
|
||||||
func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
|
func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
|
// we do not allow the user to change "/"
|
||||||
|
if nodePath == "/" {
|
||||||
|
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
|
||||||
|
}
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
@ -284,8 +288,8 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
|
|||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
|
func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
|
||||||
prefix = path.Clean(path.Join("/", prefix))
|
key = path.Clean(path.Join("/", key))
|
||||||
|
|
||||||
nextIndex := s.CurrentIndex + 1
|
nextIndex := s.CurrentIndex + 1
|
||||||
|
|
||||||
@ -296,10 +300,10 @@ func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan
|
|||||||
var err *etcdErr.Error
|
var err *etcdErr.Error
|
||||||
|
|
||||||
if sinceIndex == 0 {
|
if sinceIndex == 0 {
|
||||||
c, err = s.WatcherHub.watch(prefix, recursive, nextIndex)
|
c, err = s.WatcherHub.watch(key, recursive, nextIndex)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
|
c, err = s.WatcherHub.watch(key, recursive, sinceIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -338,13 +342,17 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string
|
|||||||
// If the node is a file, the value and the ttl can be updated.
|
// If the node is a file, the value and the ttl can be updated.
|
||||||
// If the node is a directory, only the ttl can be updated.
|
// If the node is a directory, only the ttl can be updated.
|
||||||
func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
|
func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
|
||||||
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
|
// we do not allow the user to change "/"
|
||||||
|
if nodePath == "/" {
|
||||||
|
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
|
||||||
|
}
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
|
|
||||||
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
|
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
|
||||||
|
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
|
||||||
|
|
||||||
n, err := s.internalGet(nodePath)
|
n, err := s.internalGet(nodePath)
|
||||||
|
|
||||||
if err != nil { // if the node does not exist, return error
|
if err != nil { // if the node does not exist, return error
|
||||||
@ -396,13 +404,18 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
|
|||||||
|
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
|
|
||||||
|
// we do not allow the user to change "/"
|
||||||
|
if nodePath == "/" {
|
||||||
|
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
|
||||||
|
}
|
||||||
|
|
||||||
// Assume expire times that are way in the past are not valid.
|
// Assume expire times that are way in the past are not valid.
|
||||||
// This can occur when the time is serialized to JSON and read back in.
|
// This can occur when the time is serialized to JSON and read back in.
|
||||||
if expireTime.Before(minExpireTime) {
|
if expireTime.Before(minExpireTime) {
|
||||||
expireTime = Permanent
|
expireTime = Permanent
|
||||||
}
|
}
|
||||||
|
|
||||||
dir, newnodeName := path.Split(nodePath)
|
dir, newNodeName := path.Split(nodePath)
|
||||||
|
|
||||||
// walk through the nodePath, create dirs and get the last directory node
|
// walk through the nodePath, create dirs and get the last directory node
|
||||||
d, err := s.walk(dir, s.checkDir)
|
d, err := s.walk(dir, s.checkDir)
|
||||||
@ -416,7 +429,7 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
|
|||||||
e := newEvent(action, nodePath, nextIndex, nextIndex)
|
e := newEvent(action, nodePath, nextIndex, nextIndex)
|
||||||
eNode := e.Node
|
eNode := e.Node
|
||||||
|
|
||||||
n, _ := d.GetChild(newnodeName)
|
n, _ := d.GetChild(newNodeName)
|
||||||
|
|
||||||
// force will try to replace a existing file
|
// force will try to replace a existing file
|
||||||
if n != nil {
|
if n != nil {
|
||||||
|
@ -213,6 +213,26 @@ func TestStoreDeleteDiretory(t *testing.T) {
|
|||||||
assert.Equal(t, e.Action, "delete", "")
|
assert.Equal(t, e.Action, "delete", "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRootRdOnly(t *testing.T) {
|
||||||
|
s := newStore()
|
||||||
|
|
||||||
|
_, err := s.Set("/", "", Permanent)
|
||||||
|
assert.NotNil(t, err, "")
|
||||||
|
|
||||||
|
_, err = s.Delete("/", true)
|
||||||
|
assert.NotNil(t, err, "")
|
||||||
|
|
||||||
|
_, err = s.Create("/", "", false, Permanent)
|
||||||
|
assert.NotNil(t, err, "")
|
||||||
|
|
||||||
|
_, err = s.Update("/", "", Permanent)
|
||||||
|
assert.NotNil(t, err, "")
|
||||||
|
|
||||||
|
_, err = s.CompareAndSwap("/", "", 0, "", Permanent)
|
||||||
|
assert.NotNil(t, err, "")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the store cannot delete a directory if recursive is not specified.
|
// Ensure that the store cannot delete a directory if recursive is not specified.
|
||||||
func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
|
func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
|
||||||
s := newStore()
|
s := newStore()
|
||||||
|
@ -33,11 +33,11 @@ func newWatchHub(capacity int) *watcherHub {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// watch function returns an Event channel.
|
// watch function returns an Event channel.
|
||||||
// If recursive is true, the first change after index under prefix will be sent to the event channel.
|
// If recursive is true, the first change after index under key will be sent to the event channel.
|
||||||
// If recursive is false, the first change after index at prefix will be sent to the event channel.
|
// If recursive is false, the first change after index at key will be sent to the event channel.
|
||||||
// If index is zero, watch will start from the current index + 1.
|
// If index is zero, watch will start from the current index + 1.
|
||||||
func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
|
func (wh *watcherHub) watch(key string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
|
||||||
event, err := wh.EventHistory.scan(prefix, index)
|
event, err := wh.EventHistory.scan(key, recursive, index)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -57,7 +57,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
|
|||||||
sinceIndex: index,
|
sinceIndex: index,
|
||||||
}
|
}
|
||||||
|
|
||||||
l, ok := wh.watchers[prefix]
|
l, ok := wh.watchers[key]
|
||||||
|
|
||||||
if ok { // add the new watcher to the back of the list
|
if ok { // add the new watcher to the back of the list
|
||||||
l.PushBack(w)
|
l.PushBack(w)
|
||||||
@ -65,7 +65,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
|
|||||||
} else { // create a new list and add the new watcher
|
} else { // create a new list and add the new watcher
|
||||||
l := list.New()
|
l := list.New()
|
||||||
l.PushBack(w)
|
l.PushBack(w)
|
||||||
wh.watchers[prefix] = l
|
wh.watchers[key] = l
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddInt64(&wh.count, 1)
|
atomic.AddInt64(&wh.count, 1)
|
||||||
|
@ -68,4 +68,24 @@ func TestWatcher(t *testing.T) {
|
|||||||
t.Fatal("recv != send")
|
t.Fatal("recv != send")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensure we are doing exact matching rather than prefix matching
|
||||||
|
c, _ = wh.watch("/fo", true, 1)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case re = <-c:
|
||||||
|
t.Fatal("should not receive from channel:", re)
|
||||||
|
default:
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
e = newEvent(Create, "/fo/bar", 3, 3)
|
||||||
|
|
||||||
|
wh.notify(e)
|
||||||
|
|
||||||
|
re = <-c
|
||||||
|
|
||||||
|
if e != re {
|
||||||
|
t.Fatal("recv != send")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -45,6 +45,11 @@ func (s *ServerV2) PeerURL(name string) (string, bool) {
|
|||||||
return args.String(0), args.Bool(1)
|
return args.String(0), args.Bool(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ServerV2) ClientURL(name string) (string, bool) {
|
||||||
|
args := s.Called(name)
|
||||||
|
return args.String(0), args.Bool(1)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ServerV2) Store() store.Store {
|
func (s *ServerV2) Store() store.Store {
|
||||||
return s.store
|
return s.store
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user