Compare commits
17 Commits
v0.2.0-rc3
...
v0.2.0-rc4
Author | SHA1 | Date | |
---|---|---|---|
f026d1c14e | |||
e30bf19684 | |||
d4553714d9 | |||
e1d909eb0e | |||
317b34f4a0 | |||
0937b4d266 | |||
ef988020b7 | |||
70c8c09360 | |||
d89fa131ab | |||
3f85829e87 | |||
3cde996d21 | |||
39fb266776 | |||
557ffbb861 | |||
ddcf3975ed | |||
cc88215b46 | |||
75c02ed0da | |||
c7536ff5e1 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,6 +1,7 @@
|
||||
src/
|
||||
pkg/
|
||||
/etcd
|
||||
/etcdbench
|
||||
/server/release_version.go
|
||||
/go-bindata
|
||||
/machine*
|
||||
|
47
README.md
47
README.md
@ -32,7 +32,7 @@ Or feel free to just use curl, as in the examples below.
|
||||
|
||||
### Getting etcd
|
||||
|
||||
The latest release is available as a binary at [Github][github-release].
|
||||
The latest release and setup instructions are available at [Github][github-release].
|
||||
|
||||
[github-release]: https://github.com/coreos/etcd/releases/
|
||||
|
||||
@ -1169,7 +1169,50 @@ openssl ca -config openssl.cnf -policy policy_anything -extensions ssl_client -o
|
||||
|
||||
### Tuning
|
||||
|
||||
TODO
|
||||
The default settings in etcd should work well for installations on a local network where the average network latency is low.
|
||||
However, when using etcd across multiple data centers or over networks with high latency you may need to tweak the heartbeat and election timeout settings.
|
||||
|
||||
The underlying distributed consensus protocol relies on two separate timeouts to ensure that nodes can handoff leadership if one stalls or goes offline.
|
||||
The first timeout is called the *Heartbeat Timeout*.
|
||||
This is the frequency with which the leader will notify followers that it is still the leader.
|
||||
etcd batches commands together for higher throughput so this heartbeat timeout is also a delay for how long it takes for commands to be committed.
|
||||
By default, etcd uses a `50ms` heartbeat timeout.
|
||||
|
||||
The second timeout is the *Election Timeout*.
|
||||
This timeout is how long a follower node will go without hearing a heartbeat before attempting to become leader itself.
|
||||
By default, etcd uses a `200ms` election timeout.
|
||||
|
||||
Adjusting these values is a trade off.
|
||||
Lowering the heartbeat timeout will cause individual commands to be committed faster but it will lower the overall throughput of etcd.
|
||||
If your etcd instances have low utilization then lowering the heartbeat timeout can improve your command response time.
|
||||
|
||||
The election timeout should be set based on the heartbeat timeout and your network ping time between nodes.
|
||||
Election timeouts should be at least 10 times your ping time so it can account for variance in your network.
|
||||
For example, if the ping time between your nodes is 10ms then you should have at least a 100ms election timeout.
|
||||
|
||||
You should also set your election timeout to at least 4 to 5 times your heartbeat timeout to account for variance in leader replication.
|
||||
For a heartbeat timeout of 50ms you should set your election timeout to at least 200ms - 250ms.
|
||||
|
||||
You can override the default values on the command line:
|
||||
|
||||
```sh
|
||||
# Command line arguments:
|
||||
$ etcd -peer-heartbeat-timeout=100 -peer-election-timeout=500
|
||||
|
||||
# Environment variables:
|
||||
$ ETCD_PEER_HEARTBEAT_TIMEOUT=100 ETCD_PEER_ELECTION_TIMEOUT=500 etcd
|
||||
```
|
||||
|
||||
Or you can set the values within the configuration file:
|
||||
|
||||
```toml
|
||||
[peer]
|
||||
heartbeat_timeout = 100
|
||||
election_timeout = 100
|
||||
```
|
||||
|
||||
The values are specified in milliseconds.
|
||||
|
||||
|
||||
## Project Details
|
||||
|
||||
|
58
bench/bench.go
Normal file
58
bench/bench.go
Normal file
@ -0,0 +1,58 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
func write(requests int, end chan int) {
|
||||
client := etcd.NewClient(nil)
|
||||
|
||||
for i := 0; i < requests; i++ {
|
||||
key := strconv.Itoa(i)
|
||||
client.Set(key, key, 0)
|
||||
}
|
||||
end <- 1
|
||||
}
|
||||
|
||||
func watch(key string) {
|
||||
client := etcd.NewClient(nil)
|
||||
|
||||
receiver := make(chan *etcd.Response)
|
||||
go client.Watch(key, 0, true, receiver, nil)
|
||||
|
||||
log.Printf("watching: %s", key)
|
||||
|
||||
received := 0
|
||||
for {
|
||||
<-receiver
|
||||
received++
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
rWrites := flag.Int("write-requests", 50000, "number of writes")
|
||||
cWrites := flag.Int("concurrent-writes", 500, "number of concurrent writes")
|
||||
|
||||
watches := flag.Int("watches", 500, "number of writes")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
for i := 0; i < *watches; i++ {
|
||||
key := strconv.Itoa(i)
|
||||
go watch(key)
|
||||
}
|
||||
|
||||
wChan := make(chan int, *cWrites)
|
||||
for i := 0; i < *cWrites; i++ {
|
||||
go write((*rWrites / *cWrites), wChan)
|
||||
}
|
||||
|
||||
for i := 0; i < *cWrites; i++ {
|
||||
<-wChan
|
||||
log.Printf("Completed %d writes", (*rWrites / *cWrites))
|
||||
}
|
||||
}
|
1
build
1
build
@ -24,3 +24,4 @@ done
|
||||
|
||||
./scripts/release-version > server/release_version.go
|
||||
go build "${ETCD_PACKAGE}"
|
||||
go build -o etcdbench "${ETCD_PACKAGE}"/bench
|
||||
|
@ -111,10 +111,19 @@ func (e Error) toJsonString() string {
|
||||
|
||||
func (e Error) Write(w http.ResponseWriter) {
|
||||
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
|
||||
// 3xx is reft internal error
|
||||
if e.ErrorCode/100 == 3 {
|
||||
http.Error(w, e.toJsonString(), http.StatusInternalServerError)
|
||||
} else {
|
||||
http.Error(w, e.toJsonString(), http.StatusBadRequest)
|
||||
// 3xx is raft internal error
|
||||
status := http.StatusBadRequest
|
||||
switch e.ErrorCode {
|
||||
case EcodeKeyNotFound:
|
||||
status = http.StatusNotFound
|
||||
case EcodeNotFile, EcodeDirNotEmpty:
|
||||
status = http.StatusForbidden
|
||||
case EcodeTestFailed, EcodeNodeExist:
|
||||
status = http.StatusPreconditionFailed
|
||||
default:
|
||||
if e.ErrorCode/100 == 3 {
|
||||
status = http.StatusInternalServerError
|
||||
}
|
||||
}
|
||||
http.Error(w, e.toJsonString(), status)
|
||||
}
|
||||
|
8
etcd.go
8
etcd.go
@ -86,11 +86,11 @@ func main() {
|
||||
ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount)
|
||||
ps.MaxClusterSize = config.MaxClusterSize
|
||||
ps.RetryTimes = config.MaxRetryAttempts
|
||||
if config.HeartbeatTimeout > 0 {
|
||||
ps.HeartbeatTimeout = time.Duration(config.HeartbeatTimeout) * time.Millisecond
|
||||
if config.Peer.HeartbeatTimeout > 0 {
|
||||
ps.HeartbeatTimeout = time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
|
||||
}
|
||||
if config.ElectionTimeout > 0 {
|
||||
ps.ElectionTimeout = time.Duration(config.ElectionTimeout) * time.Millisecond
|
||||
if config.Peer.ElectionTimeout > 0 {
|
||||
ps.ElectionTimeout = time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
|
||||
}
|
||||
|
||||
// Create client server.
|
||||
|
@ -67,14 +67,14 @@ type Config struct {
|
||||
ShowVersion bool
|
||||
Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"`
|
||||
VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
|
||||
HeartbeatTimeout int `toml:"peer_heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
|
||||
ElectionTimeout int `toml:"peer_election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
|
||||
Peer struct {
|
||||
Addr string `toml:"addr" env:"ETCD_PEER_ADDR"`
|
||||
BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"`
|
||||
CAFile string `toml:"ca_file" env:"ETCD_PEER_CA_FILE"`
|
||||
CertFile string `toml:"cert_file" env:"ETCD_PEER_CERT_FILE"`
|
||||
KeyFile string `toml:"key_file" env:"ETCD_PEER_KEY_FILE"`
|
||||
HeartbeatTimeout int `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
|
||||
ElectionTimeout int `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,10 +86,10 @@ func NewConfig() *Config {
|
||||
c.MaxClusterSize = 9
|
||||
c.MaxResultBuffer = 1024
|
||||
c.MaxRetryAttempts = 3
|
||||
c.Peer.Addr = "127.0.0.1:7001"
|
||||
c.SnapshotCount = 10000
|
||||
c.ElectionTimeout = 0
|
||||
c.HeartbeatTimeout = 0
|
||||
c.Peer.Addr = "127.0.0.1:7001"
|
||||
c.Peer.HeartbeatTimeout = 0
|
||||
c.Peer.ElectionTimeout = 0
|
||||
return c
|
||||
}
|
||||
|
||||
@ -236,8 +236,8 @@ func (c *Config) LoadFlags(arguments []string) error {
|
||||
f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "")
|
||||
f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "")
|
||||
f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "")
|
||||
f.IntVar(&c.HeartbeatTimeout, "peer-heartbeat-timeout", c.HeartbeatTimeout, "")
|
||||
f.IntVar(&c.ElectionTimeout, "peer-election-timeout", c.ElectionTimeout, "")
|
||||
f.IntVar(&c.Peer.HeartbeatTimeout, "peer-heartbeat-timeout", c.Peer.HeartbeatTimeout, "")
|
||||
f.IntVar(&c.Peer.ElectionTimeout, "peer-election-timeout", c.Peer.ElectionTimeout, "")
|
||||
|
||||
f.StringVar(&cors, "cors", "", "")
|
||||
|
||||
|
@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
||||
// Start the watcher on the store.
|
||||
eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
|
||||
if err != nil {
|
||||
return etcdErr.NewError(500, key, s.Store().Index())
|
||||
return err
|
||||
}
|
||||
|
||||
cn, _ := w.(http.CloseNotifier)
|
||||
|
@ -2,6 +2,7 @@ package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
@ -22,6 +23,7 @@ func TestV2DeleteKey(t *testing.T) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
tests.ReadBody(resp)
|
||||
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBody(resp)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2}}`, "")
|
||||
@ -31,7 +33,7 @@ func TestV2DeleteKey(t *testing.T) {
|
||||
// Ensures that an empty directory is deleted when dir is set.
|
||||
//
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo?dir=true
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo ->fail
|
||||
// $ curl -X DELETE localhost:4001/v2/keys/foo ->fail
|
||||
// $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true
|
||||
//
|
||||
func TestV2DeleteEmptyDirectory(t *testing.T) {
|
||||
@ -39,9 +41,11 @@ func TestV2DeleteEmptyDirectory(t *testing.T) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
|
||||
tests.ReadBody(resp)
|
||||
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), url.Values{})
|
||||
assert.Equal(t, resp.StatusCode, http.StatusForbidden)
|
||||
bodyJson := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, bodyJson["errorCode"], 102, "")
|
||||
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBody(resp)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
|
||||
@ -59,9 +63,11 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?dir=true"), url.Values{})
|
||||
tests.ReadBody(resp)
|
||||
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
|
||||
assert.Equal(t, resp.StatusCode, http.StatusForbidden)
|
||||
bodyJson := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, bodyJson["errorCode"], 108, "")
|
||||
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true&recursive=true"), url.Values{})
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBody(resp)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
|
||||
@ -78,6 +84,7 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
|
||||
tests.ReadBody(resp)
|
||||
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"), url.Values{})
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBody(resp)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
|
||||
|
@ -2,6 +2,7 @@ package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
@ -13,6 +14,7 @@ import (
|
||||
|
||||
// Ensures that a value can be retrieve for a given key.
|
||||
//
|
||||
// $ curl localhost:4001/v2/keys/foo/bar -> fail
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
|
||||
// $ curl localhost:4001/v2/keys/foo/bar
|
||||
//
|
||||
@ -20,9 +22,15 @@ func TestV2GetKey(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.Get(fullURL)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
|
||||
|
||||
resp, _ = tests.PutForm(fullURL, v)
|
||||
tests.ReadBody(resp)
|
||||
resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"))
|
||||
|
||||
resp, _ = tests.Get(fullURL)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["action"], "get", "")
|
||||
node := body["node"].(map[string]interface{})
|
||||
@ -51,6 +59,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
|
||||
tests.ReadBody(resp)
|
||||
|
||||
resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"))
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["action"], "get", "")
|
||||
node := body["node"].(map[string]interface{})
|
||||
|
@ -3,6 +3,7 @@ package v2
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"net/http"
|
||||
|
||||
"github.com/coreos/etcd/server"
|
||||
"github.com/coreos/etcd/tests"
|
||||
@ -18,7 +19,9 @@ import (
|
||||
func TestV2CreateUnique(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
// POST should add index to list.
|
||||
resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.PostForm(fullURL, nil)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["action"], "create", "")
|
||||
|
||||
@ -28,7 +31,8 @@ func TestV2CreateUnique(t *testing.T) {
|
||||
assert.Equal(t, node["modifiedIndex"], 2, "")
|
||||
|
||||
// Second POST should add next index to list.
|
||||
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
|
||||
resp, _ = tests.PostForm(fullURL, nil)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
body = tests.ReadBodyJSON(resp)
|
||||
|
||||
node = body["node"].(map[string]interface{})
|
||||
@ -36,6 +40,7 @@ func TestV2CreateUnique(t *testing.T) {
|
||||
|
||||
// POST to a different key should add index to that list.
|
||||
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
body = tests.ReadBodyJSON(resp)
|
||||
|
||||
node = body["node"].(map[string]interface{})
|
||||
|
@ -2,6 +2,7 @@ package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
@ -20,6 +21,7 @@ func TestV2SetKey(t *testing.T) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
body := tests.ReadBody(resp)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
|
||||
@ -33,6 +35,7 @@ func TestV2SetKey(t *testing.T) {
|
||||
func TestV2SetDirectory(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
body := tests.ReadBody(resp)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
|
||||
@ -50,6 +53,7 @@ func TestV2SetKeyWithTTL(t *testing.T) {
|
||||
v.Set("value", "XXX")
|
||||
v.Set("ttl", "20")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
node := body["node"].(map[string]interface{})
|
||||
assert.Equal(t, node["ttl"], 20, "")
|
||||
@ -70,6 +74,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) {
|
||||
v.Set("value", "XXX")
|
||||
v.Set("ttl", "bad_ttl")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 202, "")
|
||||
assert.Equal(t, body["message"], "The given TTL in POST form is not a number", "")
|
||||
@ -77,7 +82,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// Ensures that a key is conditionally set only if it previously did not exist.
|
||||
// Ensures that a key is conditionally set if it previously did not exist.
|
||||
//
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false
|
||||
//
|
||||
@ -87,25 +92,29 @@ func TestV2CreateKeySuccess(t *testing.T) {
|
||||
v.Set("value", "XXX")
|
||||
v.Set("prevExist", "false")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
node := body["node"].(map[string]interface{})
|
||||
assert.Equal(t, node["value"], "XXX", "")
|
||||
})
|
||||
}
|
||||
|
||||
// Ensures that a key is not conditionally because it previously existed.
|
||||
// Ensures that a key is not conditionally set because it previously existed.
|
||||
//
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false -> fail
|
||||
//
|
||||
func TestV2CreateKeyFail(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
v.Set("prevExist", "false")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
tests.ReadBody(resp)
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
resp, _ = tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 105, "")
|
||||
assert.Equal(t, body["message"], "Key already exists", "")
|
||||
@ -123,12 +132,15 @@ func TestV2UpdateKeySuccess(t *testing.T) {
|
||||
v := url.Values{}
|
||||
|
||||
v.Set("value", "XXX")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
tests.ReadBody(resp)
|
||||
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevExist", "true")
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
resp, _ = tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["action"], "update", "")
|
||||
})
|
||||
@ -144,9 +156,11 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) {
|
||||
v := url.Values{}
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), v)
|
||||
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevExist", "true")
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 100, "")
|
||||
assert.Equal(t, body["message"], "Key not found", "")
|
||||
@ -156,19 +170,27 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) {
|
||||
|
||||
// Ensures that a key is not conditionally set if it previously did not exist.
|
||||
//
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -d prevExist=true
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=true
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=YYY -d prevExist=true -> fail
|
||||
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevExist=true -> fail
|
||||
//
|
||||
func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevExist", "true")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 100, "")
|
||||
assert.Equal(t, body["message"], "Key not found", "")
|
||||
assert.Equal(t, body["cause"], "/foo", "")
|
||||
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
|
||||
body = tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 100, "")
|
||||
assert.Equal(t, body["message"], "Key not found", "")
|
||||
assert.Equal(t, body["cause"], "/foo", "")
|
||||
})
|
||||
}
|
||||
|
||||
@ -181,11 +203,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
tests.ReadBody(resp)
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevIndex", "2")
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
resp, _ = tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["action"], "compareAndSwap", "")
|
||||
node := body["node"].(map[string]interface{})
|
||||
@ -203,11 +228,14 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
tests.ReadBody(resp)
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevIndex", "10")
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
resp, _ = tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 101, "")
|
||||
assert.Equal(t, body["message"], "Compare failed", "")
|
||||
@ -226,6 +254,7 @@ func TestV2SetKeyCASWithInvalidIndex(t *testing.T) {
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevIndex", "bad_index")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 203, "")
|
||||
assert.Equal(t, body["message"], "The given index in POST form is not a number", "")
|
||||
@ -242,11 +271,14 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
tests.ReadBody(resp)
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevValue", "XXX")
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
resp, _ = tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["action"], "compareAndSwap", "")
|
||||
node := body["node"].(map[string]interface{})
|
||||
@ -264,11 +296,14 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
|
||||
tests.RunServer(func(s *server.Server) {
|
||||
v := url.Values{}
|
||||
v.Set("value", "XXX")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
|
||||
resp, _ := tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusCreated)
|
||||
tests.ReadBody(resp)
|
||||
v.Set("value", "YYY")
|
||||
v.Set("prevValue", "AAA")
|
||||
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
resp, _ = tests.PutForm(fullURL, v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 101, "")
|
||||
assert.Equal(t, body["message"], "Compare failed", "")
|
||||
@ -287,6 +322,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) {
|
||||
v.Set("value", "XXX")
|
||||
v.Set("prevValue", "")
|
||||
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
|
||||
body := tests.ReadBodyJSON(resp)
|
||||
assert.Equal(t, body["errorCode"], 201, "")
|
||||
assert.Equal(t, body["message"], "PrevValue is Required in POST form", "")
|
||||
|
@ -39,26 +39,27 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
|
||||
return e
|
||||
}
|
||||
|
||||
// scan function is enumerating events from the index in history and
|
||||
// stops till the first point where the key has identified key
|
||||
// scan enumerates events from the index history and stops at the first point
|
||||
// where the key matches.
|
||||
func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
|
||||
eh.rwl.RLock()
|
||||
defer eh.rwl.RUnlock()
|
||||
|
||||
// the index should locate after the event history's StartIndex
|
||||
if index-eh.StartIndex < 0 {
|
||||
// index should be after the event history's StartIndex
|
||||
if index < eh.StartIndex {
|
||||
return nil,
|
||||
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
|
||||
fmt.Sprintf("the requested history has been cleared [%v/%v]",
|
||||
eh.StartIndex, index), 0)
|
||||
}
|
||||
|
||||
// the index should locate before the size of the queue minus the duplicate count
|
||||
// the index should come before the size of the queue minus the duplicate count
|
||||
if index > eh.LastIndex { // future index
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
i := eh.Queue.Front
|
||||
offset := index - eh.StartIndex
|
||||
i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
|
||||
|
||||
for {
|
||||
e := eh.Queue.Events[i]
|
||||
@ -75,13 +76,13 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
|
||||
ok = ok || strings.HasPrefix(e.Node.Key, key)
|
||||
}
|
||||
|
||||
if ok && index <= e.Index() { // make sure we bypass the smaller one
|
||||
if ok {
|
||||
return e, nil
|
||||
}
|
||||
|
||||
i = (i + 1) % eh.Queue.Capacity
|
||||
|
||||
if i > eh.Queue.back() {
|
||||
if i == eh.Queue.Back {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
@ -95,6 +96,7 @@ func (eh *EventHistory) clone() *EventHistory {
|
||||
Events: make([]*Event, eh.Queue.Capacity),
|
||||
Size: eh.Queue.Size,
|
||||
Front: eh.Queue.Front,
|
||||
Back: eh.Queue.Back,
|
||||
}
|
||||
|
||||
for i, e := range eh.Queue.Events {
|
||||
|
@ -4,22 +4,17 @@ type eventQueue struct {
|
||||
Events []*Event
|
||||
Size int
|
||||
Front int
|
||||
Back int
|
||||
Capacity int
|
||||
}
|
||||
|
||||
func (eq *eventQueue) back() int {
|
||||
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
|
||||
}
|
||||
|
||||
func (eq *eventQueue) insert(e *Event) {
|
||||
index := (eq.back() + 1) % eq.Capacity
|
||||
|
||||
eq.Events[index] = e
|
||||
eq.Events[eq.Back] = e
|
||||
eq.Back = (eq.Back + 1) % eq.Capacity
|
||||
|
||||
if eq.Size == eq.Capacity { //dequeue
|
||||
eq.Front = (index + 1) % eq.Capacity
|
||||
eq.Front = (eq.Front + 1) % eq.Capacity
|
||||
} else {
|
||||
eq.Size++
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) {
|
||||
t.Fatalf("bad index shoud reuturn nil")
|
||||
}
|
||||
}
|
||||
|
||||
// TestFullEventQueue tests a queue with capacity = 10
|
||||
// Add 1000 events into that queue, and test if scanning
|
||||
// works still for previous events.
|
||||
func TestFullEventQueue(t *testing.T) {
|
||||
|
||||
eh := newEventHistory(10)
|
||||
|
||||
// Add
|
||||
for i := 0; i < 1000; i++ {
|
||||
e := newEvent(Create, "/foo", uint64(i), uint64(i))
|
||||
eh.addEvent(e)
|
||||
e, err := eh.scan("/foo", true, uint64(i-1))
|
||||
if i > 0 {
|
||||
if e == nil || err != nil {
|
||||
t.Fatalf("scan error [/foo] [%v] %v", i-1, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package test
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
@ -91,7 +92,7 @@ func TestV1ClusterMigration(t *testing.T) {
|
||||
resp, err := tests.Get("http://localhost:4001/v2/keys/message")
|
||||
body := tests.ReadBody(resp)
|
||||
assert.Nil(t, err, "")
|
||||
assert.Equal(t, resp.StatusCode, 400)
|
||||
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
|
||||
assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n")
|
||||
|
||||
// Ensure TTL'd message is removed.
|
||||
|
Reference in New Issue
Block a user