Compare commits

...

17 Commits

Author SHA1 Message Date
f026d1c14e Merge pull request #414 from philips/getting-started
fix(README): we are building containers too
2013-12-22 20:43:19 -08:00
e30bf19684 fix(README): we are building containers too
We are building a docker container now too so don't get specific about
just the binary.

I thought about adding instructions to the README but lets just keep
following the pattern of putting version getting started guides on the
release page.
2013-12-22 20:13:02 -08:00
d4553714d9 Merge pull request #413 from philips/event_history
fix(event_history): fix a bug in event queue
2013-12-22 15:48:31 -08:00
e1d909eb0e test(store/event_test): add a test for a full queue 2013-12-22 15:42:51 -08:00
317b34f4a0 refactor(store/event_history): cleanup some comments 2013-12-22 15:42:10 -08:00
0937b4d266 refactor(event_history.go) remove the extra logic 2013-12-22 15:42:10 -08:00
ef988020b7 fix(event_history) fix a bug in event queue 2013-12-22 15:42:10 -08:00
70c8c09360 Merge pull request #412 from mojotech/cas/403-proper-http-statuses
Use more appropriate HTTP status codes for error cases.
2013-12-21 20:29:40 -08:00
d89fa131ab feat(v2/errors): Use more appropriate HTTP status codes for error cases.
This commits adds test coverage for all the error and non-error cases
described below, but only the behavior of the 403, 404 and 412 cases
are changing in this commit.

When setting a key results in a new resource, we asset an HTTP status
code of 201 (aka "Created").

When attempting to get a resource that doesn't exist, we assert an
HTTP status code of 404 (aka "Not Found").

When attempting to delete a directory without dir=true, or a non-empty
directory without recursive=true, but the request is otherwise valid,
we assert an HTTP status code of 403 (aka "Forbidden").

When a precondition (e.g. specified by prevIndex, or prevValue) is not
met, but the request is otherwise syntactically valid, we assert an
HTTP status code of 412 (aka "Precondition Failed").  However,
prevExist is handled slightly differently.  If prevExist=false fails,
then this is treated like a failed precondition, so it should use
PreconditionFailed.  But, if prevExist=true fails, then this is
treated like other requests that require the existence of the
resource, and uses NotFound if the resource doesn't exist.

We continue to assert an HTTP status code of 400 when the request is
syntactically invalid (e.g. when prevIndex=bad_index).
2013-12-21 21:39:19 -05:00
3f85829e87 fix(v2/tests): make comments and tests agree about what's being tested
In cases where the comments were incorrect, this changes them to agree
with the tests.  In cases where the comments were correct, this extends
the tests to cover the behavior described in the comment.
2013-12-21 21:39:19 -05:00
3cde996d21 refactor(v2/tests): don't repeat construction of full test URL 2013-12-21 21:39:19 -05:00
39fb266776 fix(error.go): fix typo in comment 2013-12-21 21:39:19 -05:00
557ffbb861 Merge pull request #411 from xiangli-cmu/bench
Bench
2013-12-21 14:16:08 -08:00
ddcf3975ed fix bench 2013-12-21 16:44:28 +08:00
cc88215b46 fix(bench): initial commit 2013-12-20 15:19:02 -08:00
75c02ed0da Merge pull request #405 from benbjohnson/tuning
Add Tuning section to README.
2013-12-18 15:42:16 -08:00
c7536ff5e1 Add Tuning section to README. 2013-12-18 16:40:29 -07:00
16 changed files with 247 additions and 60 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
src/
pkg/
/etcd
/etcdbench
/server/release_version.go
/go-bindata
/machine*

View File

@ -32,7 +32,7 @@ Or feel free to just use curl, as in the examples below.
### Getting etcd
The latest release is available as a binary at [Github][github-release].
The latest release and setup instructions are available at [Github][github-release].
[github-release]: https://github.com/coreos/etcd/releases/
@ -1169,7 +1169,50 @@ openssl ca -config openssl.cnf -policy policy_anything -extensions ssl_client -o
### Tuning
TODO
The default settings in etcd should work well for installations on a local network where the average network latency is low.
However, when using etcd across multiple data centers or over networks with high latency you may need to tweak the heartbeat and election timeout settings.
The underlying distributed consensus protocol relies on two separate timeouts to ensure that nodes can handoff leadership if one stalls or goes offline.
The first timeout is called the *Heartbeat Timeout*.
This is the frequency with which the leader will notify followers that it is still the leader.
etcd batches commands together for higher throughput so this heartbeat timeout is also a delay for how long it takes for commands to be committed.
By default, etcd uses a `50ms` heartbeat timeout.
The second timeout is the *Election Timeout*.
This timeout is how long a follower node will go without hearing a heartbeat before attempting to become leader itself.
By default, etcd uses a `200ms` election timeout.
Adjusting these values is a trade off.
Lowering the heartbeat timeout will cause individual commands to be committed faster but it will lower the overall throughput of etcd.
If your etcd instances have low utilization then lowering the heartbeat timeout can improve your command response time.
The election timeout should be set based on the heartbeat timeout and your network ping time between nodes.
Election timeouts should be at least 10 times your ping time so it can account for variance in your network.
For example, if the ping time between your nodes is 10ms then you should have at least a 100ms election timeout.
You should also set your election timeout to at least 4 to 5 times your heartbeat timeout to account for variance in leader replication.
For a heartbeat timeout of 50ms you should set your election timeout to at least 200ms - 250ms.
You can override the default values on the command line:
```sh
# Command line arguments:
$ etcd -peer-heartbeat-timeout=100 -peer-election-timeout=500
# Environment variables:
$ ETCD_PEER_HEARTBEAT_TIMEOUT=100 ETCD_PEER_ELECTION_TIMEOUT=500 etcd
```
Or you can set the values within the configuration file:
```toml
[peer]
heartbeat_timeout = 100
election_timeout = 100
```
The values are specified in milliseconds.
## Project Details

58
bench/bench.go Normal file
View File

@ -0,0 +1,58 @@
package main
import (
"flag"
"log"
"strconv"
"github.com/coreos/go-etcd/etcd"
)
func write(requests int, end chan int) {
client := etcd.NewClient(nil)
for i := 0; i < requests; i++ {
key := strconv.Itoa(i)
client.Set(key, key, 0)
}
end <- 1
}
func watch(key string) {
client := etcd.NewClient(nil)
receiver := make(chan *etcd.Response)
go client.Watch(key, 0, true, receiver, nil)
log.Printf("watching: %s", key)
received := 0
for {
<-receiver
received++
}
}
func main() {
rWrites := flag.Int("write-requests", 50000, "number of writes")
cWrites := flag.Int("concurrent-writes", 500, "number of concurrent writes")
watches := flag.Int("watches", 500, "number of writes")
flag.Parse()
for i := 0; i < *watches; i++ {
key := strconv.Itoa(i)
go watch(key)
}
wChan := make(chan int, *cWrites)
for i := 0; i < *cWrites; i++ {
go write((*rWrites / *cWrites), wChan)
}
for i := 0; i < *cWrites; i++ {
<-wChan
log.Printf("Completed %d writes", (*rWrites / *cWrites))
}
}

1
build
View File

@ -24,3 +24,4 @@ done
./scripts/release-version > server/release_version.go
go build "${ETCD_PACKAGE}"
go build -o etcdbench "${ETCD_PACKAGE}"/bench

View File

@ -111,10 +111,19 @@ func (e Error) toJsonString() string {
func (e Error) Write(w http.ResponseWriter) {
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
// 3xx is reft internal error
// 3xx is raft internal error
status := http.StatusBadRequest
switch e.ErrorCode {
case EcodeKeyNotFound:
status = http.StatusNotFound
case EcodeNotFile, EcodeDirNotEmpty:
status = http.StatusForbidden
case EcodeTestFailed, EcodeNodeExist:
status = http.StatusPreconditionFailed
default:
if e.ErrorCode/100 == 3 {
http.Error(w, e.toJsonString(), http.StatusInternalServerError)
} else {
http.Error(w, e.toJsonString(), http.StatusBadRequest)
status = http.StatusInternalServerError
}
}
http.Error(w, e.toJsonString(), status)
}

View File

@ -86,11 +86,11 @@ func main() {
ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount)
ps.MaxClusterSize = config.MaxClusterSize
ps.RetryTimes = config.MaxRetryAttempts
if config.HeartbeatTimeout > 0 {
ps.HeartbeatTimeout = time.Duration(config.HeartbeatTimeout) * time.Millisecond
if config.Peer.HeartbeatTimeout > 0 {
ps.HeartbeatTimeout = time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
}
if config.ElectionTimeout > 0 {
ps.ElectionTimeout = time.Duration(config.ElectionTimeout) * time.Millisecond
if config.Peer.ElectionTimeout > 0 {
ps.ElectionTimeout = time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
}
// Create client server.

View File

@ -67,14 +67,14 @@ type Config struct {
ShowVersion bool
Verbose bool `toml:"verbose" env:"ETCD_VERBOSE"`
VeryVerbose bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
HeartbeatTimeout int `toml:"peer_heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
ElectionTimeout int `toml:"peer_election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
Peer struct {
Addr string `toml:"addr" env:"ETCD_PEER_ADDR"`
BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"`
CAFile string `toml:"ca_file" env:"ETCD_PEER_CA_FILE"`
CertFile string `toml:"cert_file" env:"ETCD_PEER_CERT_FILE"`
KeyFile string `toml:"key_file" env:"ETCD_PEER_KEY_FILE"`
HeartbeatTimeout int `toml:"heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
ElectionTimeout int `toml:"election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
}
}
@ -86,10 +86,10 @@ func NewConfig() *Config {
c.MaxClusterSize = 9
c.MaxResultBuffer = 1024
c.MaxRetryAttempts = 3
c.Peer.Addr = "127.0.0.1:7001"
c.SnapshotCount = 10000
c.ElectionTimeout = 0
c.HeartbeatTimeout = 0
c.Peer.Addr = "127.0.0.1:7001"
c.Peer.HeartbeatTimeout = 0
c.Peer.ElectionTimeout = 0
return c
}
@ -236,8 +236,8 @@ func (c *Config) LoadFlags(arguments []string) error {
f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "")
f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "")
f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "")
f.IntVar(&c.HeartbeatTimeout, "peer-heartbeat-timeout", c.HeartbeatTimeout, "")
f.IntVar(&c.ElectionTimeout, "peer-election-timeout", c.ElectionTimeout, "")
f.IntVar(&c.Peer.HeartbeatTimeout, "peer-heartbeat-timeout", c.Peer.HeartbeatTimeout, "")
f.IntVar(&c.Peer.ElectionTimeout, "peer-election-timeout", c.Peer.ElectionTimeout, "")
f.StringVar(&cors, "cors", "", "")

View File

@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// Start the watcher on the store.
eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
if err != nil {
return etcdErr.NewError(500, key, s.Store().Index())
return err
}
cn, _ := w.(http.CloseNotifier)

View File

@ -2,6 +2,7 @@ package v2
import (
"fmt"
"net/http"
"net/url"
"testing"
@ -22,6 +23,7 @@ func TestV2DeleteKey(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2}}`, "")
@ -31,7 +33,7 @@ func TestV2DeleteKey(t *testing.T) {
// Ensures that an empty directory is deleted when dir is set.
//
// $ curl -X PUT localhost:4001/v2/keys/foo?dir=true
// $ curl -X PUT localhost:4001/v2/keys/foo ->fail
// $ curl -X DELETE localhost:4001/v2/keys/foo ->fail
// $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true
//
func TestV2DeleteEmptyDirectory(t *testing.T) {
@ -39,9 +41,11 @@ func TestV2DeleteEmptyDirectory(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusForbidden)
bodyJson := tests.ReadBodyJSON(resp)
assert.Equal(t, bodyJson["errorCode"], 102, "")
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
@ -59,9 +63,11 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?dir=true"), url.Values{})
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusForbidden)
bodyJson := tests.ReadBodyJSON(resp)
assert.Equal(t, bodyJson["errorCode"], 108, "")
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true&recursive=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
@ -78,6 +84,7 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")

View File

@ -2,6 +2,7 @@ package v2
import (
"fmt"
"net/http"
"net/url"
"testing"
"time"
@ -13,6 +14,7 @@ import (
// Ensures that a value can be retrieve for a given key.
//
// $ curl localhost:4001/v2/keys/foo/bar -> fail
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl localhost:4001/v2/keys/foo/bar
//
@ -20,9 +22,15 @@ func TestV2GetKey(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.Get(fullURL)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
resp, _ = tests.PutForm(fullURL, v)
tests.ReadBody(resp)
resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"))
resp, _ = tests.Get(fullURL)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "get", "")
node := body["node"].(map[string]interface{})
@ -51,6 +59,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
tests.ReadBody(resp)
resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"))
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "get", "")
node := body["node"].(map[string]interface{})

View File

@ -3,6 +3,7 @@ package v2
import (
"fmt"
"testing"
"net/http"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/tests"
@ -18,7 +19,9 @@ import (
func TestV2CreateUnique(t *testing.T) {
tests.RunServer(func(s *server.Server) {
// POST should add index to list.
resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PostForm(fullURL, nil)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "create", "")
@ -28,7 +31,8 @@ func TestV2CreateUnique(t *testing.T) {
assert.Equal(t, node["modifiedIndex"], 2, "")
// Second POST should add next index to list.
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
resp, _ = tests.PostForm(fullURL, nil)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})
@ -36,6 +40,7 @@ func TestV2CreateUnique(t *testing.T) {
// POST to a different key should add index to that list.
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})

View File

@ -2,6 +2,7 @@ package v2
import (
"fmt"
"net/http"
"net/url"
"testing"
"time"
@ -20,6 +21,7 @@ func TestV2SetKey(t *testing.T) {
v := url.Values{}
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
@ -33,6 +35,7 @@ func TestV2SetKey(t *testing.T) {
func TestV2SetDirectory(t *testing.T) {
tests.RunServer(func(s *server.Server) {
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
@ -50,6 +53,7 @@ func TestV2SetKeyWithTTL(t *testing.T) {
v.Set("value", "XXX")
v.Set("ttl", "20")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBodyJSON(resp)
node := body["node"].(map[string]interface{})
assert.Equal(t, node["ttl"], 20, "")
@ -70,6 +74,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) {
v.Set("value", "XXX")
v.Set("ttl", "bad_ttl")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 202, "")
assert.Equal(t, body["message"], "The given TTL in POST form is not a number", "")
@ -77,7 +82,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) {
})
}
// Ensures that a key is conditionally set only if it previously did not exist.
// Ensures that a key is conditionally set if it previously did not exist.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false
//
@ -87,25 +92,29 @@ func TestV2CreateKeySuccess(t *testing.T) {
v.Set("value", "XXX")
v.Set("prevExist", "false")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBodyJSON(resp)
node := body["node"].(map[string]interface{})
assert.Equal(t, node["value"], "XXX", "")
})
}
// Ensures that a key is not conditionally because it previously existed.
// Ensures that a key is not conditionally set because it previously existed.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false -> fail
//
func TestV2CreateKeyFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
v.Set("prevExist", "false")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 105, "")
assert.Equal(t, body["message"], "Key already exists", "")
@ -123,12 +132,15 @@ func TestV2UpdateKeySuccess(t *testing.T) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevExist", "true")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "update", "")
})
@ -144,9 +156,11 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) {
v := url.Values{}
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
v.Set("value", "YYY")
v.Set("prevExist", "true")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 100, "")
assert.Equal(t, body["message"], "Key not found", "")
@ -156,19 +170,27 @@ func TestV2UpdateKeyFailOnValue(t *testing.T) {
// Ensures that a key is not conditionally set if it previously did not exist.
//
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -d prevExist=true
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=true
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=YYY -d prevExist=true -> fail
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevExist=true -> fail
//
func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "YYY")
v.Set("prevExist", "true")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 100, "")
assert.Equal(t, body["message"], "Key not found", "")
assert.Equal(t, body["cause"], "/foo", "")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
body = tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 100, "")
assert.Equal(t, body["message"], "Key not found", "")
assert.Equal(t, body["cause"], "/foo", "")
})
}
@ -181,11 +203,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevIndex", "2")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
@ -203,11 +228,14 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevIndex", "10")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
@ -226,6 +254,7 @@ func TestV2SetKeyCASWithInvalidIndex(t *testing.T) {
v.Set("value", "YYY")
v.Set("prevIndex", "bad_index")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 203, "")
assert.Equal(t, body["message"], "The given index in POST form is not a number", "")
@ -242,11 +271,14 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "XXX")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
@ -264,11 +296,14 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
v := url.Values{}
v.Set("value", "XXX")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
fullURL := fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")
resp, _ := tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "AAA")
resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
@ -287,6 +322,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) {
v.Set("value", "XXX")
v.Set("prevValue", "")
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusBadRequest)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 201, "")
assert.Equal(t, body["message"], "PrevValue is Required in POST form", "")

View File

@ -39,26 +39,27 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
return e
}
// scan function is enumerating events from the index in history and
// stops till the first point where the key has identified key
// scan enumerates events from the index history and stops at the first point
// where the key matches.
func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
eh.rwl.RLock()
defer eh.rwl.RUnlock()
// the index should locate after the event history's StartIndex
if index-eh.StartIndex < 0 {
// index should be after the event history's StartIndex
if index < eh.StartIndex {
return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), 0)
}
// the index should locate before the size of the queue minus the duplicate count
// the index should come before the size of the queue minus the duplicate count
if index > eh.LastIndex { // future index
return nil, nil
}
i := eh.Queue.Front
offset := index - eh.StartIndex
i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
for {
e := eh.Queue.Events[i]
@ -75,13 +76,13 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
ok = ok || strings.HasPrefix(e.Node.Key, key)
}
if ok && index <= e.Index() { // make sure we bypass the smaller one
if ok {
return e, nil
}
i = (i + 1) % eh.Queue.Capacity
if i > eh.Queue.back() {
if i == eh.Queue.Back {
return nil, nil
}
}
@ -95,6 +96,7 @@ func (eh *EventHistory) clone() *EventHistory {
Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size,
Front: eh.Queue.Front,
Back: eh.Queue.Back,
}
for i, e := range eh.Queue.Events {

View File

@ -4,22 +4,17 @@ type eventQueue struct {
Events []*Event
Size int
Front int
Back int
Capacity int
}
func (eq *eventQueue) back() int {
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
}
func (eq *eventQueue) insert(e *Event) {
index := (eq.back() + 1) % eq.Capacity
eq.Events[index] = e
eq.Events[eq.Back] = e
eq.Back = (eq.Back + 1) % eq.Capacity
if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity
eq.Front = (eq.Front + 1) % eq.Capacity
} else {
eq.Size++
}
}

View File

@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) {
t.Fatalf("bad index shoud reuturn nil")
}
}
// TestFullEventQueue tests a queue with capacity = 10
// Add 1000 events into that queue, and test if scanning
// works still for previous events.
func TestFullEventQueue(t *testing.T) {
eh := newEventHistory(10)
// Add
for i := 0; i < 1000; i++ {
e := newEvent(Create, "/foo", uint64(i), uint64(i))
eh.addEvent(e)
e, err := eh.scan("/foo", true, uint64(i-1))
if i > 0 {
if e == nil || err != nil {
t.Fatalf("scan error [/foo] [%v] %v", i-1, i)
}
}
}
}

View File

@ -3,6 +3,7 @@ package test
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path/filepath"
@ -91,7 +92,7 @@ func TestV1ClusterMigration(t *testing.T) {
resp, err := tests.Get("http://localhost:4001/v2/keys/message")
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, resp.StatusCode, 400)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n")
// Ensure TTL'd message is removed.