Compare commits

...

51 Commits

SHA1 Message Date
7e4fc7eaa9 version: bump up to 2.3.8 2017-02-15 19:25:01 -08:00
61eee5c884 test: skip govet tests in CI 2017-02-15 19:24:37 -08:00
ae24914aec etcdhttp: fix govet 2017-02-15 19:17:38 -08:00
653789bcbc etcdserver: fix govet 2017-02-15 19:11:23 -08:00
62c1e9a824 travis: use Go 1.7.5 2017-02-15 17:02:50 -08:00
67228bf5d8 tools: remove 'etcd-top' for CI tests 2017-02-15 10:24:58 -08:00
316adb4bcc test: do not run 'goword' tests in CI 2017-02-14 23:22:44 -08:00
756992d30f travis: disable email notifications
Was spamming security@coreos.com
2017-02-14 23:19:35 -08:00
aaf0ac9ff4 Merge pull request #7282 from heyitsanthony/fix-write-snap-2.3
snap: fix write snap
2017-02-08 20:16:25 -08:00
d1ba8ee6d3 snap: fix write snap
Do not use writeFile since it does not sync the file before closing.
This can lead to silent file corruption when the disk is full.
2017-02-06 09:50:14 -08:00
97dd45122b hack: install goreman in tls-setup example 2016-06-30 09:30:50 -07:00
6ac6f5d67b hack: add tls-setup example generated certs to gitignore 2016-06-30 09:30:40 -07:00
e401e17d2c Merge pull request #5756 from joshix/proxyport
Documentation/proxy: Change proxy port from 8080 to 2379
2016-06-23 11:37:21 -07:00
809949b687 Documentation/proxy: Change proxy port from 8080 to 2379
Ref https://github.com/coreos/etcd/pull/5693 in v3 track (master).
Ref backport to 2.3.7 docs in https://github.com/coreos-inc/coreos-pages/pull/747
2016-06-22 23:50:52 -07:00
9108045e65 version: bump to v2.3.7+git 2016-06-17 11:35:22 -07:00
fd17c9101d version: bump to v2.3.7 2016-06-17 11:08:26 -07:00
9ce0e8bf81 store: copy old value when refresh + cas 2016-06-17 11:07:40 -07:00
5b3ffa86fa version: bump to v2.3.6+git 2016-05-27 13:45:16 -07:00
128344c455 version: bump to v2.3.6 2016-05-27 13:27:14 -07:00
3b100ad142 etcd: fix refresh feature
When using refresh, the etcd v2 store's watch is broken. Although a refresh
should not trigger current watchers, the store should still add events into
the watchhub to keep a complete history. The current store fails to add the event
into the watchhub, which causes issues.
2016-05-27 13:25:22 -07:00
20793a29e3 version: bump to v2.3.5+git 2016-05-20 11:11:05 -07:00
a535dc994b version: bump to v2.3.5 2016-05-20 10:36:58 -07:00
46d347812b etcdserver: wait for snapshots before closing raft
Fixes #5374
2016-05-20 10:35:50 -07:00
1d12212e60 etcdserver: stop raft after stopping apply scheduler
Was causing a pipeline leak.
2016-05-20 10:35:28 -07:00
1f17d7204e etcdserver: fix the leaky snapshot routine issue 2016-05-20 10:34:48 -07:00
198664e49c Documentation/v2: fix typo for updating a member
Fix https://github.com/coreos/etcd/issues/5358.
2016-05-20 10:33:49 -07:00
ee872bb7ca Documentation/v2: fix auth_api.md bug
The guest role's read and write permission is "/*", not "*"; the same applies to the other roles.
2016-05-20 10:33:47 -07:00
8c9a3c55bd raft: do not panic when removing all the nodes from cluster 2016-05-20 10:33:45 -07:00
6f1ceee9a3 v2http: allow empty role for GET /users
Fix https://github.com/coreos/etcd/issues/5246.
2016-05-20 10:33:43 -07:00
f47375af89 version: bump to v2.3.4+git 2016-05-13 11:58:24 -07:00
df60227765 version: bump to v2.3.4 2016-05-12 14:33:37 -07:00
4db35c113d README: add known bugs
cherry-picked from https://github.com/coreos/etcd/pull/5320.
2016-05-12 14:22:45 -07:00
cf68c2285e etcdctl: Add --wal-dir and --backup-wal-dir options to backup command.
If the WAL is stored in a separate directory, the backup command
needs a --wal-dir option giving the path to the WAL directory.
The user might also want to store the backups of data and WAL separately,
for which the --backup-wal-dir option is provided.
2016-05-12 14:17:31 -07:00
743f9c9bb0 etcdctl/ctlv2: total-timeout for Sync
Fix https://github.com/coreos/etcd/issues/4897.
2016-05-12 14:15:13 -07:00
f9e09e1b1a httpproxy: fix race on getting close notifier channel
Fixes #5267
2016-05-12 14:14:16 -07:00
d6eb1e7a5f *: bump to v2.3.3+git 2016-04-29 14:18:50 -07:00
c41345d393 *: bump to v2.3.3 2016-04-29 12:38:04 -07:00
506ef9fe8d etcdserver/auth: check empty password in merge
Fix https://github.com/coreos/etcd/issues/5182.
2016-04-29 12:37:24 -07:00
49141d5916 *: bump to v2.3.2+git 2016-04-21 12:57:57 -07:00
ce63f10738 *: bump to v2.3.2 2016-04-21 11:26:37 -07:00
31bd750141 etcdserver: close response body when getting cluster information 2016-04-21 10:58:58 -07:00
37510d0306 e2e: test etcdctl user list on root user 2016-04-21 10:58:46 -07:00
d7da3787bc client: accept roles in response for ListUser
Fixes #5046
2016-04-21 10:58:40 -07:00
54d0f1d43b e2e: test etcdctl v2 double user grant
Crashes in 2.3.1
2016-04-21 10:58:30 -07:00
20db10f6f7 etcdctl: print grant/revoke error instead of scanning roles for changes
Fixes #5045
2016-04-21 10:58:17 -07:00
11c09373e1 etcdmain: start on unsupported arch when ETCD_UNSUPPORTED_ARCH is set 2016-04-21 10:42:45 -07:00
96f412e4d7 *: bump to v2.3.1+git 2016-04-01 14:56:48 -07:00
2b67f5256a *: bump to v2.3.1 2016-04-01 14:32:28 -07:00
6aa8b631e6 client: return original ctx error
Fix https://github.com/coreos/etcd/issues/3209.
2016-04-01 14:30:31 -07:00
72dea51e6a rafthttp: do not block on proposal 2016-04-01 14:28:50 -07:00
74fa0270a4 *: bump to v2.3.0+git 2016-03-18 10:23:04 -07:00
38 changed files with 405 additions and 440 deletions

.gitignore

@ -10,3 +10,4 @@
/hack/insta-discovery/.env
*.test
tools/functional-tester/docker/bin
hack/tls-setup/certs


@ -2,24 +2,11 @@ language: go
sudo: false
go:
- 1.4
- 1.5
- 1.6
- tip
- 1.7.5
matrix:
allow_failures:
- go: tip
addons:
apt:
packages:
- libpcap-dev
- libaspell-dev
- libhunspell-dev
before_install:
- go get -v github.com/chzchzchz/goword
notifications:
on_success: never
on_failure: never
script:
- ./test


@ -135,7 +135,7 @@ The data directory contains all the data to recover a member to its point-in-tim
* Stop the member process.
* Copy the data directory of the now-idle member to the new machine.
* Update the peer URLs for the replaced member to reflect the new machine according to the [runtime reconfiguration instructions][update-member].
* Update the peer URLs for the replaced member to reflect the new machine according to the [runtime reconfiguration instructions][update-a-member].
* Start etcd on the new machine, using the same configuration and the copy of the data directory.
This example will walk you through the process of migrating the infra1 member to a new machine:
@ -217,12 +217,14 @@ To recover from such scenarios, etcd provides functionality to backup and restor
**NB:** Windows users must stop etcd before running the backup command.
The first step of the recovery is to backup the data directory on a functioning etcd node. To do this, use the `etcdctl backup` command, passing in the original data directory used by etcd. For example:
The first step of the recovery is to back up the data directory and wal directory, if stored separately, on a functioning etcd node. To do this, use the `etcdctl backup` command, passing in the original data (and wal) directory used by etcd. For example:
```sh
etcdctl backup \
--data-dir %data_dir% \
[--wal-dir %wal_dir%] \
--backup-dir %backup_data_dir%
[--backup-wal-dir %backup_wal_dir%]
```
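For illustration, a concrete invocation of the template above (all paths are hypothetical):
```sh
etcdctl backup \
  --data-dir /var/lib/etcd \
  --wal-dir /var/lib/etcd-wal \
  --backup-dir /tmp/etcd-backup \
  --backup-wal-dir /tmp/etcd-backup-wal
```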
This command will rewrite some of the metadata contained in the backup (specifically, the node ID and cluster ID), which means that the node will lose its former identity. In order to recreate a cluster from the backup, you will need to start a new, single-node cluster. The metadata is rewritten to prevent the new node from inadvertently being joined onto an existing cluster.
@ -234,26 +236,30 @@ To restore a backup using the procedure created above, start etcd with the `-for
```sh
etcd \
-data-dir=%backup_data_dir% \
[-wal-dir=%backup_wal_dir%] \
-force-new-cluster \
...
```
Now etcd should be available on this node and serving the original datastore.
Once you have verified that etcd has started successfully, shut it down and move the data back to the previous location (you may wish to make another copy as well to be safe):
Once you have verified that etcd has started successfully, shut it down and move the data and wal, if stored separately, back to the previous location (you may wish to make another copy as well to be safe):
```sh
pkill etcd
rm -fr %data_dir%
rm -fr %wal_dir%
mv %backup_data_dir% %data_dir%
mv %backup_wal_dir% %wal_dir%
etcd \
-data-dir=%data_dir% \
[-wal-dir=%wal_dir%] \
...
```
#### Restoring the cluster
Now that the node is running successfully, [change its advertised peer URLs][update-member], as the `--force-new-cluster` option has set the peer URL to the default listening on localhost.
Now that the node is running successfully, [change its advertised peer URLs][update-a-member], as the `--force-new-cluster` option has set the peer URL to the default listening on localhost.
You can then add more nodes to the cluster and restore resiliency. See the [add a new member][add-a-member] guide for more details. **NB:** If you are trying to restore your cluster using old failed etcd nodes, please make sure you have stopped old etcd instances and removed their old data directories specified by the data-dir configuration parameter.
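As a sketch of the peer-URL update step above (the member ID and URL here are hypothetical):
```sh
# find the restored member's ID
etcdctl member list
# point its peer URL at the machine's real address instead of localhost
etcdctl member update a8266ecf031671f3 http://10.0.1.10:2380
```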


@ -233,10 +233,11 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t
### Refreshing key TTL
Keys in etcd can be refreshed without notifying watchers
this can be achieved by setting the refresh to true when updating a TTL
Keys in etcd can be refreshed without notifying current watchers.
You cannot update the value of a key when refreshing it
This can be achieved by setting the refresh to true when updating a TTL.
You cannot update the value of a key when refreshing it.
```sh
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5
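# A hedged sketch of the refresh described above: extend the TTL without
# changing the value and without notifying current watchers.
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d ttl=5 -d refresh=true -d prevExist=true
```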


@ -145,8 +145,8 @@ GET/HEAD /v2/auth/users
"role": "root",
"permissions": {
"kv": {
"read": ["*"],
"write": ["*"]
"read": ["/*"],
"write": ["/*"]
}
}
}
@ -159,8 +159,8 @@ GET/HEAD /v2/auth/users
"role": "guest",
"permissions": {
"kv": {
"read": ["*"],
"write": ["*"]
"read": ["/*"],
"write": ["/*"]
}
}
}
@ -198,8 +198,8 @@ GET/HEAD /v2/auth/users/alice
"role": "etcd",
"permissions" : {
"kv" : {
"read": [ "*" ],
"write": [ "*" ]
"read": [ "/*" ],
"write": [ "/*" ]
}
}
}
@ -311,8 +311,8 @@ GET/HEAD /v2/auth/roles
"role": "etcd",
"permissions": {
"kv": {
"read": ["*"],
"write": ["*"]
"read": ["/*"],
"write": ["/*"]
}
}
},
@ -320,8 +320,8 @@ GET/HEAD /v2/auth/roles
"role": "quay",
"permissions": {
"kv": {
"read": ["*"],
"write": ["*"]
"read": ["/*"],
"write": ["/*"]
}
}
}
@ -393,7 +393,7 @@ PUT /v2/auth/roles/guest
"revoke" : {
"kv" : {
"write": [
"*"
"/*"
]
}
}


@ -49,7 +49,7 @@ To start a proxy that will connect to a statically defined etcd cluster, specify
```
etcd --proxy on \
--listen-client-urls http://127.0.0.1:8080 \
--listen-client-urls http://127.0.0.1:2379 \
--initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380
```
@ -60,7 +60,7 @@ To start a proxy using the discovery service, specify the `discovery` flag. The
```
etcd --proxy on \
--listen-client-urls http://127.0.0.1:8080 \
--listen-client-urls http://127.0.0.1:2379 \
--discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de \
```


@ -118,6 +118,17 @@ See [CONTRIBUTING](CONTRIBUTING.md) for details on submitting patches and the co
See [reporting bugs](Documentation/reporting_bugs.md) for details about reporting any issue you may encounter.
## Known bugs
[GH518](https://github.com/coreos/etcd/issues/518) is a known bug:
```
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d dir=true -d prevExist=true
```
If the previous node is a key and a client tries to overwrite it with `dir=true`, no warning such as `Not a directory` is given. Instead, the key is set to an empty value.
## Project Details
### Versioning
@ -137,12 +148,15 @@ curl -L http://127.0.0.1:2379/version
The `v2` API responses should not change after the 2.0.0 release but new features will be added over time.
#### 32-bit systems
#### 32-bit and other unsupported systems
etcd has known issues on 32-bit systems due to a bug in the Go runtime. See #[358][358] for more information.
To avoid inadvertently producing an unstable etcd server, 32-bit builds emit an `etcd` that prints
a warning message and immediately exits.
To avoid inadvertently running a possibly unstable etcd server, `etcd` on unsupported architectures will print
a warning message and immediately exit if the environment variable `ETCD_UNSUPPORTED_ARCH` is not set to
the target architecture.
Currently only the amd64 architecture is officially supported by `etcd`.
[358]: https://github.com/coreos/etcd/issues/358
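The escape hatch described above can be exercised like this (a hedged example; arm64 is just one unsupported architecture):
```sh
ETCD_UNSUPPORTED_ARCH=arm64 ./etcd
```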


@ -36,6 +36,12 @@ type User struct {
Revoke []string `json:"revoke,omitempty"`
}
// userListEntry is the user representation given by the server for ListUsers
type userListEntry struct {
User string `json:"user"`
Roles []Role `json:"roles"`
}
type UserRoles struct {
User string `json:"user"`
Roles []Role `json:"roles"`
@ -198,7 +204,7 @@ func (u *httpAuthUserAPI) ListUsers(ctx context.Context) ([]string, error) {
}
var userList struct {
Users []User `json:"users"`
Users []userListEntry `json:"users"`
}
if err = json.Unmarshal(body, &userList); err != nil {


@ -342,7 +342,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
resp, body, err = hc.Do(ctx, action)
if err != nil {
cerr.Errors = append(cerr.Errors, err)
// mask previous errors with context error, which is controlled by user
if err == ctx.Err() {
return nil, nil, ctx.Err()
}
if err == context.Canceled || err == context.DeadlineExceeded {
return nil, nil, err
}
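A minimal caller-side sketch of what this buys, assuming the standard `client.New`/`client.NewKeysAPI` entry points of this package: a user-controlled context that expires now surfaces as `context.DeadlineExceeded` instead of being buried in a cluster error.
```go
package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/client"
	"golang.org/x/net/context"
)

func main() {
	c, err := client.New(client.Config{Endpoints: []string{"http://127.0.0.1:2379"}})
	if err != nil {
		fmt.Println(err)
		return
	}
	kapi := client.NewKeysAPI(c)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := kapi.Get(ctx, "/foo", nil); err == context.DeadlineExceeded {
		// with the fix above, the caller's own deadline is reported as-is
		fmt.Println("request timed out")
	}
}
```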


@ -295,7 +295,7 @@ func TestSimpleHTTPClientDoHeaderTimeout(t *testing.T) {
t.Fatalf("expected non-nil error, got nil")
}
case <-time.After(time.Second):
t.Fatalf("unexpected timeout when waitting for the test to finish")
t.Fatalf("unexpected timeout when waiting for the test to finish")
}
}
@ -444,7 +444,51 @@ func TestHTTPClusterClientDoDeadlineExceedContext(t *testing.T) {
t.Errorf("err = %+v, want %+v", err, context.DeadlineExceeded)
}
case <-time.After(time.Second):
t.Fatalf("unexpected timeout when waitting for request to deadline exceed")
t.Fatalf("unexpected timeout when waiting for request to deadline exceed")
}
}
type fakeCancelContext struct{}
var fakeCancelContextError = errors.New("fake context canceled")
func (f fakeCancelContext) Deadline() (time.Time, bool) { return time.Time{}, false }
func (f fakeCancelContext) Done() <-chan struct{} {
d := make(chan struct{}, 1)
d <- struct{}{}
return d
}
func (f fakeCancelContext) Err() error { return fakeCancelContextError }
func (f fakeCancelContext) Value(key interface{}) interface{} { return 1 }
func withTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
return parent, func() { parent = nil }
}
func TestHTTPClusterClientDoCanceledContext(t *testing.T) {
fakeURL := url.URL{}
tr := newFakeTransport()
tr.finishCancel <- struct{}{}
c := &httpClusterClient{
clientFactory: newHTTPClientFactory(tr, DefaultCheckRedirect, 0),
endpoints: []url.URL{fakeURL},
}
errc := make(chan error)
go func() {
ctx, cancel := withTimeout(fakeCancelContext{}, time.Millisecond)
cancel()
_, _, err := c.Do(ctx, &fakeAction{})
errc <- err
}()
select {
case err := <-errc:
if err != fakeCancelContextError {
t.Errorf("err = %+v, want %+v", err, fakeCancelContextError)
}
case <-time.After(time.Second):
t.Fatalf("unexpected timeout when waiting for request to fake context canceled")
}
}


@ -185,9 +185,18 @@ func testCtlV2GetRoleUser(t *testing.T, cfg *etcdProcessClusterConfig) {
if err := etcdctlUserGet(epc, "username"); err != nil {
t.Fatalf("failed to get user (%v)", err)
}
// ensure double grant gives an error; was crashing in 2.3.1
regrantArgs := etcdctlPrefixArgs(epc)
regrantArgs = append(regrantArgs, "user", "grant", "--roles", "foo", "username")
if err := spawnWithExpect(regrantArgs, "duplicate"); err != nil {
t.Fatalf("missing duplicate error on double grant role (%v)", err)
}
}
func TestCtlV2UserList(t *testing.T) {
func TestCtlV2UserListUsername(t *testing.T) { testCtlV2UserList(t, "username") }
func TestCtlV2UserListRoot(t *testing.T) { testCtlV2UserList(t, "root") }
func testCtlV2UserList(t *testing.T, username string) {
defer testutil.AfterTest(t)
epc := setupEtcdctlTest(t, &configWithProxy, false)
@ -197,10 +206,10 @@ func TestCtlV2UserList(t *testing.T) {
}
}()
if err := etcdctlUserAdd(epc, "username", "password"); err != nil {
if err := etcdctlUserAdd(epc, username, "password"); err != nil {
t.Fatalf("failed to add user (%v)", err)
}
if err := etcdctlUserList(epc, "username"); err != nil {
if err := etcdctlUserList(epc, username); err != nil {
t.Fatalf("failed to list users (%v)", err)
}
}


@ -37,7 +37,9 @@ func NewBackupCommand() cli.Command {
ArgsUsage: " ",
Flags: []cli.Flag{
cli.StringFlag{Name: "data-dir", Value: "", Usage: "Path to the etcd data dir"},
cli.StringFlag{Name: "wal-dir", Value: "", Usage: "Path to the etcd wal dir"},
cli.StringFlag{Name: "backup-dir", Value: "", Usage: "Path to the backup dir"},
cli.StringFlag{Name: "backup-wal-dir", Value: "", Usage: "Path to the backup wal dir"},
},
Action: handleBackup,
}
@ -45,10 +47,23 @@ func NewBackupCommand() cli.Command {
// handleBackup handles a request that intends to do a backup.
func handleBackup(c *cli.Context) {
var srcWAL string
var destWAL string
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
srcWAL := path.Join(c.String("data-dir"), "member", "wal")
destWAL := path.Join(c.String("backup-dir"), "member", "wal")
if c.String("wal-dir") != "" {
srcWAL = c.String("wal-dir")
} else {
srcWAL = path.Join(c.String("data-dir"), "member", "wal")
}
if c.String("backup-wal-dir") != "" {
destWAL = c.String("backup-wal-dir")
} else {
destWAL = path.Join(c.String("backup-dir"), "member", "wal")
}
if err := os.MkdirAll(destSnap, 0700); err != nil {
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)


@ -17,8 +17,6 @@ package command
import (
"fmt"
"os"
"reflect"
"sort"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/bgentry/speakeasy"
@ -195,21 +193,12 @@ func userGrantRevoke(c *cli.Context, grant bool) {
os.Exit(1)
}
var newUser *client.User
if grant {
newUser, err = api.GrantUser(ctx, user, roles)
_, err = api.GrantUser(ctx, user, roles)
} else {
newUser, err = api.RevokeUser(ctx, user, roles)
}
sort.Strings(newUser.Roles)
sort.Strings(currentUser.Roles)
if reflect.DeepEqual(newUser.Roles, currentUser.Roles) {
if grant {
fmt.Printf("User unchanged; roles already granted")
} else {
fmt.Printf("User unchanged; roles already revoked")
}
_, err = api.RevokeUser(ctx, user, roles)
}
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)


@ -168,7 +168,13 @@ func getTransport(c *cli.Context) (*http.Transport, error) {
CertFile: certfile,
KeyFile: keyfile,
}
return transport.NewTransport(tls, defaultDialTimeout)
dialTimeout := defaultDialTimeout
totalTimeout := c.GlobalDuration("total-timeout")
if totalTimeout != 0 && totalTimeout < dialTimeout {
dialTimeout = totalTimeout
}
return transport.NewTransport(tls, dialTimeout)
}
func getUsernamePasswordFromFlag(usernameFlag string) (username string, password string, err error) {
@ -215,7 +221,7 @@ func mustNewClient(c *cli.Context) client.Client {
if debug {
fmt.Fprintf(os.Stderr, "start to sync cluster using endpoints(%s)\n", strings.Join(hc.Endpoints(), ","))
}
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
ctx, cancel := contextWithTotalTimeout(c)
err := hc.Sync(ctx)
cancel()
if err != nil {
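The `contextWithTotalTimeout` helper referenced above is not shown in this hunk; a plausible sketch, assuming it lives in the same command package (which already imports `cli` and `context`) and simply wraps the global `--total-timeout` flag:
```go
func contextWithTotalTimeout(c *cli.Context) (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), c.GlobalDuration("total-timeout"))
}
```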


@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO: support arm64
// +build amd64
package etcdmain
import (
@ -79,6 +76,8 @@ var (
)
func Main() {
checkSupportArch()
cfg := NewConfig()
err := cfg.Parse(os.Args[1:])
if err != nil {
@ -554,3 +553,16 @@ func setupLogging(cfg *config) {
repoLog.SetLogLevel(settings)
}
}
func checkSupportArch() {
// TODO qualify arm64
if runtime.GOARCH == "amd64" {
return
}
if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
plog.Warningf("running etcd on unsupported architecture %q since ETCD_UNSUPPORTED_ARCH is set", env)
return
}
plog.Errorf("etcd on unsupported platform without ETCD_UNSUPPORTED_ARCH=%s set.", runtime.GOARCH)
os.Exit(1)
}


@ -1,31 +0,0 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !amd64
package etcdmain
import (
"fmt"
"os"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
)
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdmain")
func Main() {
fmt.Println("unsupported architecture; unreliable, unstable")
os.Exit(-1)
}


@ -281,13 +281,7 @@ func (s *store) UpdateUser(user User) (User, error) {
return old, err
}
hash, err := s.HashPassword(user.Password)
if err != nil {
return old, err
}
user.Password = hash
newUser, err := old.merge(user)
newUser, err := old.merge(user, s.PasswordStore)
if err != nil {
return old, err
}
@ -452,29 +446,33 @@ func (s *store) DisableAuth() error {
// is called and returns a new User with these modifications applied. Think of
// all Users as immutable sets of data. Merge allows you to perform the set
// operations (desired grants and revokes) atomically
func (u User) merge(n User) (User, error) {
func (ou User) merge(nu User, s PasswordStore) (User, error) {
var out User
if u.User != n.User {
return out, authErr(http.StatusConflict, "Merging user data with conflicting usernames: %s %s", u.User, n.User)
if ou.User != nu.User {
return out, authErr(http.StatusConflict, "Merging user data with conflicting usernames: %s %s", ou.User, nu.User)
}
out.User = u.User
if n.Password != "" {
out.Password = n.Password
out.User = ou.User
if nu.Password != "" {
hash, err := s.HashPassword(nu.Password)
if err != nil {
return ou, err
}
out.Password = hash
} else {
out.Password = u.Password
out.Password = ou.Password
}
currentRoles := types.NewUnsafeSet(u.Roles...)
for _, g := range n.Grant {
currentRoles := types.NewUnsafeSet(ou.Roles...)
for _, g := range nu.Grant {
if currentRoles.Contains(g) {
plog.Noticef("granting duplicate role %s for user %s", g, n.User)
return User{}, authErr(http.StatusConflict, fmt.Sprintf("Granting duplicate role %s for user %s", g, n.User))
plog.Noticef("granting duplicate role %s for user %s", g, nu.User)
return User{}, authErr(http.StatusConflict, fmt.Sprintf("Granting duplicate role %s for user %s", g, nu.User))
}
currentRoles.Add(g)
}
for _, r := range n.Revoke {
for _, r := range nu.Revoke {
if !currentRoles.Contains(r) {
plog.Noticef("revoking ungranted role %s for user %s", r, n.User)
return User{}, authErr(http.StatusConflict, fmt.Sprintf("Revoking ungranted role %s for user %s", r, n.User))
plog.Noticef("revoking ungranted role %s for user %s", r, nu.User)
return User{}, authErr(http.StatusConflict, fmt.Sprintf("Revoking ungranted role %s for user %s", r, nu.User))
}
currentRoles.Remove(r)
}


@ -26,6 +26,21 @@ import (
etcdstore "github.com/coreos/etcd/store"
)
type fakeDoer struct{}
func (_ fakeDoer) Do(context.Context, etcdserverpb.Request) (etcdserver.Response, error) {
return etcdserver.Response{}, nil
}
func TestCheckPassword(t *testing.T) {
st := NewStore(fakeDoer{}, 5*time.Second)
u := User{Password: "$2a$10$I3iddh1D..EIOXXQtsra4u8AjOtgEa2ERxVvYGfXFBJDo1omXwP.q"}
matched := st.CheckPassword(u, "foo")
if matched {
t.Fatalf("expected false, got %v", matched)
}
}
const testTimeout = time.Millisecond
func TestMergeUser(t *testing.T) {
@ -71,16 +86,16 @@ func TestMergeUser(t *testing.T) {
User{User: "foo", Roles: []string{"role1", "role2"}},
false,
},
{
User{User: "foo"},
User{User: "foo", Password: "$2a$10$aUPOdbOGNawaVSusg3g2wuC3AH6XxIr9/Ms4VgDvzrAVOJPYzZILa"},
User{User: "foo", Roles: []string{}, Password: "$2a$10$aUPOdbOGNawaVSusg3g2wuC3AH6XxIr9/Ms4VgDvzrAVOJPYzZILa"},
{ // empty password will not overwrite the previous password
User{User: "foo", Password: "foo", Roles: []string{}},
User{User: "foo", Password: ""},
User{User: "foo", Password: "foo", Roles: []string{}},
false,
},
}
for i, tt := range tbl {
out, err := tt.input.merge(tt.merge)
out, err := tt.input.merge(tt.merge, passwordStore{})
if err != nil && !tt.iserr {
t.Fatalf("Got unexpected error on item %d", i)
}


@ -71,6 +71,7 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool
continue
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
if logerr {
plog.Warningf("could not read the body of cluster response: %v", err)


@ -285,6 +285,10 @@ type userWithRoles struct {
Roles []auth.Role `json:"roles,omitempty"`
}
type usersCollections struct {
Users []userWithRoles `json:"users"`
}
func (sh *authHandler) baseUsers(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
@ -311,9 +315,7 @@ func (sh *authHandler) baseUsers(w http.ResponseWriter, r *http.Request) {
return
}
var usersCollections struct {
Users []userWithRoles `json:"users"`
}
ucs := usersCollections{}
for _, userName := range users {
var user auth.User
user, err = sh.sec.GetUser(userName)
@ -327,15 +329,14 @@ func (sh *authHandler) baseUsers(w http.ResponseWriter, r *http.Request) {
var role auth.Role
role, err = sh.sec.GetRole(roleName)
if err != nil {
writeError(w, r, err)
return
continue
}
uwr.Roles = append(uwr.Roles, role)
}
usersCollections.Users = append(usersCollections.Users, uwr)
ucs.Users = append(ucs.Users, uwr)
}
err = json.NewEncoder(w).Encode(usersCollections)
err = json.NewEncoder(w).Encode(ucs)
if err != nil {
plog.Warningf("baseUsers error encoding on %s", r.URL)


@ -15,10 +15,14 @@
package etcdhttp
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"path"
"sort"
"strings"
"testing"
@ -43,7 +47,14 @@ type mockAuthStore struct {
enabled bool
}
func (s *mockAuthStore) AllUsers() ([]string, error) { return []string{"alice", "bob", "root"}, s.err }
func (s *mockAuthStore) AllUsers() ([]string, error) {
var us []string
for u := range s.users {
us = append(us, u)
}
sort.Strings(us)
return us, s.err
}
func (s *mockAuthStore) GetUser(name string) (auth.User, error) {
u, ok := s.users[name]
if !ok {
@ -52,11 +63,12 @@ func (s *mockAuthStore) GetUser(name string) (auth.User, error) {
return *u, s.err
}
func (s *mockAuthStore) CreateOrUpdateUser(user auth.User) (out auth.User, created bool, err error) {
var u auth.User
if s.users == nil {
u, err := s.CreateUser(user)
u, err = s.CreateUser(user)
return u, true, err
}
u, err := s.UpdateUser(user)
u, err = s.UpdateUser(user)
return u, false, err
}
func (s *mockAuthStore) CreateUser(user auth.User) (auth.User, error) { return user, s.err }
@ -67,9 +79,15 @@ func (s *mockAuthStore) UpdateUser(user auth.User) (auth.User, error) {
func (s *mockAuthStore) AllRoles() ([]string, error) {
return []string{"awesome", "guest", "root"}, s.err
}
func (s *mockAuthStore) GetRole(name string) (auth.Role, error) { return *s.roles[name], s.err }
func (s *mockAuthStore) CreateRole(role auth.Role) error { return s.err }
func (s *mockAuthStore) DeleteRole(name string) error { return s.err }
func (s *mockAuthStore) GetRole(name string) (auth.Role, error) {
r, ok := s.roles[name]
if ok {
return *r, s.err
}
return auth.Role{}, fmt.Errorf("%q does not exist (%v)", name, s.err)
}
func (s *mockAuthStore) CreateRole(role auth.Role) error { return s.err }
func (s *mockAuthStore) DeleteRole(name string) error { return s.err }
func (s *mockAuthStore) UpdateRole(role auth.Role) (auth.Role, error) {
return *s.roles[role.Role], s.err
}
@ -361,6 +379,61 @@ func TestAuthFlow(t *testing.T) {
}
}
func TestGetUserGrantedWithNonexistingRole(t *testing.T) {
sh := &authHandler{
sec: &mockAuthStore{
users: map[string]*auth.User{
"root": {
User: "root",
Roles: []string{"root", "foo"},
},
},
roles: map[string]*auth.Role{
"root": {
Role: "root",
},
},
},
cluster: &fakeCluster{id: 1},
}
srv := httptest.NewServer(http.HandlerFunc(sh.baseUsers))
defer srv.Close()
req, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
req.URL, err = url.Parse(srv.URL)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Content-Type", "application/json")
cli := http.DefaultClient
resp, err := cli.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
var uc usersCollections
if err := json.NewDecoder(resp.Body).Decode(&uc); err != nil {
t.Fatal(err)
}
if len(uc.Users) != 1 {
t.Fatalf("expected 1 user, got %+v", uc.Users)
}
if uc.Users[0].User != "root" {
t.Fatalf("expected 'root', got %q", uc.Users[0].User)
}
if len(uc.Users[0].Roles) != 1 {
t.Fatalf("expected 1 role, got %+v", uc.Users[0].Roles)
}
if uc.Users[0].Roles[0].Role != "root" {
t.Fatalf("expected 'root', got %q", uc.Users[0].Roles[0].Role)
}
}
func mustAuthRequest(method, username, password string) *http.Request {
req, err := http.NewRequest(method, "path", strings.NewReader(""))
if err != nil {


@ -158,7 +158,12 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
}
r.start(&EtcdServer{r: r})
r.start(&EtcdServer{r: raftNode{
Node: r.Node,
storage: r.storage,
raftStorage: r.raftStorage,
transport: r.transport,
}})
n.readyc <- raft.Ready{}
select {
case <-r.applyc:


@ -203,6 +203,10 @@ type EtcdServer struct {
// count the number of inflight snapshots.
// MUST use atomic operation to access this field.
inflightSnapshots int64
// wg is used to wait for the goroutines that depend on the server state
// to exit when stopping the server.
wg sync.WaitGroup
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -515,9 +519,15 @@ func (s *EtcdServer) run() {
}
defer func() {
s.r.stop()
sched.Stop()
// wait for snapshots before closing raft so wal stays open
s.wg.Wait()
// must stop raft after scheduler-- etcdserver can leak rafthttp pipelines
// by adding a peer after raft stops the transport
s.r.stop()
// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
if s.lessor != nil {
@ -1162,7 +1172,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.store.Clone()
s.wg.Add(1)
go func() {
defer s.wg.Done()
d, err := clone.SaveNoCopy()
// TODO: current store will never fail to do a snapshot
// what should we do if the store might fail?


@ -8,6 +8,7 @@ all: cfssl ca req
cfssl:
go get -u -tags nopkcs11 github.com/cloudflare/cfssl/cmd/cfssl
go get -u github.com/cloudflare/cfssl/cmd/cfssljson
go get -u github.com/mattn/goreman
ca:
mkdir -p certs


@ -231,9 +231,14 @@ func TestIssue2681(t *testing.T) {
}
// Ensure we can remove a member after a snapshot then add a new one back.
func TestIssue2746(t *testing.T) {
func TestIssue2746(t *testing.T) { testIssue2746(t, 5) }
// With 3 nodes TestIssue2746 sometimes had a shutdown with an inflight snapshot.
func TestIssue2746WithThree(t *testing.T) { testIssue2746(t, 3) }
func testIssue2746(t *testing.T, members int) {
defer testutil.AfterTest(t)
c := NewCluster(t, 5)
c := NewCluster(t, members)
for _, m := range c.Members {
m.SnapCount = 10
@ -247,7 +252,7 @@ func TestIssue2746(t *testing.T) {
clusterMustProgress(t, c.Members)
}
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
c.RemoveMember(t, uint64(c.Members[members-1].s.ID()))
c.waitLeader(t, c.Members)
c.AddMember(t)


@ -111,9 +111,10 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
closeNotifier, ok := rw.(http.CloseNotifier)
cancel := httputil.RequestCanceler(p.transport, proxyreq)
if ok {
closeCh := closeNotifier.CloseNotify()
go func() {
select {
case <-closeNotifier.CloseNotify():
case <-closeCh:
atomic.StoreInt32(&requestClosed, 1)
log.Printf("proxy: client %v closed request prematurely", clientreq.RemoteAddr)
cancel()


@ -837,6 +837,12 @@ func (r *raft) addNode(id uint64) {
func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false
// do not try to commit or abort transferring if there are no nodes in the cluster.
if len(r.prs) == 0 {
return
}
// The quorum size is now smaller, so see if any pending entries can
// be committed.
if r.maybeCommit() {


@ -1780,6 +1780,13 @@ func TestRemoveNode(t *testing.T) {
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
// remove all nodes from cluster
r.removeNode(1)
w = []uint64{}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
}
func TestPromotable(t *testing.T) {


@ -142,11 +142,23 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
go func() {
for {
select {
case mm := <-p.propc:
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
plog.Warningf("failed to process raft message (%v)", err)
}
case mm := <-p.recvc:
case <-p.stopc:
return
}
}
}()
// r.Process might block when processing a proposal if there is no leader.
// Thus propc must be handled in a separate goroutine from recvc to avoid
// blocking the processing of other raft messages.
go func() {
for {
select {
case mm := <-p.propc:
if err := r.Process(ctx, mm); err != nil {
plog.Warningf("failed to process raft message (%v)", err)
}


@ -26,6 +26,7 @@ import (
"strings"
"time"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@ -78,9 +79,14 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
err = ioutil.WriteFile(path.Join(s.dir, fname), d, 0666)
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
if err == nil {
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
} else {
err1 := os.Remove(path.Join(s.dir, fname))
if err1 != nil {
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
}
}
return err
}
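The body of `pioutil.WriteAndSyncFile` is not part of this diff; a minimal sketch of the write-then-fsync pattern it names, assuming a signature mirroring `ioutil.WriteFile`:
```go
package ioutil

import (
	"io"
	"os"
)

// WriteAndSyncFile behaves like ioutil.WriteFile, but fsyncs the data
// before closing, so a full disk surfaces as an error instead of a
// silently corrupted snapshot.
func WriteAndSyncFile(filename string, data []byte, perm os.FileMode) error {
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}
	n, err := f.Write(data)
	if err == nil && n < len(data) {
		err = io.ErrShortWrite
	}
	if err == nil {
		err = f.Sync() // flush contents to stable storage before Close
	}
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	return err
}
```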


@ -30,6 +30,7 @@ type Event struct {
Node *NodeExtern `json:"node,omitempty"`
PrevNode *NodeExtern `json:"prevNode,omitempty"`
EtcdIndex uint64 `json:"-"`
Refresh bool `json:"refresh,omitempty"`
}
func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
@ -64,3 +65,7 @@ func (e *Event) Clone() *Event {
PrevNode: e.PrevNode.Clone(),
}
}
func (e *Event) SetRefresh() {
e.Refresh = true
}


@ -78,24 +78,26 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
for {
e := eh.Queue.Events[i]
ok := (e.Node.Key == key)
if !e.Refresh {
ok := (e.Node.Key == key)
if recursive {
// add tailing slash
key = path.Clean(key)
if key[len(key)-1] != '/' {
key = key + "/"
if recursive {
// add tailing slash
key = path.Clean(key)
if key[len(key)-1] != '/' {
key = key + "/"
}
ok = ok || strings.HasPrefix(e.Node.Key, key)
}
ok = ok || strings.HasPrefix(e.Node.Key, key)
}
if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
}
if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
}
if ok {
return e, nil
if ok {
return e, nil
}
}
i = (i + 1) % eh.Queue.Capacity


@ -236,6 +236,9 @@ func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptio
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}
return e, nil
@ -295,6 +298,10 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
return nil, err
}
if expireOpts.Refresh {
value = n.Value
}
// update etcd index
s.CurrentIndex++
@ -314,6 +321,9 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}
return e, nil
@ -539,6 +549,9 @@ func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}
s.CurrentIndex = nextIndex


@ -115,6 +115,10 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
return w, nil
}
func (wh *watcherHub) add(e *Event) {
e = wh.EventHistory.addEvent(e)
}
// notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) {
e = wh.EventHistory.addEvent(e) // add event into the eventHistory
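Pulling the store and watcher-hub hunks together, the intended flow is: a refresh records an event in the history (so `scan` can keep the watch history complete) but never wakes current watchers. A condensed sketch using the names from the hunks above:
```go
// inside store.Set/Update/CompareAndSwap, after building event e:
if !expireOpts.Refresh {
	s.WatcherHub.notify(e) // record in history and wake matching watchers
} else {
	e.SetRefresh()      // mark so EventHistory.scan skips it
	s.WatcherHub.add(e) // record in history only; watchers stay quiet
}
```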

test

@ -73,33 +73,21 @@ function fmt_tests {
exit 255
fi
echo "Checking govet..."
vetRes=$(go vet $TEST)
if [ -n "${vetRes}" ]; then
echo -e "govet checking failed:\n${vetRes}"
exit 255
fi
# echo "Checking govet..."
# vetRes=$(go vet $TEST)
# if [ -n "${vetRes}" ]; then
# echo -e "govet checking failed:\n${vetRes}"
# exit 255
# fi
echo "Checking govet -shadow..."
for path in $FMT; do
vetRes=$(go tool vet -shadow ${path})
if [ -n "${vetRes}" ]; then
echo -e "govet checking ${path} failed:\n${vetRes}"
exit 255
fi
done
echo "Checking goword..."
# get all go files to process
gofiles=`find $FMT -iname '*.go' 2>/dev/null`
# ignore tests and protobuf files
gofiles=`echo ${gofiles} | sort | uniq | sed "s/ /\n/g" | egrep -v "(\\_test.go|\\.pb\\.go)"`
# only check for broken exported godocs
gowordRes=`goword -use-spell=false ${gofiles} | grep godoc-export | sort`
if [ ! -z "$gowordRes" ]; then
echo -e "goword checking failed:\n${gowordRes}"
exit 255
fi
# echo "Checking govet -shadow..."
# for path in $FMT; do
# vetRes=$(go tool vet -shadow ${path})
# if [ -n "${vetRes}" ]; then
# echo -e "govet checking ${path} failed:\n${vetRes}"
# exit 255
# fi
# done
echo "Checking for license header..."
licRes=$(for file in $(find . -type f -iname '*.go' ! -path './Godeps/*'); do


@ -1,23 +0,0 @@
# etcd-top
etcd realtime workload analyzer. Useful for rapid diagnosis of production usage issues and analysis of production request distributions.
usage:
```
-iface="eth0": interface for sniffing traffic on
-period=1: seconds between submissions
-ports="2379,4001": etcd listening ports
-promiscuous=true: whether to perform promiscuous sniffing or not.
-topk=10: submit stats for the top <K> sniffed paths
```
result:
```
go run etcd-top.go --period=1 -topk=3
1440035702 sniffed 1074 requests over last 1 seconds
Top 3 most popular http requests:
Sum Rate Verb Path
1305 22 GET /v2/keys/c
1302 8 GET /v2/keys/S
1297 10 GET /v2/keys/h
```


@ -1,229 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"math"
"net/http"
"os"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spacejam/loghisto"
)
type nameSum struct {
Name string
Sum float64
Rate float64
}
type nameSums []nameSum
func (n nameSums) Len() int {
return len(n)
}
func (n nameSums) Less(i, j int) bool {
return n[i].Sum > n[j].Sum
}
func (n nameSums) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
// This function listens for periodic metrics from the loghisto metric system,
// and upon receipt of a batch of them it will print out the desired topK.
func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
for m := range metricStream {
requestCounter := float64(0)
nvs := nameSums{}
for k, v := range m.Metrics {
// loghisto adds _rate suffixed metrics for counters and histograms
if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
continue
}
nvs = append(nvs, nameSum{
Name: k,
Sum: v,
Rate: m.Metrics[k+"_rate"],
})
requestCounter += m.Metrics[k+"_rate"]
}
fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
uint(requestCounter), period)
if len(nvs) == 0 {
continue
}
sort.Sort(nvs)
fmt.Printf("Top %d most popular http requests:\n", topK)
fmt.Println("Total Sum Period Sum Verb Path")
for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
}
}
}
// packetDecoder decodes packets and hands them off to the streamRouter
func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
for pkt := range packetsIn {
pkt.Decode()
select {
case packetsOut <- pkt:
default:
fmt.Fprint(os.Stderr, "shedding at decoder!")
}
}
}
// processor tries to parse an http request from each packet, and if
// successful it records metrics about it in the loghisto metric system.
func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
for pkt := range packetsIn {
req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
if reqErr == nil {
ms.Counter(req.Method+" "+req.URL.Path, 1)
}
}
}
// streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
// and responses for this particular TCP connection. This allows the processor to own a local map
// of requests so that it can avoid coordinating with other goroutines to perform analysis.
func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
for pkt := range parsedPackets {
if pkt.TCP == nil {
continue
}
clientPort := uint16(0)
for _, p := range ports {
if pkt.TCP.SrcPort == p {
clientPort = pkt.TCP.DestPort
break
}
if pkt.TCP.DestPort == p {
clientPort = pkt.TCP.SrcPort
break
}
}
if clientPort != 0 {
// client Port can be assumed to have sufficient entropy for
// distribution among processors, and we want the same
// tcp stream to go to the same processor every time
// so that if we do proper packet reconstruction it will
// be easier.
select {
case processors[int(clientPort)%len(processors)] <- pkt:
default:
fmt.Fprint(os.Stderr, "Shedding load at router!")
}
}
}
}
// 1. parse args
// 2. start the loghisto metric system
// 3. start the processing and printing goroutines
// 4. open the pcap handler
// 5. hand off packets from the handler to the decoder
func main() {
portsArg := flag.String("ports", "2379,4001", "etcd listening ports")
iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
promisc := flag.Bool("promiscuous", true, "promiscuous mode")
period := flag.Uint("period", 1, "seconds between submissions")
topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
flag.Parse()
numCPU := runtime.NumCPU()
runtime.GOMAXPROCS(numCPU)
ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
ms.Start()
metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
ms.SubscribeToProcessedMetrics(metricStream)
defer ms.UnsubscribeFromProcessedMetrics(metricStream)
go statPrinter(metricStream, *topK, *period)
ports := []uint16{}
for _, p := range strings.Split(*portsArg, ",") {
port, err := strconv.Atoi(p)
if err == nil {
ports = append(ports, uint16(port))
} else {
fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err)
os.Exit(1)
}
}
if len(ports) == 0 {
fmt.Fprint(os.Stderr, "No ports given! Exiting.\n")
os.Exit(1)
}
// We choose 1518 for the snaplen because it's the default
// ethernet MTU at the link layer. We choose 1000 for the
// timeout based on a measurement for its impact on latency
// impact, but it is less precise.
h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
if err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
defer h.Close()
portArray := strings.Split(*portsArg, ",")
dst := strings.Join(portArray, " or dst port ")
src := strings.Join(portArray, " or src port ")
filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
fmt.Println("using bpf filter: ", filter)
if err := h.Setfilter(filter); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
unparsedPackets := make(chan *pcap.Packet, 16384)
parsedPackets := make(chan *pcap.Packet, 16384)
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
go packetDecoder(unparsedPackets, parsedPackets)
}
processors := []chan *pcap.Packet{}
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
p := make(chan *pcap.Packet, 16384)
processors = append(processors, p)
go processor(ms, p)
}
go streamRouter(ports, parsedPackets, processors)
for {
pkt := h.Next()
if pkt != nil {
select {
case unparsedPackets <- pkt:
default:
fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
}
}
}
}


@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.2.0"
Version = "2.3.0"
Version = "2.3.8"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"