Compare commits

...

67 Commits

Author SHA1 Message Date
41e52ebc22 version: bump to 3.1.4
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-22 09:46:23 -07:00
7bb538d4d4 backend: add FillPercent option 2017-03-21 12:12:32 -07:00
1622782e49 integration: ensure 'StopNotify' on publish error
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:12:13 -07:00
99b47e0c1e etcdmain: handle StopNotify when ErrStopped aborted publish
Fix https://github.com/coreos/etcd/issues/7512.

If a server starts and aborts due to config error,
it is possible to get stuck in ReadyNotify waits.
This adds select case to get notified on stop channel.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-21 12:10:36 -07:00
350d0cd211 ctlv3: have "protobuf" in output help string instead of "proto"
Fixes #7538
2017-03-20 12:40:25 -07:00
72f37ff79a embed: Clear default initial cluster
NewConfig() should sets initial cluster from name but we should clear it
in the event that another discovery option has been specified.

Fixes #7516
2017-03-18 07:56:18 -07:00
3221454cab etcdserver: remove possibly compacted entry look-up
Fix https://github.com/coreos/etcd/issues/7470.

This patch removes unnecessary term look-up in
'createMergedSnapshotMessage', which can trigger panic
if raft entry at etcdProgress.appliedi got compacted
by subsequent 'MsgSnap' messages--if a follower is
being (in this case, network latency spikes) slow, it
could receive subsequent 'MsgSnap' requests from leader.

etcd server-side 'applyAll' routine and raft's Ready
processing routine becomes asynchronous after raft
entries are persisted. And given that raft Ready routine
takes less time to finish, it is possible that second
'MsgSnap' is being handled, while the slow 'applyAll'
is still processing the first(old) 'MsgSnap'. Then raft
Ready routine can compact the log entries at future
index to 'applyAll'. That is how 'createMergedSnapshotMessage'
tried to look up raft term with outdated etcdProgress.appliedi.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:18 -07:00
4a1bffdbc6 clientv3: close open watch channel if substream is closing on reconnect
If substream is closing but outc is still open while reconnecting, then outc
would only be closed once the watch client would connect or once the watch
client is closed. This was leading to deadlocks in the proxy tests. Instead,
close immediately if the context is canceled.

Fixes #7503
2017-03-18 07:56:18 -07:00
9d9be2bc86 ctlv3: ensure synced member list before printing env vars on member add
In cases of multiple endpoints, it's possible member add would get a its
member list from a member that has not yet recognized the membership
update. Instead, confirm that the member list response is from the
member that acked the member add or from a member that has synced
with the cluster following the member add.

Fixes #7498
2017-03-18 07:56:18 -07:00
e5462f74f1 auth: get rid of deadlocking channel passing scheme in simpleTokenTTL
Cherry-picked from 1b1fabef8f.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-18 07:56:05 -07:00
c68c1d9344 discovery: fix print format
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:57 -07:00
6ed56cd723 auth: nil check AuthInfo when checking admin permissions
If the context does not include auth information, get authinfo will
return a nil auth info and a nil error. This is then passed to
IsAdminPermitted, which would dereference the nil auth info.
2017-03-17 14:21:39 -07:00
a3c6f6bf81 version: bump up to 3.1.3+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-17 14:21:15 -07:00
21fdcc6443 version: bump up to 3.1.3
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-10 09:05:16 -08:00
8d122e7011 etcdmain: SdNotify when gateway, grpc-proxy are ready
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-09 11:35:20 -08:00
ade1d97893 lease: guard 'Lease.itemSet' from concurrent writes
Fix https://github.com/coreos/etcd/issues/7448.

Affected if etcd builds with Go 1.8+.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-08 14:50:06 -08:00
1300189581 gateway: fix the dns discovery method
strip the scheme from the endpoints to have a clean hostname for TCP proxy

Fixes #7452
2017-03-08 14:49:50 -08:00
1971517806 etcdctl: correctly batch revisions in make-mirror
Fixes #7410
2017-03-06 14:55:47 -08:00
d614bb0799 etcdmain: log machine default host after update check
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-06 14:55:31 -08:00
059dc91d4c embed: use machine default host only for default value, 0.0.0.0
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-03-06 14:55:24 -08:00
5fdbaee761 version: bump up to 3.1.2+git 2017-02-24 10:34:53 -08:00
714e7ec8db version: bump up to 3.1.2 2017-02-22 10:45:48 -08:00
2cdaf6d661 netutil: use ipv4 host by default
Was non-deterministic.
2017-02-22 10:45:38 -08:00
77a51e0dbf pkg/netutil: name GetDefaultInterfaces consistent 2017-02-22 10:45:29 -08:00
d96d3aa0ed netutil: add dualstack to linux_route
in v3.1.0 netutil couldn't get default interface for ipv6only hosts

Fixes #7219
2017-02-22 10:45:19 -08:00
66e7532f57 pkg/netutil: use native byte ordering for route information
Fixes #7199
2017-02-22 10:45:07 -08:00
3eff360e79 pkg/cpuutil: add cpuutil
A package for unsafe cpu-ish things.
2017-02-22 10:44:59 -08:00
1487071966 integration: add 'TestV3HashRestart' 2017-02-22 10:39:49 -08:00
5d62bba9c7 auth: keep old revision in 'NewAuthStore'
When there's no changes yet (right after auth
store initialization), we should commit old revision.

Fix https://github.com/coreos/etcd/issues/7359.
2017-02-22 10:39:28 -08:00
114e293119 integration: test keepalives for short TTLs 2017-02-22 10:38:38 -08:00
1439955536 clientv3: do not set next keepalive time <= now+TTL 2017-02-22 10:38:28 -08:00
2c8ecc7e13 tcpproxy: don't use range variable in reactivate goroutine
Ends up trying to reactivate only the last endpoint.
2017-02-22 10:38:19 -08:00
7b4d622a7e raft: fix read index request for #7331 2017-02-22 10:38:09 -08:00
db8abbd975 version: bump to 3.1.1+git 2017-02-21 17:00:02 -08:00
ac1c7eba21 version: bump up to 3.1.1 2017-02-16 13:53:25 -08:00
9cc6d4852a travis: update for Go 1.7.5 tests 2017-02-16 13:53:05 -08:00
ff7fa9843d clientv3: fix lease keepalive duration 2017-02-16 13:52:27 -08:00
f66138d403 clientv3: fix lease keepalive duration 2017-02-16 12:41:09 -08:00
8c87916f68 auth: add 'setupAuthStore' to tests 2017-02-14 14:39:51 -08:00
9e81b002c4 auth: correct initialization in NewAuthStore()
Because of my own silly mistake, current NewAuthStore() doesn't
initialize authStore in a correct manner. For example, after recovery
from snapshot, it cannot revive the flag of enabled/disabled. This
commit fixes the problem.

Fix https://github.com/coreos/etcd/issues/7165
2017-02-14 13:49:19 -08:00
4962c5cff7 auth: add a test case for recoverying from snapshot
Conflicts:
	auth/store_test.go
2017-02-14 13:48:17 -08:00
e5bf25a3b6 e2e: add cases for defrag and snapshot with authentication 2017-02-14 13:43:11 -08:00
98c60e8faa auth, etcdserver: let maintenance services require root role
This commit lets maintenance services require root privilege. It also
moves AuthInfoFromCtx() from etcdserver to auth pkg for cleaning purpose.
2017-02-14 13:42:31 -08:00
3ac3fa6f3d travis: disable email notifications
Was spamming security@coreos.com
2017-02-14 12:55:02 -08:00
eaa8b9e155 clientv3: test closing client cancels blocking dials 2017-02-14 11:31:51 -08:00
ea2aae464d clientv3: use DialContext
Fixes #7216
2017-02-14 11:31:43 -08:00
776739ebc2 roadmap: update roadmap 2017-01-20 14:21:08 -08:00
a7a8a47ba0 README: remove ACI, update Go version 2017-01-20 14:21:00 -08:00
379f7ae10e op-guide: change grpc-proxy from 'pre' to alpha' 2017-01-20 13:25:21 -08:00
ead2d95914 version: bump to v3.1.0+git 2017-01-20 13:25:04 -08:00
8ba2897a21 version: bump to v3.1.0 2017-01-20 12:42:12 -08:00
bc31e27cb9 documentation: update build documentation 2017-01-20 11:20:54 -08:00
fce20a0b0b test: passed the test script arguments as the test function parameters 2017-01-20 11:15:01 -08:00
f10363fecd etcdctlv3: snapshot restore works with lease key 2017-01-20 10:06:49 -08:00
a7ec6c88fd pkg/flags: fixed prefix checking of the env variables 2017-01-20 09:57:00 -08:00
62c591d223 integration: test STM apply on concurrent deletion 2017-01-20 09:56:42 -08:00
5676226867 clientv3/concurrency: fix rev comparison on concurrent key deletion 2017-01-20 09:56:36 -08:00
898b9e608f Documentation: fix typo s/endpoint-health/endpoint health/ 2017-01-19 20:48:01 -08:00
b84be6b11b NEWS: fix date for v3.1 release 2017-01-19 17:56:35 -08:00
7a12d65528 Documentation: update experimental_apis for v3.1 release 2017-01-19 12:43:32 -08:00
53ac04b118 vendor: update 'golang.org/x/net' 2017-01-18 13:11:48 -08:00
fbcd5375b3 glide: update 'golang.org/x/net' 2017-01-18 13:11:13 -08:00
c2e8d06eec grpcproxy, etcdmain, integration: add close channel to kv proxy
ccache launches goroutines that need to be explicitly stopped.

Fixes #7158
2017-01-18 13:11:03 -08:00
6c8f1986c8 etcdserver: use ReqTimeout for linearized read
Fixes #7136
2017-01-17 17:31:31 -08:00
be9ae300c6 pkg/report: add nil checking for getTimeSeries 2017-01-17 13:23:57 -08:00
9ba3632614 Documentation: document upgrading to v3.1 2017-01-17 13:23:19 -08:00
c2d8b5a9e8 ctlv3: print cluster info after adding new member 2017-01-17 10:23:02 -08:00
99 changed files with 5584 additions and 1719 deletions

View File

@ -4,8 +4,11 @@ go_import_path: github.com/coreos/etcd
sudo: false
go:
- 1.7.4
- tip
- 1.7.5
notifications:
on_success: never
on_failure: never
env:
matrix:

View File

@ -1,8 +1,11 @@
# Experimental APIs and features
For the most part, the etcd project is stable, but we are still moving fast! We believe in the release fast philosophy. We want to get early feedback on features still in development and stabilizing. Thus, there are, and will be more, experimental features and APIs. We plan to improve these features based on the early feedback from the community, or abandon them if there is little interest, in the next few releases. If you are running a production system, please do not rely on any experimental features or APIs.
For the most part, the etcd project is stable, but we are still moving fast! We believe in the release fast philosophy. We want to get early feedback on features still in development and stabilizing. Thus, there are, and will be more, experimental features and APIs. We plan to improve these features based on the early feedback from the community, or abandon them if there is little interest, in the next few releases. Please do not rely on any experimental features or APIs in production environment.
## The current experimental API/features are:
- v3 auth API: expect to be stable in 3.1 release
- etcd gateway: expect to be stable in 3.1 release
- [gateway][gateway]: beta, to be stable in 3.2 release
- [gRPC proxy][grpc-proxy]: alpha, to be stable in 3.2 release
[gateway]: ../op-guide/gateway.md
[grpc-proxy]: ../op-guide/grpc_proxy.md

View File

@ -10,29 +10,44 @@ The easiest way to get etcd is to use one of the pre-built release binaries whic
## Build the latest version
For those wanting to try the very latest version, build etcd from the `master` branch.
[Go](https://golang.org/) version 1.6+ (with HTTP2 support) is required to build the latest version of etcd.
etcd vendors its dependency for official release binaries, while making vendoring optional to avoid import conflicts.
[`build` script][build-script] would automatically include the vendored dependencies from [`cmd`][cmd-directory] directory.
For those wanting to try the very latest version, build etcd from the `master` branch. [Go](https://golang.org/) version 1.7+ is required to build the latest version of etcd. To ensure etcd is built against well-tested libraries, etcd vendors its dependencies for official release binaries. However, etcd's vendoring is also optional to avoid potential import conflicts when embedding the etcd server or using the etcd client.
Here are the commands to build an etcd binary from the `master` branch:
First, confirm go 1.7+ is installed:
```
```sh
# go is required
$ go version
go version go1.6 darwin/amd64
go version go1.7.3 darwin/amd64
# GOPATH should be set correctly
$ echo $GOPATH
/Users/example/go
```
$ mkdir -p $GOPATH/src/github.com/coreos
$ cd $GOPATH/src/github.com/coreos
To build `etcd` from the `master` branch without a `GOPATH` using the official `build` script:
```sh
$ git clone https://github.com/coreos/etcd.git
$ cd etcd
$ ./build
$ ./bin/etcd
...
```
To build a vendored `etcd` from the `master` branch via `go get`:
```sh
# GOPATH should be set
$ echo $GOPATH
/Users/example/go
$ go get github.com/coreos/etcd/cmd/etcd
$ $GOPATH/bin/etcd
```
To build `etcd` from the `master` branch without vendoring (may not build due to upstream conflicts):
```sh
# GOPATH should be set
$ echo $GOPATH
/Users/example/go
$ go get github.com/coreos/etcd
$ $GOPATH/bin/etcd
```
## Test the installation

View File

@ -52,6 +52,7 @@ To learn more about the concepts and internals behind etcd, read the following p
- [Migrate applications from using API v2 to API v3][v2_migration]
- [Updating v2.3 to v3.0][v3_upgrade]
- [Updating v3.0 to v3.1][v31_upgrade]
## Frequently Asked Questions (FAQ)
@ -88,3 +89,4 @@ Answers to [common questions] about etcd.
[supported_platform]: op-guide/supported-platform.md
[experimental]: dev-guide/experimental_apis.md
[v3_upgrade]: upgrades/upgrade_3_0.md
[v31_upgrade]: upgrades/upgrade_3_1.md

View File

@ -57,7 +57,7 @@ sudo rkt run --net=default:IP=${NODE3} coreos.com/etcd:v3.0.6 -- -name=node3 -ad
Verify the cluster is healthy and can be reached.
```
ETCDCTL_API=3 etcdctl --endpoints=http://172.16.28.21:2379,http://172.16.28.22:2379,http://172.16.28.23:2379 endpoint-health
ETCDCTL_API=3 etcdctl --endpoints=http://172.16.28.21:2379,http://172.16.28.22:2379,http://172.16.28.23:2379 endpoint health
```
### DNS

View File

@ -1,6 +1,6 @@
# gRPC proxy
*This is a pre-alpha feature, we are looking for early feedback.*
*This is an alpha feature, we are looking for early feedback.*
The gRPC proxy is a stateless etcd reverse proxy operating at the gRPC layer (L7). The proxy is designed to reduce the total processing load on the core etcd cluster. For horizontal scalability, it coalesces watch and lease API requests. To protect the cluster against abusive clients, it caches key range requests.

View File

@ -6,27 +6,27 @@ In the general case, upgrading from etcd 2.3 to 3.0 can be a zero-downtime, roll
Before [starting an upgrade](#upgrade-procedure), read through the rest of this guide to prepare.
### Upgrade Checklists
### Upgrade checklists
#### Upgrade Requirements
#### Upgrade requirements
To upgrade an existing etcd deployment to 3.0, the running cluster must be 2.3 or greater. If it's before 2.3, please upgrade to [2.3](https://github.com/coreos/etcd/releases/tag/v2.3.0) before upgrading to 3.0.
Also, to ensure a smooth rolling upgrade, the running cluster must be healthy. You can check the health of the cluster by using the `etcdctl cluster-health` command.
Also, to ensure a smooth rolling upgrade, the running cluster must be healthy. Check the health of the cluster by using the `etcdctl cluster-health` command before proceeding.
#### Preparation
Before upgrading etcd, always test the services relying on etcd in a staging environment before deploying the upgrade to the production environment.
Before beginning, [backup the etcd data directory](../v2/admin_guide.md#backing-up-the-datastore). Should something go wrong with the upgrade, it is possible to use this backup to [downgrade](#downgrade) back to existing etcd version.
Before beginning, [backup the etcd data directory](../v2/admin_guide.md#backing-up-the-datastore). Should something go wrong with the upgrade, it is possible to use this backup to [downgrade](#downgrade) back to existing etcd version.
#### Mixed Versions
#### Mixed versions
While upgrading, an etcd cluster supports mixed versions of etcd members, and operates with the protocol of the lowest common version. The cluster is only considered upgraded once all of its members are upgraded to version 3.0. Internally, etcd members negotiate with each other to determine the overall cluster version, which controls the reported version and the supported features.
#### Limitations
It might take up to 2 minutes for the newly upgraded member to catch up with the existing cluster when the total data size is larger than 50MB. Check the size of a recent snapshot to estimate the total data size. In other words, it is safest to wait for 2 minutes between upgrading each member.
It might take up to 2 minutes for the newly upgraded member to catch up with the existing cluster when the total data size is larger than 50MB. Check the size of a recent snapshot to estimate the total data size. In other words, it is safest to wait for 2 minutes between upgrading each member.
For a much larger total data size, 100MB or more , this one-time process might take even more time. Administrators of very large etcd clusters of this magnitude can feel free to contact the [etcd team][etcd-contact] before upgrading, and well be happy to provide advice on the procedure.
@ -36,13 +36,13 @@ If all members have been upgraded to v3.0, the cluster will be upgraded to v3.0,
Please [backup the data directory](../v2/admin_guide.md#backing-up-the-datastore) of all etcd members to make downgrading the cluster possible even after it has been completely upgraded.
### Upgrade Procedure
### Upgrade procedure
This example details the upgrade of a three-member v2.3 ectd cluster running on a local machine.
This example details the upgrade of a three-member v2.3 ectd cluster running on a local machine.
#### 1. Check upgrade requirements.
Is the the cluster healthy and running v.2.3.x?
Is the cluster healthy and running v.2.3.x?
```
$ etcdctl cluster-health
@ -64,7 +64,7 @@ When each etcd process is stopped, expected errors will be logged by other clust
2016-06-27 15:21:48.624175 I | rafthttp: the connection with 8211f1d0f64f3269 became inactive
```
Its a good idea at this point to [backup the etcd data directory](../v2/admin_guide.md#backing-up-the-datastore) to provide a downgrade path should any problems occur:
Its a good idea at this point to [backup the etcd data directory](../v2/admin_guide.md#backing-up-the-datastore) to provide a downgrade path should any problems occur:
```
$ etcdctl backup \
@ -102,7 +102,7 @@ Upgraded members will log warnings like the following until the entire cluster i
#### 5. Finish
When all members are upgraded, the cluster will report upgrading to 3.0 successfully:
When all members are upgraded, the cluster will report upgrading to 3.0 successfully:
```
2016-06-27 15:22:19.873751 N | membership: updated the cluster version from 2.3 to 3.0

View File

@ -0,0 +1,123 @@
## Upgrade etcd from 3.0 to 3.1
In the general case, upgrading from etcd 3.0 to 3.1 can be a zero-downtime, rolling upgrade:
- one by one, stop the etcd v3.0 processes and replace them with etcd v3.1 processes
- after running all v3.1 processes, new features in v3.1 are available to the cluster
Before [starting an upgrade](#upgrade-procedure), read through the rest of this guide to prepare.
### Upgrade checklists
#### Upgrade requirements
To upgrade an existing etcd deployment to 3.1, the running cluster must be 3.0 or greater. If it's before 3.0, please upgrade to [3.0](https://github.com/coreos/etcd/releases/tag/v3.0.16) before upgrading to 3.1.
Also, to ensure a smooth rolling upgrade, the running cluster must be healthy. Check the health of the cluster by using the `etcdctl endpoint health` command before proceeding.
#### Preparation
Before upgrading etcd, always test the services relying on etcd in a staging environment before deploying the upgrade to the production environment.
Before beginning, [backup the etcd data](../op-guide/maintenance.md#snapshot-backup). Should something go wrong with the upgrade, it is possible to use this backup to [downgrade](#downgrade) back to existing etcd version. Please note that the `snapshot` command only backs up the v3 data. For v2 data, see [backing up v2 datastore](../v2/admin_guide.md#backing-up-the-datastore).
#### Mixed versions
While upgrading, an etcd cluster supports mixed versions of etcd members, and operates with the protocol of the lowest common version. The cluster is only considered upgraded once all of its members are upgraded to version 3.1. Internally, etcd members negotiate with each other to determine the overall cluster version, which controls the reported version and the supported features.
#### Limitations
Note: If the cluster only has v3 data and no v2 data, it is not subject to this limitation.
If the cluster is serving a v2 data set larger than 50MB, each newly upgraded member may take up to two minutes to catch up with the existing cluster. Check the size of a recent snapshot to estimate the total data size. In other words, it is safest to wait for 2 minutes between upgrading each member.
For a much larger total data size, 100MB or more , this one-time process might take even more time. Administrators of very large etcd clusters of this magnitude can feel free to contact the [etcd team][etcd-contact] before upgrading, and we'll be happy to provide advice on the procedure.
#### Downgrade
If all members have been upgraded to v3.1, the cluster will be upgraded to v3.1, and downgrade from this completed state is **not possible**. If any single member is still v3.0, however, the cluster and its operations remains "v3.0", and it is possible from this mixed cluster state to return to using a v3.0 etcd binary on all members.
Please [backup the data directory](../op-guide/maintenance.md#snapshot-backup) of all etcd members to make downgrading the cluster possible even after it has been completely upgraded.
### Upgrade procedure
This example shows how to upgrade a 3-member v3.0 ectd cluster running on a local machine.
#### 1. Check upgrade requirements
Is the cluster healthy and running v3.0.x?
```
$ ETCDCTL_API=3 etcdctl endpoint health --endpoints=localhost:2379,localhost:22379,localhost:32379
localhost:2379 is healthy: successfully committed proposal: took = 6.600684ms
localhost:22379 is healthy: successfully committed proposal: took = 8.540064ms
localhost:32379 is healthy: successfully committed proposal: took = 8.763432ms
$ curl http://localhost:2379/version
{"etcdserver":"3.0.16","etcdcluster":"3.0.0"}
```
#### 2. Stop the existing etcd process
When each etcd process is stopped, expected errors will be logged by other cluster members. This is normal since a cluster member connection has been (temporarily) broken:
```
2017-01-17 09:34:18.352662 I | raft: raft.node: 1640829d9eea5cfb elected leader 1640829d9eea5cfb at term 5
2017-01-17 09:34:18.359630 W | etcdserver: failed to reach the peerURL(http://localhost:2380) of member fd32987dcd0511e0 (Get http://localhost:2380/version: dial tcp 127.0.0.1:2380: getsockopt: connection refused)
2017-01-17 09:34:18.359679 W | etcdserver: cannot get the version of member fd32987dcd0511e0 (Get http://localhost:2380/version: dial tcp 127.0.0.1:2380: getsockopt: connection refused)
2017-01-17 09:34:18.548116 W | rafthttp: lost the TCP streaming connection with peer fd32987dcd0511e0 (stream Message writer)
2017-01-17 09:34:19.147816 W | rafthttp: lost the TCP streaming connection with peer fd32987dcd0511e0 (stream MsgApp v2 writer)
2017-01-17 09:34:34.364907 W | etcdserver: failed to reach the peerURL(http://localhost:2380) of member fd32987dcd0511e0 (Get http://localhost:2380/version: dial tcp 127.0.0.1:2380: getsockopt: connection refused)
```
It's a good idea at this point to [backup the etcd data](../op-guide/maintenance.md#snapshot-backup) to provide a downgrade path should any problems occur:
```
$ etcdctl snapshot save backup.db
```
#### 3. Drop-in etcd v3.1 binary and start the new etcd process
The new v3.1 etcd will publish its information to the cluster:
```
2017-01-17 09:36:00.996590 I | etcdserver: published {Name:my-etcd-1 ClientURLs:[http://localhost:2379]} to cluster 46bc3ce73049e678
```
Verify that each member, and then the entire cluster, becomes healthy with the new v3.1 etcd binary:
```
$ ETCDCTL_API=3 /etcdctl endpoint health --endpoints=localhost:2379,localhost:22379,localhost:32379
localhost:22379 is healthy: successfully committed proposal: took = 5.540129ms
localhost:32379 is healthy: successfully committed proposal: took = 7.321671ms
localhost:2379 is healthy: successfully committed proposal: took = 10.629901ms
```
Upgraded members will log warnings like the following until the entire cluster is upgraded. This is expected and will cease after all etcd cluster members are upgraded to v3.1:
```
2017-01-17 09:36:38.406268 W | etcdserver: the local etcd version 3.0.16 is not up-to-date
2017-01-17 09:36:38.406295 W | etcdserver: member fd32987dcd0511e0 has a higher version 3.1.0
2017-01-17 09:36:42.407695 W | etcdserver: the local etcd version 3.0.16 is not up-to-date
2017-01-17 09:36:42.407730 W | etcdserver: member fd32987dcd0511e0 has a higher version 3.1.0
```
#### 4. Repeat step 2 to step 3 for all other members
#### 5. Finish
When all members are upgraded, the cluster will report upgrading to 3.1 successfully:
```
2017-01-17 09:37:03.100015 I | etcdserver: updating the cluster version from 3.0 to 3.1
2017-01-17 09:37:03.104263 N | etcdserver/membership: updated the cluster version from 3.0 to 3.1
2017-01-17 09:37:03.104374 I | etcdserver/api: enabled capabilities for version 3.1
```
```
$ ETCDCTL_API=3 /etcdctl endpoint health --endpoints=localhost:2379,localhost:22379,localhost:32379
localhost:2379 is healthy: successfully committed proposal: took = 2.312897ms
localhost:22379 is healthy: successfully committed proposal: took = 2.553476ms
localhost:32379 is healthy: successfully committed proposal: took = 2.516902ms
```
[etcd-contact]: https://groups.google.com/forum/#!forum/etcd-dev

2
NEWS
View File

@ -1,4 +1,4 @@
etcd v3.1.0 (2017-01-13)
etcd v3.1.0 (2017-01-20)
- faster linearizable reads (implements Raft read-index)
- automatic leadership transfer when leader steps down
- etcd uses default route IP if advertise URL is not given

View File

@ -37,13 +37,14 @@ See [etcdctl][etcdctl] for a simple command line client.
### Getting etcd
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, [rkt][rkt], and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
For those wanting to try the very latest version, you can [build the latest version of etcd][dl-build] from the `master` branch.
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.6+ is required).
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.7+ is required).
All development occurs on `master`, including new features and bug fixes.
Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
[rkt]: https://github.com/coreos/rkt/releases/
[github-release]: https://github.com/coreos/etcd/releases/
[branch-management]: ./Documentation/branch_management.md
[dl-build]: ./Documentation/dl_build.md#build-the-latest-version

View File

@ -6,25 +6,17 @@ This document defines a high level roadmap for etcd development.
The dates below should not be considered authoritative, but rather indicative of the projected timeline of the project. The [milestones defined in GitHub](https://github.com/coreos/etcd/milestones) represent the most up-to-date and issue-for-issue plans.
etcd 3.0 is our current stable branch. The roadmap below outlines new features that will be added to etcd, and while subject to change, define what future stable will look like.
etcd 3.1 is our current stable branch. The roadmap below outlines new features that will be added to etcd, and while subject to change, define what future stable will look like.
### etcd 3.1 (2016-Oct)
- Stable L4 gateway
- Experimental support for scalable proxy
- Automatic leadership transfer for the rolling upgrade
- V3 API improvements
- Get previous key-value pair
- Get only keys (ignore values)
- Get only key count
### etcd 3.2 (2017-Apr)
### etcd 3.2 (2017-May)
- Stable scalable proxy
- Proxy-as-client interface passthrough
- Lock service
- Namespacing proxy
- JWT token based authentication
- TLS Command Name and JWT token based authentication
- Read-modify-write V3 Put
- Improved watch performance
- Support non-blocking concurrent read
### etcd 3.3 (?)
- TBD

View File

@ -21,33 +21,33 @@ import (
"crypto/rand"
"math/big"
"strings"
"sync"
"time"
)
const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
defaultSimpleTokenLength = 16
)
// var for testing purposes
var (
simpleTokenTTL = 5 * time.Minute
simpleTokenTTLResolution = 1 * time.Second
)
type simpleTokenTTLKeeper struct {
tokens map[string]time.Time
addSimpleTokenCh chan string
resetSimpleTokenCh chan string
deleteSimpleTokenCh chan string
stopCh chan chan struct{}
deleteTokenFunc func(string)
tokensMu sync.Mutex
tokens map[string]time.Time
stopCh chan chan struct{}
deleteTokenFunc func(string)
}
func NewSimpleTokenTTLKeeper(deletefunc func(string)) *simpleTokenTTLKeeper {
stk := &simpleTokenTTLKeeper{
tokens: make(map[string]time.Time),
addSimpleTokenCh: make(chan string, 1),
resetSimpleTokenCh: make(chan string, 1),
deleteSimpleTokenCh: make(chan string, 1),
stopCh: make(chan chan struct{}),
deleteTokenFunc: deletefunc,
tokens: make(map[string]time.Time),
stopCh: make(chan chan struct{}),
deleteTokenFunc: deletefunc,
}
go stk.run()
return stk
@ -61,37 +61,34 @@ func (tm *simpleTokenTTLKeeper) stop() {
}
func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
tm.addSimpleTokenCh <- token
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
tm.resetSimpleTokenCh <- token
if _, ok := tm.tokens[token]; ok {
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
}
}
func (tm *simpleTokenTTLKeeper) deleteSimpleToken(token string) {
tm.deleteSimpleTokenCh <- token
delete(tm.tokens, token)
}
func (tm *simpleTokenTTLKeeper) run() {
tokenTicker := time.NewTicker(simpleTokenTTLResolution)
defer tokenTicker.Stop()
for {
select {
case t := <-tm.addSimpleTokenCh:
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
case t := <-tm.resetSimpleTokenCh:
if _, ok := tm.tokens[t]; ok {
tm.tokens[t] = time.Now().Add(simpleTokenTTL)
}
case t := <-tm.deleteSimpleTokenCh:
delete(tm.tokens, t)
case <-tokenTicker.C:
nowtime := time.Now()
tm.tokensMu.Lock()
for t, tokenendtime := range tm.tokens {
if nowtime.After(tokenendtime) {
tm.deleteTokenFunc(t)
delete(tm.tokens, t)
}
}
tm.tokensMu.Unlock()
case waitCh := <-tm.stopCh:
tm.tokens = make(map[string]time.Time)
waitCh <- struct{}{}
@ -116,6 +113,7 @@ func (as *authStore) GenSimpleToken() (string, error) {
}
func (as *authStore) assignSimpleTokenToUser(username, token string) {
as.simpleTokenKeeper.tokensMu.Lock()
as.simpleTokensMu.Lock()
_, ok := as.simpleTokens[token]
@ -126,16 +124,21 @@ func (as *authStore) assignSimpleTokenToUser(username, token string) {
as.simpleTokens[token] = username
as.simpleTokenKeeper.addSimpleToken(token)
as.simpleTokensMu.Unlock()
as.simpleTokenKeeper.tokensMu.Unlock()
}
func (as *authStore) invalidateUser(username string) {
if as.simpleTokenKeeper == nil {
return
}
as.simpleTokenKeeper.tokensMu.Lock()
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
for token, name := range as.simpleTokens {
if strings.Compare(name, username) == 0 {
delete(as.simpleTokens, token)
as.simpleTokenKeeper.deleteSimpleToken(token)
}
}
as.simpleTokensMu.Unlock()
as.simpleTokenKeeper.tokensMu.Unlock()
}

View File

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
@ -29,6 +30,7 @@ import (
"github.com/coreos/pkg/capnslog"
"golang.org/x/crypto/bcrypt"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
var (
@ -57,6 +59,7 @@ var (
ErrPermissionNotGranted = errors.New("auth: permission is not granted to the role")
ErrAuthNotEnabled = errors.New("auth: authentication is not enabled")
ErrAuthOldRevision = errors.New("auth: revision in header is old")
ErrInvalidAuthToken = errors.New("auth: invalid auth token")
// BcryptCost is the algorithm cost / strength for hashing auth passwords
BcryptCost = bcrypt.DefaultCost
@ -153,6 +156,9 @@ type AuthStore interface {
// Close does cleanup of AuthStore
Close() error
// AuthInfoFromCtx gets AuthInfo from gRPC's context
AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error)
}
type authStore struct {
@ -162,11 +168,24 @@ type authStore struct {
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
simpleTokensMu sync.RWMutex
simpleTokens map[string]string // token -> username
simpleTokenKeeper *simpleTokenTTLKeeper
revision uint64
// tokenSimple in v3.2+
indexWaiter func(uint64) <-chan struct{}
simpleTokenKeeper *simpleTokenTTLKeeper
simpleTokensMu sync.Mutex
simpleTokens map[string]string // token -> username
}
func newDeleterFunc(as *authStore) func(string) {
return func(t string) {
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
if username, ok := as.simpleTokens[t]; ok {
plog.Infof("deleting token %s for user %s", t, username)
delete(as.simpleTokens, t)
}
}
}
func (as *authStore) AuthEnable() error {
@ -197,15 +216,7 @@ func (as *authStore) AuthEnable() error {
as.enabled = true
tokenDeleteFunc := func(t string) {
as.simpleTokensMu.Lock()
defer as.simpleTokensMu.Unlock()
if username, ok := as.simpleTokens[t]; ok {
plog.Infof("deleting token %s for user %s", t, username)
delete(as.simpleTokens, t)
}
}
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(tokenDeleteFunc)
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
as.rangePermCache = make(map[string]*unifiedRangePermissions)
@ -635,13 +646,16 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
}
func (as *authStore) AuthInfoFromToken(token string) (*AuthInfo, bool) {
as.simpleTokensMu.RLock()
defer as.simpleTokensMu.RUnlock()
t, ok := as.simpleTokens[token]
// same as '(t *tokenSimple) info' in v3.2+
as.simpleTokenKeeper.tokensMu.Lock()
as.simpleTokensMu.Lock()
username, ok := as.simpleTokens[token]
if ok {
as.simpleTokenKeeper.resetSimpleToken(token)
}
return &AuthInfo{Username: t, Revision: as.revision}, ok
as.simpleTokensMu.Unlock()
as.simpleTokenKeeper.tokensMu.Unlock()
return &AuthInfo{Username: username, Revision: as.revision}, ok
}
type permSlice []*authpb.Permission
@ -753,6 +767,9 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
if !as.isAuthEnabled() {
return nil
}
if authInfo == nil {
return ErrUserEmpty
}
tx := as.be.BatchTx()
tx.Lock()
@ -871,7 +888,7 @@ func (as *authStore) isAuthEnabled() bool {
return as.enabled
}
func NewAuthStore(be backend.Backend) *authStore {
func NewAuthStore(be backend.Backend, indexWaiter func(uint64) <-chan struct{}) *authStore {
tx := be.BatchTx()
tx.Lock()
@ -879,13 +896,30 @@ func NewAuthStore(be backend.Backend) *authStore {
tx.UnsafeCreateBucket(authUsersBucketName)
tx.UnsafeCreateBucket(authRolesBucketName)
as := &authStore{
be: be,
simpleTokens: make(map[string]string),
revision: 0,
enabled := false
_, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
}
}
as.commitRevision(tx)
as := &authStore{
be: be,
simpleTokens: make(map[string]string),
revision: getRevision(tx),
indexWaiter: indexWaiter,
enabled: enabled,
rangePermCache: make(map[string]*unifiedRangePermissions),
}
if enabled {
as.simpleTokenKeeper = NewSimpleTokenTTLKeeper(newDeleterFunc(as))
}
if as.revision == 0 {
as.commitRevision(tx)
}
tx.Unlock()
be.ForceCommit()
@ -912,7 +946,8 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
func getRevision(tx backend.BatchTx) uint64 {
_, vs := tx.UnsafeRange(authBucketName, []byte(revisionKey), nil, 0)
if len(vs) != 1 {
plog.Panicf("failed to get the key of auth store revision")
// this can happen in the initialization phase
return 0
}
return binary.BigEndian.Uint64(vs[0])
@ -921,3 +956,46 @@ func getRevision(tx backend.BatchTx) uint64 {
func (as *authStore) Revision() uint64 {
return as.revision
}
func (as *authStore) isValidSimpleToken(token string, ctx context.Context) bool {
splitted := strings.Split(token, ".")
if len(splitted) != 2 {
return false
}
index, err := strconv.Atoi(splitted[1])
if err != nil {
return false
}
select {
case <-as.indexWaiter(uint64(index)):
return true
case <-ctx.Done():
}
return false
}
func (as *authStore) AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error) {
md, ok := metadata.FromContext(ctx)
if !ok {
return nil, nil
}
ts, tok := md["token"]
if !tok {
return nil, nil
}
token := ts[0]
if !as.isValidSimpleToken(token, ctx) {
return nil, ErrInvalidAuthToken
}
authInfo, uok := as.AuthInfoFromToken(token)
if !uok {
plog.Warningf("invalid auth token: %s", token)
return nil, ErrInvalidAuthToken
}
return authInfo, nil
}

View File

@ -26,31 +26,38 @@ import (
func init() { BcryptCost = bcrypt.MinCost }
func TestUserAdd(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer func() {
b.Close()
os.Remove(tPath)
func dummyIndexWaiter(index uint64) <-chan struct{} {
ch := make(chan struct{})
go func() {
ch <- struct{}{}
}()
return ch
}
as := NewAuthStore(b)
ua := &pb.AuthUserAddRequest{Name: "foo"}
_, err := as.UserAdd(ua) // add a non-existing user
// TestNewAuthStoreRevision ensures newly auth store
// keeps the old revision when there are no changes.
func TestNewAuthStoreRevision(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer os.Remove(tPath)
as := NewAuthStore(b, dummyIndexWaiter)
err := enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
}
_, err = as.UserAdd(ua) // add an existing user
if err == nil {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
if err != ErrUserAlreadyExist {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
old := as.Revision()
b.Close()
as.Close()
ua = &pb.AuthUserAddRequest{Name: ""}
_, err = as.UserAdd(ua) // add a user with empty name
if err != ErrUserEmpty {
t.Fatal(err)
// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
as = NewAuthStore(b2, dummyIndexWaiter)
new := as.Revision()
b2.Close()
as.Close()
if old != new {
t.Fatalf("expected revision %d, got %d", old, new)
}
}
@ -80,7 +87,7 @@ func TestCheckPassword(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -125,7 +132,7 @@ func TestUserDelete(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -162,7 +169,7 @@ func TestUserChangePassword(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -208,7 +215,7 @@ func TestRoleAdd(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -229,7 +236,7 @@ func TestUserGrant(t *testing.T) {
os.Remove(tPath)
}()
as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
@ -261,4 +268,93 @@ func TestUserGrant(t *testing.T) {
if err != ErrUserNotFound {
t.Fatalf("expected %v, got %v", ErrUserNotFound, err)
}
// non-admin user
err = as.IsAdminPermitted(&AuthInfo{Username: "foo", Revision: 1})
if err != ErrPermissionDenied {
t.Errorf("expected %v, got %v", ErrPermissionDenied, err)
}
// disabled auth should return nil
as.AuthDisable()
err = as.IsAdminPermitted(&AuthInfo{Username: "root", Revision: 1})
if err != nil {
t.Errorf("expected nil, got %v", err)
}
}
func TestRecoverFromSnapshot(t *testing.T) {
as, _ := setupAuthStore(t)
ua := &pb.AuthUserAddRequest{Name: "foo"}
_, err := as.UserAdd(ua) // add an existing user
if err == nil {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
if err != ErrUserAlreadyExist {
t.Fatalf("expected %v, got %v", ErrUserAlreadyExist, err)
}
ua = &pb.AuthUserAddRequest{Name: ""}
_, err = as.UserAdd(ua) // add a user with empty name
if err != ErrUserEmpty {
t.Fatal(err)
}
as.Close()
as2 := NewAuthStore(as.be, dummyIndexWaiter)
defer func(a *authStore) {
a.Close()
}(as2)
if !as2.isAuthEnabled() {
t.Fatal("recovering authStore from existing backend failed")
}
ul, err := as.UserList(&pb.AuthUserListRequest{})
if err != nil {
t.Fatal(err)
}
if !contains(ul.Users, "root") {
t.Errorf("expected %v in %v", "root", ul.Users)
}
}
func contains(array []string, str string) bool {
for _, s := range array {
if s == str {
return true
}
}
return false
}
func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testing.T)) {
b, tPath := backend.NewDefaultTmpBackend()
as := NewAuthStore(b, dummyIndexWaiter)
err := enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
}
// adds a new role
_, err = as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test"})
if err != nil {
t.Fatal(err)
}
ua := &pb.AuthUserAddRequest{Name: "foo", Password: "bar"}
_, err = as.UserAdd(ua) // add a non-existing user
if err != nil {
t.Fatal(err)
}
tearDown := func(t *testing.T) {
b.Close()
os.Remove(tPath)
as.Close()
}
return as, tearDown
}

View File

@ -221,7 +221,8 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
return nil, c.ctx.Err()
default:
}
return net.DialTimeout(proto, host, t)
dialer := &net.Dialer{Timeout: t}
return dialer.DialContext(c.ctx, proto, host)
}
opts = append(opts, grpc.WithDialer(f))

View File

@ -16,6 +16,7 @@ package clientv3
import (
"fmt"
"net"
"testing"
"time"
@ -25,6 +26,47 @@ import (
"google.golang.org/grpc"
)
func TestDialCancel(t *testing.T) {
defer testutil.AfterTest(t)
// accept first connection so client is created with dial timeout
ln, err := net.Listen("unix", "dialcancel:12345")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
ep := "unix://dialcancel:12345"
cfg := Config{
Endpoints: []string{ep},
DialTimeout: 30 * time.Second}
c, err := New(cfg)
if err != nil {
t.Fatal(err)
}
// connect to ipv4 blackhole so dial blocks
c.SetEndpoints("http://254.0.0.1:12345")
// issue Get to force redial attempts
go c.Get(context.TODO(), "abc")
// wait a little bit so client close is after dial starts
time.Sleep(100 * time.Millisecond)
donec := make(chan struct{})
go func() {
defer close(donec)
c.Close()
}()
select {
case <-time.After(5 * time.Second):
t.Fatalf("failed to close")
case <-donec:
}
}
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)

View File

@ -249,11 +249,10 @@ func (s *stmReadCommitted) commit() *v3.TxnResponse {
}
func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
rev := r.Header.Revision + 1
if len(r.Kvs) != 0 {
rev = r.Kvs[0].ModRevision + 1
return v3.Compare(v3.ModRevision(k), "=", r.Kvs[0].ModRevision)
}
return v3.Compare(v3.ModRevision(k), "<", rev)
return v3.Compare(v3.ModRevision(k), "=", 0)
}
func respToValue(resp *v3.GetResponse) string {

View File

@ -156,6 +156,30 @@ func TestLeaseKeepAlive(t *testing.T) {
}
}
func TestLeaseKeepAliveOneSecond(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
resp, err := cli.Grant(context.Background(), 1)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
for i := 0; i < 3; i++ {
if _, ok := <-rc; !ok {
t.Errorf("chan is closed, want not closed")
}
}
}
// TODO: add a client that can connect to all the members of cluster via unix sock.
// TODO: test handle more complicated failures.
func TestLeaseKeepAliveHandleFailure(t *testing.T) {

View File

@ -416,7 +416,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
}
// send update to all channels
nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
for _, ch := range ka.chs {
select {

View File

@ -694,6 +694,10 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
go func(ws *watcherStream) {
defer wg.Done()
if ws.closing {
if ws.initReq.ctx.Err() != nil && ws.outc != nil {
close(ws.outc)
ws.outc = nil
}
return
}
select {

View File

@ -36,12 +36,7 @@
// Contexts.
package context // import "golang.org/x/net/context"
import (
"errors"
"fmt"
"sync"
"time"
)
import "time"
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
@ -66,7 +61,7 @@ type Context interface {
//
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out <-chan Value) error {
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
@ -138,48 +133,6 @@ type Context interface {
Value(key interface{}) interface{}
}
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = errors.New("context deadline exceeded")
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
@ -201,247 +154,3 @@ func TODO() Context {
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]bool)
}
p.children[child] = true
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
done chan struct{} // closed by the first cancel call.
mu sync.Mutex
children map[canceler]bool // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
return c.done
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
close(c.done)
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := deadline.Sub(time.Now())
if d <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

72
cmd/vendor/golang.org/x/net/context/go17.go generated vendored Normal file
View File

@ -0,0 +1,72 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
package context
import (
"context" // standard library's context, as of Go 1.7
"time"
)
var (
todo = context.TODO()
background = context.Background()
)
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = context.Canceled
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = context.DeadlineExceeded
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
ctx, f := context.WithCancel(parent)
return ctx, CancelFunc(f)
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
ctx, f := context.WithDeadline(parent, deadline)
return ctx, CancelFunc(f)
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return context.WithValue(parent, key, val)
}

300
cmd/vendor/golang.org/x/net/context/pre_go17.go generated vendored Normal file
View File

@ -0,0 +1,300 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package context
import (
"errors"
"fmt"
"sync"
"time"
)
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = errors.New("context deadline exceeded")
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, c)
return c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) *cancelCtx {
return &cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]bool)
}
p.children[child] = true
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
done chan struct{} // closed by the first cancel call.
mu sync.Mutex
children map[canceler]bool // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
return c.done
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
close(c.done)
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := deadline.Sub(time.Now())
if d <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
*cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

View File

@ -18,6 +18,18 @@ type ClientConnPool interface {
MarkDead(*ClientConn)
}
// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
// implementations which can close their idle connections.
type clientConnPoolIdleCloser interface {
ClientConnPool
closeIdleConnections()
}
var (
_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
_ clientConnPoolIdleCloser = noDialClientConnPool{}
)
// TODO: use singleflight for dialing and addConnCalls?
type clientConnPool struct {
t *Transport
@ -40,7 +52,16 @@ const (
noDialOnMiss = false
)
func (p *clientConnPool) getClientConn(_ *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
if isConnectionCloseRequest(req) && dialOnMiss {
// It gets its own connection.
const singleUse = true
cc, err := p.t.dialClientConn(addr, singleUse)
if err != nil {
return nil, err
}
return cc, nil
}
p.mu.Lock()
for _, cc := range p.conns[addr] {
if cc.CanTakeNewRequest() {
@ -83,7 +104,8 @@ func (p *clientConnPool) getStartDialLocked(addr string) *dialCall {
// run in its own goroutine.
func (c *dialCall) dial(addr string) {
c.res, c.err = c.p.t.dialClientConn(addr)
const singleUse = false // shared conn
c.res, c.err = c.p.t.dialClientConn(addr, singleUse)
close(c.done)
c.p.mu.Lock()
@ -223,3 +245,12 @@ func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
}
return out
}
// noDialClientConnPool is an implementation of http2.ClientConnPool
// which never dials. We let the HTTP/1.1 client dial and use its TLS
// connection instead.
type noDialClientConnPool struct{ *clientConnPool }
func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
return p.getClientConn(req, addr, noDialOnMiss)
}

View File

@ -32,7 +32,7 @@ func configureTransport(t1 *http.Transport) (*Transport, error) {
t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
}
upgradeFn := func(authority string, c *tls.Conn) http.RoundTripper {
addr := authorityAddr(authority)
addr := authorityAddr("https", authority)
if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
go c.Close()
return erringRoundTripper{err}
@ -67,15 +67,6 @@ func registerHTTPSProtocol(t *http.Transport, rt http.RoundTripper) (err error)
return nil
}
// noDialClientConnPool is an implementation of http2.ClientConnPool
// which never dials. We let the HTTP/1.1 client dial and use its TLS
// connection instead.
type noDialClientConnPool struct{ *clientConnPool }
func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
return p.getClientConn(req, addr, noDialOnMiss)
}
// noDialH2RoundTripper is a RoundTripper which only tries to complete the request
// if there's already has a cached connection to the host.
type noDialH2RoundTripper struct{ t *Transport }

View File

@ -64,9 +64,17 @@ func (e ConnectionError) Error() string { return fmt.Sprintf("connection error:
type StreamError struct {
StreamID uint32
Code ErrCode
Cause error // optional additional detail
}
func streamError(id uint32, code ErrCode) StreamError {
return StreamError{StreamID: id, Code: code}
}
func (e StreamError) Error() string {
if e.Cause != nil {
return fmt.Sprintf("stream error: stream ID %d; %v; %v", e.StreamID, e.Code, e.Cause)
}
return fmt.Sprintf("stream error: stream ID %d; %v", e.StreamID, e.Code)
}

View File

@ -15,6 +15,7 @@ import (
"sync"
"golang.org/x/net/http2/hpack"
"golang.org/x/net/lex/httplex"
)
const frameHeaderLen = 9
@ -316,10 +317,12 @@ type Framer struct {
// non-Continuation or Continuation on a different stream is
// attempted to be written.
logReads bool
logReads, logWrites bool
debugFramer *Framer // only use for logging written writes
debugFramerBuf *bytes.Buffer
debugFramer *Framer // only use for logging written writes
debugFramerBuf *bytes.Buffer
debugReadLoggerf func(string, ...interface{})
debugWriteLoggerf func(string, ...interface{})
}
func (fr *Framer) maxHeaderListSize() uint32 {
@ -354,7 +357,7 @@ func (f *Framer) endWrite() error {
byte(length>>16),
byte(length>>8),
byte(length))
if logFrameWrites {
if f.logWrites {
f.logWrite()
}
@ -377,10 +380,10 @@ func (f *Framer) logWrite() {
f.debugFramerBuf.Write(f.wbuf)
fr, err := f.debugFramer.ReadFrame()
if err != nil {
log.Printf("http2: Framer %p: failed to decode just-written frame", f)
f.debugWriteLoggerf("http2: Framer %p: failed to decode just-written frame", f)
return
}
log.Printf("http2: Framer %p: wrote %v", f, summarizeFrame(fr))
f.debugWriteLoggerf("http2: Framer %p: wrote %v", f, summarizeFrame(fr))
}
func (f *Framer) writeByte(v byte) { f.wbuf = append(f.wbuf, v) }
@ -398,9 +401,12 @@ const (
// NewFramer returns a Framer that writes frames to w and reads them from r.
func NewFramer(w io.Writer, r io.Reader) *Framer {
fr := &Framer{
w: w,
r: r,
logReads: logFrameReads,
w: w,
r: r,
logReads: logFrameReads,
logWrites: logFrameWrites,
debugReadLoggerf: log.Printf,
debugWriteLoggerf: log.Printf,
}
fr.getReadBuf = func(size uint32) []byte {
if cap(fr.readBuf) >= int(size) {
@ -453,7 +459,7 @@ func terminalReadFrameError(err error) bool {
//
// If the frame is larger than previously set with SetMaxReadFrameSize, the
// returned error is ErrFrameTooLarge. Other errors may be of type
// ConnectionError, StreamError, or anything else from from the underlying
// ConnectionError, StreamError, or anything else from the underlying
// reader.
func (fr *Framer) ReadFrame() (Frame, error) {
fr.errDetail = nil
@ -482,7 +488,7 @@ func (fr *Framer) ReadFrame() (Frame, error) {
return nil, err
}
if fr.logReads {
log.Printf("http2: Framer %p: read %v", fr, summarizeFrame(f))
fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
}
if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
return fr.readMetaFrame(f.(*HeadersFrame))
@ -590,7 +596,15 @@ func parseDataFrame(fh FrameHeader, payload []byte) (Frame, error) {
return f, nil
}
var errStreamID = errors.New("invalid streamid")
var (
errStreamID = errors.New("invalid stream ID")
errDepStreamID = errors.New("invalid dependent stream ID")
errPadLength = errors.New("pad length too large")
)
func validStreamIDOrZero(streamID uint32) bool {
return streamID&(1<<31) == 0
}
func validStreamID(streamID uint32) bool {
return streamID != 0 && streamID&(1<<31) == 0
@ -599,18 +613,40 @@ func validStreamID(streamID uint32) bool {
// WriteData writes a DATA frame.
//
// It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility to not call other Write methods concurrently.
// It is the caller's responsibility not to violate the maximum frame size
// and to not call other Write methods concurrently.
func (f *Framer) WriteData(streamID uint32, endStream bool, data []byte) error {
// TODO: ignoring padding for now. will add when somebody cares.
return f.WriteDataPadded(streamID, endStream, data, nil)
}
// WriteData writes a DATA frame with optional padding.
//
// If pad is nil, the padding bit is not sent.
// The length of pad must not exceed 255 bytes.
//
// It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility not to violate the maximum frame size
// and to not call other Write methods concurrently.
func (f *Framer) WriteDataPadded(streamID uint32, endStream bool, data, pad []byte) error {
if !validStreamID(streamID) && !f.AllowIllegalWrites {
return errStreamID
}
if len(pad) > 255 {
return errPadLength
}
var flags Flags
if endStream {
flags |= FlagDataEndStream
}
if pad != nil {
flags |= FlagDataPadded
}
f.startWrite(FrameData, flags, streamID)
if pad != nil {
f.wbuf = append(f.wbuf, byte(len(pad)))
}
f.wbuf = append(f.wbuf, data...)
f.wbuf = append(f.wbuf, pad...)
return f.endWrite()
}
@ -706,7 +742,7 @@ func (f *Framer) WriteSettings(settings ...Setting) error {
return f.endWrite()
}
// WriteSettings writes an empty SETTINGS frame with the ACK bit set.
// WriteSettingsAck writes an empty SETTINGS frame with the ACK bit set.
//
// It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility to not call other Write methods concurrently.
@ -832,7 +868,7 @@ func parseWindowUpdateFrame(fh FrameHeader, p []byte) (Frame, error) {
if fh.StreamID == 0 {
return nil, ConnectionError(ErrCodeProtocol)
}
return nil, StreamError{fh.StreamID, ErrCodeProtocol}
return nil, streamError(fh.StreamID, ErrCodeProtocol)
}
return &WindowUpdateFrame{
FrameHeader: fh,
@ -913,7 +949,7 @@ func parseHeadersFrame(fh FrameHeader, p []byte) (_ Frame, err error) {
}
}
if len(p)-int(padLength) <= 0 {
return nil, StreamError{fh.StreamID, ErrCodeProtocol}
return nil, streamError(fh.StreamID, ErrCodeProtocol)
}
hf.headerFragBuf = p[:len(p)-int(padLength)]
return hf, nil
@ -977,8 +1013,8 @@ func (f *Framer) WriteHeaders(p HeadersFrameParam) error {
}
if !p.Priority.IsZero() {
v := p.Priority.StreamDep
if !validStreamID(v) && !f.AllowIllegalWrites {
return errors.New("invalid dependent stream id")
if !validStreamIDOrZero(v) && !f.AllowIllegalWrites {
return errDepStreamID
}
if p.Priority.Exclusive {
v |= 1 << 31
@ -1046,6 +1082,9 @@ func (f *Framer) WritePriority(streamID uint32, p PriorityParam) error {
if !validStreamID(streamID) && !f.AllowIllegalWrites {
return errStreamID
}
if !validStreamIDOrZero(p.StreamDep) {
return errDepStreamID
}
f.startWrite(FramePriority, 0, streamID)
v := p.StreamDep
if p.Exclusive {
@ -1385,7 +1424,10 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
hdec.SetEmitEnabled(true)
hdec.SetMaxStringLength(fr.maxHeaderStringLen())
hdec.SetEmitFunc(func(hf hpack.HeaderField) {
if !validHeaderFieldValue(hf.Value) {
if VerboseLogs && fr.logReads {
fr.debugReadLoggerf("http2: decoded hpack field %+v", hf)
}
if !httplex.ValidHeaderFieldValue(hf.Value) {
invalid = headerFieldValueError(hf.Value)
}
isPseudo := strings.HasPrefix(hf.Name, ":")
@ -1395,7 +1437,7 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
}
} else {
sawRegular = true
if !validHeaderFieldName(hf.Name) {
if !validWireHeaderFieldName(hf.Name) {
invalid = headerFieldNameError(hf.Name)
}
}
@ -1443,11 +1485,17 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
}
if invalid != nil {
fr.errDetail = invalid
return nil, StreamError{mh.StreamID, ErrCodeProtocol}
if VerboseLogs {
log.Printf("http2: invalid header: %v", invalid)
}
return nil, StreamError{mh.StreamID, ErrCodeProtocol, invalid}
}
if err := mh.checkPseudos(); err != nil {
fr.errDetail = err
return nil, StreamError{mh.StreamID, ErrCodeProtocol}
if VerboseLogs {
log.Printf("http2: invalid pseudo headers: %v", err)
}
return nil, StreamError{mh.StreamID, ErrCodeProtocol, err}
}
return mh, nil
}

View File

@ -1,11 +0,0 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.5
package http2
import "net/http"
func requestCancel(req *http.Request) <-chan struct{} { return req.Cancel }

43
cmd/vendor/golang.org/x/net/http2/go16.go generated vendored Normal file
View File

@ -0,0 +1,43 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.6
package http2
import (
"crypto/tls"
"net/http"
"time"
)
func transportExpectContinueTimeout(t1 *http.Transport) time.Duration {
return t1.ExpectContinueTimeout
}
// isBadCipher reports whether the cipher is blacklisted by the HTTP/2 spec.
func isBadCipher(cipher uint16) bool {
switch cipher {
case tls.TLS_RSA_WITH_RC4_128_SHA,
tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
tls.TLS_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA,
tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:
// Reject cipher suites from Appendix A.
// "This list includes those cipher suites that do not
// offer an ephemeral key exchange and those that are
// based on the TLS null, stream or block cipher type"
return true
default:
return false
}
}

106
cmd/vendor/golang.org/x/net/http2/go17.go generated vendored Normal file
View File

@ -0,0 +1,106 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
package http2
import (
"context"
"net"
"net/http"
"net/http/httptrace"
"time"
)
type contextContext interface {
context.Context
}
func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx contextContext, cancel func()) {
ctx, cancel = context.WithCancel(context.Background())
ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
if hs := opts.baseConfig(); hs != nil {
ctx = context.WithValue(ctx, http.ServerContextKey, hs)
}
return
}
func contextWithCancel(ctx contextContext) (_ contextContext, cancel func()) {
return context.WithCancel(ctx)
}
func requestWithContext(req *http.Request, ctx contextContext) *http.Request {
return req.WithContext(ctx)
}
type clientTrace httptrace.ClientTrace
func reqContext(r *http.Request) context.Context { return r.Context() }
func (t *Transport) idleConnTimeout() time.Duration {
if t.t1 != nil {
return t.t1.IdleConnTimeout
}
return 0
}
func setResponseUncompressed(res *http.Response) { res.Uncompressed = true }
func traceGotConn(req *http.Request, cc *ClientConn) {
trace := httptrace.ContextClientTrace(req.Context())
if trace == nil || trace.GotConn == nil {
return
}
ci := httptrace.GotConnInfo{Conn: cc.tconn}
cc.mu.Lock()
ci.Reused = cc.nextStreamID > 1
ci.WasIdle = len(cc.streams) == 0 && ci.Reused
if ci.WasIdle && !cc.lastActive.IsZero() {
ci.IdleTime = time.Now().Sub(cc.lastActive)
}
cc.mu.Unlock()
trace.GotConn(ci)
}
func traceWroteHeaders(trace *clientTrace) {
if trace != nil && trace.WroteHeaders != nil {
trace.WroteHeaders()
}
}
func traceGot100Continue(trace *clientTrace) {
if trace != nil && trace.Got100Continue != nil {
trace.Got100Continue()
}
}
func traceWait100Continue(trace *clientTrace) {
if trace != nil && trace.Wait100Continue != nil {
trace.Wait100Continue()
}
}
func traceWroteRequest(trace *clientTrace, err error) {
if trace != nil && trace.WroteRequest != nil {
trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
}
}
func traceFirstResponseByte(trace *clientTrace) {
if trace != nil && trace.GotFirstResponseByte != nil {
trace.GotFirstResponseByte()
}
}
func requestTrace(req *http.Request) *clientTrace {
trace := httptrace.ContextClientTrace(req.Context())
return (*clientTrace)(trace)
}
// Ping sends a PING frame to the server and waits for the ack.
func (cc *ClientConn) Ping(ctx context.Context) error {
return cc.ping(ctx)
}

36
cmd/vendor/golang.org/x/net/http2/go17_not18.go generated vendored Normal file
View File

@ -0,0 +1,36 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7,!go1.8
package http2
import "crypto/tls"
// temporary copy of Go 1.7's private tls.Config.clone:
func cloneTLSConfig(c *tls.Config) *tls.Config {
return &tls.Config{
Rand: c.Rand,
Time: c.Time,
Certificates: c.Certificates,
NameToCertificate: c.NameToCertificate,
GetCertificate: c.GetCertificate,
RootCAs: c.RootCAs,
NextProtos: c.NextProtos,
ServerName: c.ServerName,
ClientAuth: c.ClientAuth,
ClientCAs: c.ClientCAs,
InsecureSkipVerify: c.InsecureSkipVerify,
CipherSuites: c.CipherSuites,
PreferServerCipherSuites: c.PreferServerCipherSuites,
SessionTicketsDisabled: c.SessionTicketsDisabled,
SessionTicketKey: c.SessionTicketKey,
ClientSessionCache: c.ClientSessionCache,
MinVersion: c.MinVersion,
MaxVersion: c.MaxVersion,
CurvePreferences: c.CurvePreferences,
DynamicRecordSizingDisabled: c.DynamicRecordSizingDisabled,
Renegotiation: c.Renegotiation,
}
}

50
cmd/vendor/golang.org/x/net/http2/go18.go generated vendored Normal file
View File

@ -0,0 +1,50 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.8
package http2
import (
"crypto/tls"
"io"
"net/http"
)
func cloneTLSConfig(c *tls.Config) *tls.Config { return c.Clone() }
var _ http.Pusher = (*responseWriter)(nil)
// Push implements http.Pusher.
func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
internalOpts := pushOptions{}
if opts != nil {
internalOpts.Method = opts.Method
internalOpts.Header = opts.Header
}
return w.push(target, internalOpts)
}
func configureServer18(h1 *http.Server, h2 *Server) error {
if h2.IdleTimeout == 0 {
if h1.IdleTimeout != 0 {
h2.IdleTimeout = h1.IdleTimeout
} else {
h2.IdleTimeout = h1.ReadTimeout
}
}
return nil
}
func shouldLogPanic(panicValue interface{}) bool {
return panicValue != nil && panicValue != http.ErrAbortHandler
}
func reqGetBody(req *http.Request) func() (io.ReadCloser, error) {
return req.GetBody
}
func reqBodyIsNoBody(body io.ReadCloser) bool {
return body == http.NoBody
}

View File

@ -43,7 +43,7 @@ type HeaderField struct {
// IsPseudo reports whether the header field is an http2 pseudo header.
// That is, it reports whether it starts with a colon.
// It is not otherwise guaranteed to be a valid psuedo header field,
// It is not otherwise guaranteed to be a valid pseudo header field,
// though.
func (hf HeaderField) IsPseudo() bool {
return len(hf.Name) != 0 && hf.Name[0] == ':'
@ -57,7 +57,7 @@ func (hf HeaderField) String() string {
return fmt.Sprintf("header field %q = %q%s", hf.Name, hf.Value, suffix)
}
// Size returns the size of an entry per RFC 7540 section 5.2.
// Size returns the size of an entry per RFC 7541 section 4.1.
func (hf HeaderField) Size() uint32 {
// http://http2.github.io/http2-spec/compression.html#rfc.section.4.1
// "The size of the dynamic table is the sum of the size of

View File

@ -48,12 +48,16 @@ var ErrInvalidHuffman = errors.New("hpack: invalid Huffman-encoded data")
// maxLen bytes will return ErrStringLength.
func huffmanDecode(buf *bytes.Buffer, maxLen int, v []byte) error {
n := rootHuffmanNode
cur, nbits := uint(0), uint8(0)
// cur is the bit buffer that has not been fed into n.
// cbits is the number of low order bits in cur that are valid.
// sbits is the number of bits of the symbol prefix being decoded.
cur, cbits, sbits := uint(0), uint8(0), uint8(0)
for _, b := range v {
cur = cur<<8 | uint(b)
nbits += 8
for nbits >= 8 {
idx := byte(cur >> (nbits - 8))
cbits += 8
sbits += 8
for cbits >= 8 {
idx := byte(cur >> (cbits - 8))
n = n.children[idx]
if n == nil {
return ErrInvalidHuffman
@ -63,22 +67,40 @@ func huffmanDecode(buf *bytes.Buffer, maxLen int, v []byte) error {
return ErrStringLength
}
buf.WriteByte(n.sym)
nbits -= n.codeLen
cbits -= n.codeLen
n = rootHuffmanNode
sbits = cbits
} else {
nbits -= 8
cbits -= 8
}
}
}
for nbits > 0 {
n = n.children[byte(cur<<(8-nbits))]
if n.children != nil || n.codeLen > nbits {
for cbits > 0 {
n = n.children[byte(cur<<(8-cbits))]
if n == nil {
return ErrInvalidHuffman
}
if n.children != nil || n.codeLen > cbits {
break
}
if maxLen != 0 && buf.Len() == maxLen {
return ErrStringLength
}
buf.WriteByte(n.sym)
nbits -= n.codeLen
cbits -= n.codeLen
n = rootHuffmanNode
sbits = cbits
}
if sbits > 7 {
// Either there was an incomplete symbol, or overlong padding.
// Both are decoding errors per RFC 7541 section 5.2.
return ErrInvalidHuffman
}
if mask := uint(1<<cbits - 1); cur&mask != mask {
// Trailing bits must be a prefix of EOS per RFC 7541 section 5.2.
return ErrInvalidHuffman
}
return nil
}

View File

@ -13,7 +13,8 @@
// See https://http2.github.io/ for more information on HTTP/2.
//
// See https://http2.golang.org/ for a test server running this code.
package http2
//
package http2 // import "golang.org/x/net/http2"
import (
"bufio"
@ -23,15 +24,19 @@ import (
"io"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"golang.org/x/net/lex/httplex"
)
var (
VerboseLogs bool
logFrameWrites bool
logFrameReads bool
inTests bool
)
func init() {
@ -73,13 +78,23 @@ var (
type streamState int
// HTTP/2 stream states.
//
// See http://tools.ietf.org/html/rfc7540#section-5.1.
//
// For simplicity, the server code merges "reserved (local)" into
// "half-closed (remote)". This is one less state transition to track.
// The only downside is that we send PUSH_PROMISEs slightly less
// liberally than allowable. More discussion here:
// https://lists.w3.org/Archives/Public/ietf-http-wg/2016JulSep/0599.html
//
// "reserved (remote)" is omitted since the client code does not
// support server push.
const (
stateIdle streamState = iota
stateOpen
stateHalfClosedLocal
stateHalfClosedRemote
stateResvLocal
stateResvRemote
stateClosed
)
@ -88,8 +103,6 @@ var stateName = [...]string{
stateOpen: "Open",
stateHalfClosedLocal: "HalfClosedLocal",
stateHalfClosedRemote: "HalfClosedRemote",
stateResvLocal: "ResvLocal",
stateResvRemote: "ResvRemote",
stateClosed: "Closed",
}
@ -165,57 +178,23 @@ var (
errInvalidHeaderFieldValue = errors.New("http2: invalid header field value")
)
// validHeaderFieldName reports whether v is a valid header field name (key).
// RFC 7230 says:
// header-field = field-name ":" OWS field-value OWS
// field-name = token
// tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
// "^" / "_" / "
// validWireHeaderFieldName reports whether v is a valid header field
// name (key). See httplex.ValidHeaderName for the base rules.
//
// Further, http2 says:
// "Just as in HTTP/1.x, header field names are strings of ASCII
// characters that are compared in a case-insensitive
// fashion. However, header field names MUST be converted to
// lowercase prior to their encoding in HTTP/2. "
func validHeaderFieldName(v string) bool {
func validWireHeaderFieldName(v string) bool {
if len(v) == 0 {
return false
}
for _, r := range v {
if int(r) >= len(isTokenTable) || ('A' <= r && r <= 'Z') {
if !httplex.IsTokenRune(r) {
return false
}
if !isTokenTable[byte(r)] {
return false
}
}
return true
}
// validHeaderFieldValue reports whether v is a valid header field value.
//
// RFC 7230 says:
// field-value = *( field-content / obs-fold )
// obj-fold = N/A to http2, and deprecated
// field-content = field-vchar [ 1*( SP / HTAB ) field-vchar ]
// field-vchar = VCHAR / obs-text
// obs-text = %x80-FF
// VCHAR = "any visible [USASCII] character"
//
// http2 further says: "Similarly, HTTP/2 allows header field values
// that are not valid. While most of the values that can be encoded
// will not alter header field parsing, carriage return (CR, ASCII
// 0xd), line feed (LF, ASCII 0xa), and the zero character (NUL, ASCII
// 0x0) might be exploited by an attacker if they are translated
// verbatim. Any request or response that contains a character not
// permitted in a header field value MUST be treated as malformed
// (Section 8.1.2.6). Valid characters are defined by the
// field-content ABNF rule in Section 3.2 of [RFC7230]."
//
// This function does not (yet?) properly handle the rejection of
// strings that begin or end with SP or HTAB.
func validHeaderFieldValue(v string) bool {
for i := 0; i < len(v); i++ {
if b := v[i]; b < ' ' && b != '\t' || b == 0x7f {
if 'A' <= r && r <= 'Z' {
return false
}
}
@ -283,14 +262,27 @@ func newBufferedWriter(w io.Writer) *bufferedWriter {
return &bufferedWriter{w: w}
}
// bufWriterPoolBufferSize is the size of bufio.Writer's
// buffers created using bufWriterPool.
//
// TODO: pick a less arbitrary value? this is a bit under
// (3 x typical 1500 byte MTU) at least. Other than that,
// not much thought went into it.
const bufWriterPoolBufferSize = 4 << 10
var bufWriterPool = sync.Pool{
New: func() interface{} {
// TODO: pick something better? this is a bit under
// (3 x typical 1500 byte MTU) at least.
return bufio.NewWriterSize(nil, 4<<10)
return bufio.NewWriterSize(nil, bufWriterPoolBufferSize)
},
}
func (w *bufferedWriter) Available() int {
if w.bw == nil {
return bufWriterPoolBufferSize
}
return w.bw.Available()
}
func (w *bufferedWriter) Write(p []byte) (n int, err error) {
if w.bw == nil {
bw := bufWriterPool.Get().(*bufio.Writer)
@ -320,7 +312,7 @@ func mustUint31(v int32) uint32 {
}
// bodyAllowedForStatus reports whether a given response status code
// permits a body. See RFC2616, section 4.4.
// permits a body. See RFC 2616, section 4.4.
func bodyAllowedForStatus(status int) bool {
switch {
case status >= 100 && status <= 199:
@ -344,86 +336,52 @@ func (e *httpError) Temporary() bool { return true }
var errTimeout error = &httpError{msg: "http2: timeout awaiting response headers", timeout: true}
var isTokenTable = [127]bool{
'!': true,
'#': true,
'$': true,
'%': true,
'&': true,
'\'': true,
'*': true,
'+': true,
'-': true,
'.': true,
'0': true,
'1': true,
'2': true,
'3': true,
'4': true,
'5': true,
'6': true,
'7': true,
'8': true,
'9': true,
'A': true,
'B': true,
'C': true,
'D': true,
'E': true,
'F': true,
'G': true,
'H': true,
'I': true,
'J': true,
'K': true,
'L': true,
'M': true,
'N': true,
'O': true,
'P': true,
'Q': true,
'R': true,
'S': true,
'T': true,
'U': true,
'W': true,
'V': true,
'X': true,
'Y': true,
'Z': true,
'^': true,
'_': true,
'`': true,
'a': true,
'b': true,
'c': true,
'd': true,
'e': true,
'f': true,
'g': true,
'h': true,
'i': true,
'j': true,
'k': true,
'l': true,
'm': true,
'n': true,
'o': true,
'p': true,
'q': true,
'r': true,
's': true,
't': true,
'u': true,
'v': true,
'w': true,
'x': true,
'y': true,
'z': true,
'|': true,
'~': true,
}
type connectionStater interface {
ConnectionState() tls.ConnectionState
}
var sorterPool = sync.Pool{New: func() interface{} { return new(sorter) }}
type sorter struct {
v []string // owned by sorter
}
func (s *sorter) Len() int { return len(s.v) }
func (s *sorter) Swap(i, j int) { s.v[i], s.v[j] = s.v[j], s.v[i] }
func (s *sorter) Less(i, j int) bool { return s.v[i] < s.v[j] }
// Keys returns the sorted keys of h.
//
// The returned slice is only valid until s used again or returned to
// its pool.
func (s *sorter) Keys(h http.Header) []string {
keys := s.v[:0]
for k := range h {
keys = append(keys, k)
}
s.v = keys
sort.Sort(s)
return keys
}
func (s *sorter) SortStrings(ss []string) {
// Our sorter works on s.v, which sorter owns, so
// stash it away while we sort the user's buffer.
save := s.v
s.v = ss
sort.Sort(s)
s.v = save
}
// validPseudoPath reports whether v is a valid :path pseudo-header
// value. It must be either:
//
// *) a non-empty string starting with '/', but not with with "//",
// *) the string '*', for OPTIONS requests.
//
// For now this is only used a quick check for deciding when to clean
// up Opaque URLs before sending requests from the Transport.
// See golang.org/issue/16847
func validPseudoPath(v string) bool {
return (len(v) > 0 && v[0] == '/' && (len(v) == 1 || v[1] != '/')) || v == "*"
}

View File

@ -1,11 +0,0 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.5
package http2
import "net/http"
func requestCancel(req *http.Request) <-chan struct{} { return nil }

View File

@ -6,8 +6,41 @@
package http2
import "net/http"
import (
"crypto/tls"
"net/http"
"time"
)
func configureTransport(t1 *http.Transport) (*Transport, error) {
return nil, errTransportVersion
}
func transportExpectContinueTimeout(t1 *http.Transport) time.Duration {
return 0
}
// isBadCipher reports whether the cipher is blacklisted by the HTTP/2 spec.
func isBadCipher(cipher uint16) bool {
switch cipher {
case tls.TLS_RSA_WITH_RC4_128_SHA,
tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
tls.TLS_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA,
tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:
// Reject cipher suites from Appendix A.
// "This list includes those cipher suites that do not
// offer an ephemeral key exchange and those that are
// based on the TLS null, stream or block cipher type"
return true
default:
return false
}
}

87
cmd/vendor/golang.org/x/net/http2/not_go17.go generated vendored Normal file
View File

@ -0,0 +1,87 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package http2
import (
"crypto/tls"
"net"
"net/http"
"time"
)
type contextContext interface {
Done() <-chan struct{}
Err() error
}
type fakeContext struct{}
func (fakeContext) Done() <-chan struct{} { return nil }
func (fakeContext) Err() error { panic("should not be called") }
func reqContext(r *http.Request) fakeContext {
return fakeContext{}
}
func setResponseUncompressed(res *http.Response) {
// Nothing.
}
type clientTrace struct{}
func requestTrace(*http.Request) *clientTrace { return nil }
func traceGotConn(*http.Request, *ClientConn) {}
func traceFirstResponseByte(*clientTrace) {}
func traceWroteHeaders(*clientTrace) {}
func traceWroteRequest(*clientTrace, error) {}
func traceGot100Continue(trace *clientTrace) {}
func traceWait100Continue(trace *clientTrace) {}
func nop() {}
func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx contextContext, cancel func()) {
return nil, nop
}
func contextWithCancel(ctx contextContext) (_ contextContext, cancel func()) {
return ctx, nop
}
func requestWithContext(req *http.Request, ctx contextContext) *http.Request {
return req
}
// temporary copy of Go 1.6's private tls.Config.clone:
func cloneTLSConfig(c *tls.Config) *tls.Config {
return &tls.Config{
Rand: c.Rand,
Time: c.Time,
Certificates: c.Certificates,
NameToCertificate: c.NameToCertificate,
GetCertificate: c.GetCertificate,
RootCAs: c.RootCAs,
NextProtos: c.NextProtos,
ServerName: c.ServerName,
ClientAuth: c.ClientAuth,
ClientCAs: c.ClientCAs,
InsecureSkipVerify: c.InsecureSkipVerify,
CipherSuites: c.CipherSuites,
PreferServerCipherSuites: c.PreferServerCipherSuites,
SessionTicketsDisabled: c.SessionTicketsDisabled,
SessionTicketKey: c.SessionTicketKey,
ClientSessionCache: c.ClientSessionCache,
MinVersion: c.MinVersion,
MaxVersion: c.MaxVersion,
CurvePreferences: c.CurvePreferences,
}
}
func (cc *ClientConn) Ping(ctx contextContext) error {
return cc.ping(ctx)
}
func (t *Transport) idleConnTimeout() time.Duration { return 0 }

27
cmd/vendor/golang.org/x/net/http2/not_go18.go generated vendored Normal file
View File

@ -0,0 +1,27 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.8
package http2
import (
"io"
"net/http"
)
func configureServer18(h1 *http.Server, h2 *Server) error {
// No IdleTimeout to sync prior to Go 1.8.
return nil
}
func shouldLogPanic(panicValue interface{}) bool {
return panicValue != nil
}
func reqGetBody(req *http.Request) func() (io.ReadCloser, error) {
return nil
}
func reqBodyIsNoBody(io.ReadCloser) bool { return false }

View File

@ -29,6 +29,12 @@ type pipeBuffer interface {
io.Reader
}
func (p *pipe) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
return p.b.Len()
}
// Read waits until data is available and copies bytes
// from the buffer into p.
func (p *pipe) Read(d []byte) (n int, err error) {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -9,15 +9,21 @@ import (
"fmt"
"log"
"net/http"
"sort"
"net/url"
"time"
"golang.org/x/net/http2/hpack"
"golang.org/x/net/lex/httplex"
)
// writeFramer is implemented by any type that is used to write frames.
type writeFramer interface {
writeFrame(writeContext) error
// staysWithinBuffer reports whether this writer promises that
// it will only write less than or equal to size bytes, and it
// won't Flush the write context.
staysWithinBuffer(size int) bool
}
// writeContext is the interface needed by the various frame writer
@ -39,9 +45,10 @@ type writeContext interface {
HeaderEncoder() (*hpack.Encoder, *bytes.Buffer)
}
// endsStream reports whether the given frame writer w will locally
// close the stream.
func endsStream(w writeFramer) bool {
// writeEndsStream reports whether w writes a frame that will transition
// the stream to a half-closed local state. This returns false for RST_STREAM,
// which closes the entire stream (not just the local half).
func writeEndsStream(w writeFramer) bool {
switch v := w.(type) {
case *writeData:
return v.endStream
@ -51,7 +58,7 @@ func endsStream(w writeFramer) bool {
// This can only happen if the caller reuses w after it's
// been intentionally nil'ed out to prevent use. Keep this
// here to catch future refactoring breaking it.
panic("endsStream called on nil writeFramer")
panic("writeEndsStream called on nil writeFramer")
}
return false
}
@ -62,8 +69,16 @@ func (flushFrameWriter) writeFrame(ctx writeContext) error {
return ctx.Flush()
}
func (flushFrameWriter) staysWithinBuffer(max int) bool { return false }
type writeSettings []Setting
func (s writeSettings) staysWithinBuffer(max int) bool {
const settingSize = 6 // uint16 + uint32
return frameHeaderLen+settingSize*len(s) <= max
}
func (s writeSettings) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteSettings([]Setting(s)...)
}
@ -83,6 +98,8 @@ func (p *writeGoAway) writeFrame(ctx writeContext) error {
return err
}
func (*writeGoAway) staysWithinBuffer(max int) bool { return false } // flushes
type writeData struct {
streamID uint32
p []byte
@ -97,6 +114,10 @@ func (w *writeData) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteData(w.streamID, w.endStream, w.p)
}
func (w *writeData) staysWithinBuffer(max int) bool {
return frameHeaderLen+len(w.p) <= max
}
// handlerPanicRST is the message sent from handler goroutines when
// the handler panics.
type handlerPanicRST struct {
@ -107,22 +128,57 @@ func (hp handlerPanicRST) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteRSTStream(hp.StreamID, ErrCodeInternal)
}
func (hp handlerPanicRST) staysWithinBuffer(max int) bool { return frameHeaderLen+4 <= max }
func (se StreamError) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteRSTStream(se.StreamID, se.Code)
}
func (se StreamError) staysWithinBuffer(max int) bool { return frameHeaderLen+4 <= max }
type writePingAck struct{ pf *PingFrame }
func (w writePingAck) writeFrame(ctx writeContext) error {
return ctx.Framer().WritePing(true, w.pf.Data)
}
func (w writePingAck) staysWithinBuffer(max int) bool { return frameHeaderLen+len(w.pf.Data) <= max }
type writeSettingsAck struct{}
func (writeSettingsAck) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteSettingsAck()
}
func (writeSettingsAck) staysWithinBuffer(max int) bool { return frameHeaderLen <= max }
// splitHeaderBlock splits headerBlock into fragments so that each fragment fits
// in a single frame, then calls fn for each fragment. firstFrag/lastFrag are true
// for the first/last fragment, respectively.
func splitHeaderBlock(ctx writeContext, headerBlock []byte, fn func(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error) error {
// For now we're lazy and just pick the minimum MAX_FRAME_SIZE
// that all peers must support (16KB). Later we could care
// more and send larger frames if the peer advertised it, but
// there's little point. Most headers are small anyway (so we
// generally won't have CONTINUATION frames), and extra frames
// only waste 9 bytes anyway.
const maxFrameSize = 16384
first := true
for len(headerBlock) > 0 {
frag := headerBlock
if len(frag) > maxFrameSize {
frag = frag[:maxFrameSize]
}
headerBlock = headerBlock[len(frag):]
if err := fn(ctx, frag, first, len(headerBlock) == 0); err != nil {
return err
}
first = false
}
return nil
}
// writeResHeaders is a request to write a HEADERS and 0+ CONTINUATION frames
// for HTTP response headers or trailers from a server handler.
type writeResHeaders struct {
@ -144,6 +200,17 @@ func encKV(enc *hpack.Encoder, k, v string) {
enc.WriteField(hpack.HeaderField{Name: k, Value: v})
}
func (w *writeResHeaders) staysWithinBuffer(max int) bool {
// TODO: this is a common one. It'd be nice to return true
// here and get into the fast path if we could be clever and
// calculate the size fast enough, or at least a conservative
// uppper bound that usually fires. (Maybe if w.h and
// w.trailers are nil, so we don't need to enumerate it.)
// Otherwise I'm afraid that just calculating the length to
// answer this question would be slower than the ~2µs benefit.
return false
}
func (w *writeResHeaders) writeFrame(ctx writeContext) error {
enc, buf := ctx.HeaderEncoder()
buf.Reset()
@ -169,39 +236,69 @@ func (w *writeResHeaders) writeFrame(ctx writeContext) error {
panic("unexpected empty hpack")
}
// For now we're lazy and just pick the minimum MAX_FRAME_SIZE
// that all peers must support (16KB). Later we could care
// more and send larger frames if the peer advertised it, but
// there's little point. Most headers are small anyway (so we
// generally won't have CONTINUATION frames), and extra frames
// only waste 9 bytes anyway.
const maxFrameSize = 16384
return splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock)
}
first := true
for len(headerBlock) > 0 {
frag := headerBlock
if len(frag) > maxFrameSize {
frag = frag[:maxFrameSize]
}
headerBlock = headerBlock[len(frag):]
endHeaders := len(headerBlock) == 0
var err error
if first {
first = false
err = ctx.Framer().WriteHeaders(HeadersFrameParam{
StreamID: w.streamID,
BlockFragment: frag,
EndStream: w.endStream,
EndHeaders: endHeaders,
})
} else {
err = ctx.Framer().WriteContinuation(w.streamID, endHeaders, frag)
}
if err != nil {
return err
}
func (w *writeResHeaders) writeHeaderBlock(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error {
if firstFrag {
return ctx.Framer().WriteHeaders(HeadersFrameParam{
StreamID: w.streamID,
BlockFragment: frag,
EndStream: w.endStream,
EndHeaders: lastFrag,
})
} else {
return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag)
}
}
// writePushPromise is a request to write a PUSH_PROMISE and 0+ CONTINUATION frames.
type writePushPromise struct {
streamID uint32 // pusher stream
method string // for :method
url *url.URL // for :scheme, :authority, :path
h http.Header
// Creates an ID for a pushed stream. This runs on serveG just before
// the frame is written. The returned ID is copied to promisedID.
allocatePromisedID func() (uint32, error)
promisedID uint32
}
func (w *writePushPromise) staysWithinBuffer(max int) bool {
// TODO: see writeResHeaders.staysWithinBuffer
return false
}
func (w *writePushPromise) writeFrame(ctx writeContext) error {
enc, buf := ctx.HeaderEncoder()
buf.Reset()
encKV(enc, ":method", w.method)
encKV(enc, ":scheme", w.url.Scheme)
encKV(enc, ":authority", w.url.Host)
encKV(enc, ":path", w.url.RequestURI())
encodeHeaders(enc, w.h, nil)
headerBlock := buf.Bytes()
if len(headerBlock) == 0 {
panic("unexpected empty hpack")
}
return splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock)
}
func (w *writePushPromise) writeHeaderBlock(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error {
if firstFrag {
return ctx.Framer().WritePushPromise(PushPromiseParam{
StreamID: w.streamID,
PromiseID: w.promisedID,
BlockFragment: frag,
EndHeaders: lastFrag,
})
} else {
return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag)
}
return nil
}
type write100ContinueHeadersFrame struct {
@ -220,35 +317,45 @@ func (w write100ContinueHeadersFrame) writeFrame(ctx writeContext) error {
})
}
func (w write100ContinueHeadersFrame) staysWithinBuffer(max int) bool {
// Sloppy but conservative:
return 9+2*(len(":status")+len("100")) <= max
}
type writeWindowUpdate struct {
streamID uint32 // or 0 for conn-level
n uint32
}
func (wu writeWindowUpdate) staysWithinBuffer(max int) bool { return frameHeaderLen+4 <= max }
func (wu writeWindowUpdate) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteWindowUpdate(wu.streamID, wu.n)
}
// encodeHeaders encodes an http.Header. If keys is not nil, then (k, h[k])
// is encoded only only if k is in keys.
func encodeHeaders(enc *hpack.Encoder, h http.Header, keys []string) {
// TODO: garbage. pool sorters like http1? hot path for 1 key?
if keys == nil {
keys = make([]string, 0, len(h))
for k := range h {
keys = append(keys, k)
}
sort.Strings(keys)
sorter := sorterPool.Get().(*sorter)
// Using defer here, since the returned keys from the
// sorter.Keys method is only valid until the sorter
// is returned:
defer sorterPool.Put(sorter)
keys = sorter.Keys(h)
}
for _, k := range keys {
vv := h[k]
k = lowerHeader(k)
if !validHeaderFieldName(k) {
// TODO: return an error? golang.org/issue/14048
// For now just omit it.
if !validWireHeaderFieldName(k) {
// Skip it as backup paranoia. Per
// golang.org/issue/14048, these should
// already be rejected at a higher level.
continue
}
isTE := k == "transfer-encoding"
for _, v := range vv {
if !validHeaderFieldValue(v) {
if !httplex.ValidHeaderFieldValue(v) {
// TODO: return an error? golang.org/issue/14048
// For now just omit it.
continue

View File

@ -6,14 +6,53 @@ package http2
import "fmt"
// frameWriteMsg is a request to write a frame.
type frameWriteMsg struct {
// WriteScheduler is the interface implemented by HTTP/2 write schedulers.
// Methods are never called concurrently.
type WriteScheduler interface {
// OpenStream opens a new stream in the write scheduler.
// It is illegal to call this with streamID=0 or with a streamID that is
// already open -- the call may panic.
OpenStream(streamID uint32, options OpenStreamOptions)
// CloseStream closes a stream in the write scheduler. Any frames queued on
// this stream should be discarded. It is illegal to call this on a stream
// that is not open -- the call may panic.
CloseStream(streamID uint32)
// AdjustStream adjusts the priority of the given stream. This may be called
// on a stream that has not yet been opened or has been closed. Note that
// RFC 7540 allows PRIORITY frames to be sent on streams in any state. See:
// https://tools.ietf.org/html/rfc7540#section-5.1
AdjustStream(streamID uint32, priority PriorityParam)
// Push queues a frame in the scheduler. In most cases, this will not be
// called with wr.StreamID()!=0 unless that stream is currently open. The one
// exception is RST_STREAM frames, which may be sent on idle or closed streams.
Push(wr FrameWriteRequest)
// Pop dequeues the next frame to write. Returns false if no frames can
// be written. Frames with a given wr.StreamID() are Pop'd in the same
// order they are Push'd.
Pop() (wr FrameWriteRequest, ok bool)
}
// OpenStreamOptions specifies extra options for WriteScheduler.OpenStream.
type OpenStreamOptions struct {
// PusherID is zero if the stream was initiated by the client. Otherwise,
// PusherID names the stream that pushed the newly opened stream.
PusherID uint32
}
// FrameWriteRequest is a request to write a frame.
type FrameWriteRequest struct {
// write is the interface value that does the writing, once the
// writeScheduler (below) has decided to select this frame
// to write. The write functions are all defined in write.go.
// WriteScheduler has selected this frame to write. The write
// functions are all defined in write.go.
write writeFramer
stream *stream // used for prioritization. nil for non-stream frames.
// stream is the stream on which this frame will be written.
// nil for non-stream frames like PING and SETTINGS.
stream *stream
// done, if non-nil, must be a buffered channel with space for
// 1 message and is sent the return value from write (or an
@ -21,263 +60,183 @@ type frameWriteMsg struct {
done chan error
}
// for debugging only:
func (wm frameWriteMsg) String() string {
var streamID uint32
if wm.stream != nil {
streamID = wm.stream.id
}
var des string
if s, ok := wm.write.(fmt.Stringer); ok {
des = s.String()
} else {
des = fmt.Sprintf("%T", wm.write)
}
return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des)
}
// writeScheduler tracks pending frames to write, priorities, and decides
// the next one to use. It is not thread-safe.
type writeScheduler struct {
// zero are frames not associated with a specific stream.
// They're sent before any stream-specific freams.
zero writeQueue
// maxFrameSize is the maximum size of a DATA frame
// we'll write. Must be non-zero and between 16K-16M.
maxFrameSize uint32
// sq contains the stream-specific queues, keyed by stream ID.
// when a stream is idle, it's deleted from the map.
sq map[uint32]*writeQueue
// canSend is a slice of memory that's reused between frame
// scheduling decisions to hold the list of writeQueues (from sq)
// which have enough flow control data to send. After canSend is
// built, the best is selected.
canSend []*writeQueue
// pool of empty queues for reuse.
queuePool []*writeQueue
}
func (ws *writeScheduler) putEmptyQueue(q *writeQueue) {
if len(q.s) != 0 {
panic("queue must be empty")
}
ws.queuePool = append(ws.queuePool, q)
}
func (ws *writeScheduler) getEmptyQueue() *writeQueue {
ln := len(ws.queuePool)
if ln == 0 {
return new(writeQueue)
}
q := ws.queuePool[ln-1]
ws.queuePool = ws.queuePool[:ln-1]
return q
}
func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
func (ws *writeScheduler) add(wm frameWriteMsg) {
st := wm.stream
if st == nil {
ws.zero.push(wm)
} else {
ws.streamQueue(st.id).push(wm)
}
}
func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
if q, ok := ws.sq[streamID]; ok {
return q
}
if ws.sq == nil {
ws.sq = make(map[uint32]*writeQueue)
}
q := ws.getEmptyQueue()
ws.sq[streamID] = q
return q
}
// take returns the most important frame to write and removes it from the scheduler.
// It is illegal to call this if the scheduler is empty or if there are no connection-level
// flow control bytes available.
func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
if ws.maxFrameSize == 0 {
panic("internal error: ws.maxFrameSize not initialized or invalid")
}
// If there any frames not associated with streams, prefer those first.
// These are usually SETTINGS, etc.
if !ws.zero.empty() {
return ws.zero.shift(), true
}
if len(ws.sq) == 0 {
return
}
// Next, prioritize frames on streams that aren't DATA frames (no cost).
for id, q := range ws.sq {
if q.firstIsNoCost() {
return ws.takeFrom(id, q)
// StreamID returns the id of the stream this frame will be written to.
// 0 is used for non-stream frames such as PING and SETTINGS.
func (wr FrameWriteRequest) StreamID() uint32 {
if wr.stream == nil {
if se, ok := wr.write.(StreamError); ok {
// (*serverConn).resetStream doesn't set
// stream because it doesn't necessarily have
// one. So special case this type of write
// message.
return se.StreamID
}
}
// Now, all that remains are DATA frames with non-zero bytes to
// send. So pick the best one.
if len(ws.canSend) != 0 {
panic("should be empty")
}
for _, q := range ws.sq {
if n := ws.streamWritableBytes(q); n > 0 {
ws.canSend = append(ws.canSend, q)
}
}
if len(ws.canSend) == 0 {
return
}
defer ws.zeroCanSend()
// TODO: find the best queue
q := ws.canSend[0]
return ws.takeFrom(q.streamID(), q)
}
// zeroCanSend is defered from take.
func (ws *writeScheduler) zeroCanSend() {
for i := range ws.canSend {
ws.canSend[i] = nil
}
ws.canSend = ws.canSend[:0]
}
// streamWritableBytes returns the number of DATA bytes we could write
// from the given queue's stream, if this stream/queue were
// selected. It is an error to call this if q's head isn't a
// *writeData.
func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
wm := q.head()
ret := wm.stream.flow.available() // max we can write
if ret == 0 {
return 0
}
if int32(ws.maxFrameSize) < ret {
ret = int32(ws.maxFrameSize)
}
if ret == 0 {
panic("internal error: ws.maxFrameSize not initialized or invalid")
}
wd := wm.write.(*writeData)
if len(wd.p) < int(ret) {
ret = int32(len(wd.p))
}
return ret
return wr.stream.id
}
func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
wm = q.head()
// If the first item in this queue costs flow control tokens
// and we don't have enough, write as much as we can.
if wd, ok := wm.write.(*writeData); ok && len(wd.p) > 0 {
allowed := wm.stream.flow.available() // max we can write
if allowed == 0 {
// No quota available. Caller can try the next stream.
return frameWriteMsg{}, false
}
if int32(ws.maxFrameSize) < allowed {
allowed = int32(ws.maxFrameSize)
}
// TODO: further restrict the allowed size, because even if
// the peer says it's okay to write 16MB data frames, we might
// want to write smaller ones to properly weight competing
// streams' priorities.
if len(wd.p) > int(allowed) {
wm.stream.flow.take(allowed)
chunk := wd.p[:allowed]
wd.p = wd.p[allowed:]
// Make up a new write message of a valid size, rather
// than shifting one off the queue.
return frameWriteMsg{
stream: wm.stream,
write: &writeData{
streamID: wd.streamID,
p: chunk,
// even if the original had endStream set, there
// arebytes remaining because len(wd.p) > allowed,
// so we know endStream is false:
endStream: false,
},
// our caller is blocking on the final DATA frame, not
// these intermediates, so no need to wait:
done: nil,
}, true
}
wm.stream.flow.take(int32(len(wd.p)))
// DataSize returns the number of flow control bytes that must be consumed
// to write this entire frame. This is 0 for non-DATA frames.
func (wr FrameWriteRequest) DataSize() int {
if wd, ok := wr.write.(*writeData); ok {
return len(wd.p)
}
q.shift()
if q.empty() {
ws.putEmptyQueue(q)
delete(ws.sq, id)
}
return wm, true
return 0
}
func (ws *writeScheduler) forgetStream(id uint32) {
q, ok := ws.sq[id]
if !ok {
// Consume consumes min(n, available) bytes from this frame, where available
// is the number of flow control bytes available on the stream. Consume returns
// 0, 1, or 2 frames, where the integer return value gives the number of frames
// returned.
//
// If flow control prevents consuming any bytes, this returns (_, _, 0). If
// the entire frame was consumed, this returns (wr, _, 1). Otherwise, this
// returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and
// 'rest' contains the remaining bytes. The consumed bytes are deducted from the
// underlying stream's flow control budget.
func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {
var empty FrameWriteRequest
// Non-DATA frames are always consumed whole.
wd, ok := wr.write.(*writeData)
if !ok || len(wd.p) == 0 {
return wr, empty, 1
}
// Might need to split after applying limits.
allowed := wr.stream.flow.available()
if n < allowed {
allowed = n
}
if wr.stream.sc.maxFrameSize < allowed {
allowed = wr.stream.sc.maxFrameSize
}
if allowed <= 0 {
return empty, empty, 0
}
if len(wd.p) > int(allowed) {
wr.stream.flow.take(allowed)
consumed := FrameWriteRequest{
stream: wr.stream,
write: &writeData{
streamID: wd.streamID,
p: wd.p[:allowed],
// Even if the original had endStream set, there
// are bytes remaining because len(wd.p) > allowed,
// so we know endStream is false.
endStream: false,
},
// Our caller is blocking on the final DATA frame, not
// this intermediate frame, so no need to wait.
done: nil,
}
rest := FrameWriteRequest{
stream: wr.stream,
write: &writeData{
streamID: wd.streamID,
p: wd.p[allowed:],
endStream: wd.endStream,
},
done: wr.done,
}
return consumed, rest, 2
}
// The frame is consumed whole.
// NB: This cast cannot overflow because allowed is <= math.MaxInt32.
wr.stream.flow.take(int32(len(wd.p)))
return wr, empty, 1
}
// String is for debugging only.
func (wr FrameWriteRequest) String() string {
var des string
if s, ok := wr.write.(fmt.Stringer); ok {
des = s.String()
} else {
des = fmt.Sprintf("%T", wr.write)
}
return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
}
// replyToWriter sends err to wr.done and panics if the send must block
// This does nothing if wr.done is nil.
func (wr *FrameWriteRequest) replyToWriter(err error) {
if wr.done == nil {
return
}
delete(ws.sq, id)
// But keep it for others later.
for i := range q.s {
q.s[i] = frameWriteMsg{}
select {
case wr.done <- err:
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
q.s = q.s[:0]
ws.putEmptyQueue(q)
wr.write = nil // prevent use (assume it's tainted after wr.done send)
}
// writeQueue is used by implementations of WriteScheduler.
type writeQueue struct {
s []frameWriteMsg
s []FrameWriteRequest
}
// streamID returns the stream ID for a non-empty stream-specific queue.
func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
func (q *writeQueue) empty() bool { return len(q.s) == 0 }
func (q *writeQueue) push(wm frameWriteMsg) {
q.s = append(q.s, wm)
func (q *writeQueue) push(wr FrameWriteRequest) {
q.s = append(q.s, wr)
}
// head returns the next item that would be removed by shift.
func (q *writeQueue) head() frameWriteMsg {
func (q *writeQueue) shift() FrameWriteRequest {
if len(q.s) == 0 {
panic("invalid use of queue")
}
return q.s[0]
}
func (q *writeQueue) shift() frameWriteMsg {
if len(q.s) == 0 {
panic("invalid use of queue")
}
wm := q.s[0]
wr := q.s[0]
// TODO: less copy-happy queue.
copy(q.s, q.s[1:])
q.s[len(q.s)-1] = frameWriteMsg{}
q.s[len(q.s)-1] = FrameWriteRequest{}
q.s = q.s[:len(q.s)-1]
return wm
return wr
}
func (q *writeQueue) firstIsNoCost() bool {
if df, ok := q.s[0].write.(*writeData); ok {
return len(df.p) == 0
// consume consumes up to n bytes from q.s[0]. If the frame is
// entirely consumed, it is removed from the queue. If the frame
// is partially consumed, the frame is kept with the consumed
// bytes removed. Returns true iff any bytes were consumed.
func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
if len(q.s) == 0 {
return FrameWriteRequest{}, false
}
return true
consumed, rest, numresult := q.s[0].Consume(n)
switch numresult {
case 0:
return FrameWriteRequest{}, false
case 1:
q.shift()
case 2:
q.s[0] = rest
}
return consumed, true
}
type writeQueuePool []*writeQueue
// put inserts an unused writeQueue into the pool.
func (p *writeQueuePool) put(q *writeQueue) {
for i := range q.s {
q.s[i] = FrameWriteRequest{}
}
q.s = q.s[:0]
*p = append(*p, q)
}
// get returns an empty writeQueue.
func (p *writeQueuePool) get() *writeQueue {
ln := len(*p)
if ln == 0 {
return new(writeQueue)
}
x := ln - 1
q := (*p)[x]
(*p)[x] = nil
*p = (*p)[:x]
return q
}

View File

@ -0,0 +1,452 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"fmt"
"math"
"sort"
)
// RFC 7540, Section 5.3.5: the default weight is 16.
const priorityDefaultWeight = 15 // 16 = 15 + 1
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
type PriorityWriteSchedulerConfig struct {
// MaxClosedNodesInTree controls the maximum number of closed streams to
// retain in the priority tree. Setting this to zero saves a small amount
// of memory at the cost of performance.
//
// See RFC 7540, Section 5.3.4:
// "It is possible for a stream to become closed while prioritization
// information ... is in transit. ... This potentially creates suboptimal
// prioritization, since the stream could be given a priority that is
// different from what is intended. To avoid these problems, an endpoint
// SHOULD retain stream prioritization state for a period after streams
// become closed. The longer state is retained, the lower the chance that
// streams are assigned incorrect or default priority values."
MaxClosedNodesInTree int
// MaxIdleNodesInTree controls the maximum number of idle streams to
// retain in the priority tree. Setting this to zero saves a small amount
// of memory at the cost of performance.
//
// See RFC 7540, Section 5.3.4:
// Similarly, streams that are in the "idle" state can be assigned
// priority or become a parent of other streams. This allows for the
// creation of a grouping node in the dependency tree, which enables
// more flexible expressions of priority. Idle streams begin with a
// default priority (Section 5.3.5).
MaxIdleNodesInTree int
// ThrottleOutOfOrderWrites enables write throttling to help ensure that
// data is delivered in priority order. This works around a race where
// stream B depends on stream A and both streams are about to call Write
// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
// write as much data from B as possible, but this is suboptimal because A
// is a higher-priority stream. With throttling enabled, we write a small
// amount of data from B to minimize the amount of bandwidth that B can
// steal from A.
ThrottleOutOfOrderWrites bool
}
// NewPriorityWriteScheduler constructs a WriteScheduler that schedules
// frames by following HTTP/2 priorities as described in RFC 7340 Section 5.3.
// If cfg is nil, default options are used.
func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
if cfg == nil {
// For justification of these defaults, see:
// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
cfg = &PriorityWriteSchedulerConfig{
MaxClosedNodesInTree: 10,
MaxIdleNodesInTree: 10,
ThrottleOutOfOrderWrites: false,
}
}
ws := &priorityWriteScheduler{
nodes: make(map[uint32]*priorityNode),
maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
}
ws.nodes[0] = &ws.root
if cfg.ThrottleOutOfOrderWrites {
ws.writeThrottleLimit = 1024
} else {
ws.writeThrottleLimit = math.MaxInt32
}
return ws
}
type priorityNodeState int
const (
priorityNodeOpen priorityNodeState = iota
priorityNodeClosed
priorityNodeIdle
)
// priorityNode is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
type priorityNode struct {
q writeQueue // queue of pending frames to write
id uint32 // id of the stream, or 0 for the root of the tree
weight uint8 // the actual weight is weight+1, so the value is in [1,256]
state priorityNodeState // open | closed | idle
bytes int64 // number of bytes written by this node, or 0 if closed
subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
// These links form the priority tree.
parent *priorityNode
kids *priorityNode // start of the kids list
prev, next *priorityNode // doubly-linked list of siblings
}
func (n *priorityNode) setParent(parent *priorityNode) {
if n == parent {
panic("setParent to self")
}
if n.parent == parent {
return
}
// Unlink from current parent.
if parent := n.parent; parent != nil {
if n.prev == nil {
parent.kids = n.next
} else {
n.prev.next = n.next
}
if n.next != nil {
n.next.prev = n.prev
}
}
// Link to new parent.
// If parent=nil, remove n from the tree.
// Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
n.parent = parent
if parent == nil {
n.next = nil
n.prev = nil
} else {
n.next = parent.kids
n.prev = nil
if n.next != nil {
n.next.prev = n
}
parent.kids = n
}
}
func (n *priorityNode) addBytes(b int64) {
n.bytes += b
for ; n != nil; n = n.parent {
n.subtreeBytes += b
}
}
// walkReadyInOrder iterates over the tree in priority order, calling f for each node
// with a non-empty write queue. When f returns true, this funcion returns true and the
// walk halts. tmp is used as scratch space for sorting.
//
// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
// if any ancestor p of n is still open (ignoring the root node).
func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
if !n.q.empty() && f(n, openParent) {
return true
}
if n.kids == nil {
return false
}
// Don't consider the root "open" when updating openParent since
// we can't send data frames on the root stream (only control frames).
if n.id != 0 {
openParent = openParent || (n.state == priorityNodeOpen)
}
// Common case: only one kid or all kids have the same weight.
// Some clients don't use weights; other clients (like web browsers)
// use mostly-linear priority trees.
w := n.kids.weight
needSort := false
for k := n.kids.next; k != nil; k = k.next {
if k.weight != w {
needSort = true
break
}
}
if !needSort {
for k := n.kids; k != nil; k = k.next {
if k.walkReadyInOrder(openParent, tmp, f) {
return true
}
}
return false
}
// Uncommon case: sort the child nodes. We remove the kids from the parent,
// then re-insert after sorting so we can reuse tmp for future sort calls.
*tmp = (*tmp)[:0]
for n.kids != nil {
*tmp = append(*tmp, n.kids)
n.kids.setParent(nil)
}
sort.Sort(sortPriorityNodeSiblings(*tmp))
for i := len(*tmp) - 1; i >= 0; i-- {
(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
}
for k := n.kids; k != nil; k = k.next {
if k.walkReadyInOrder(openParent, tmp, f) {
return true
}
}
return false
}
type sortPriorityNodeSiblings []*priorityNode
func (z sortPriorityNodeSiblings) Len() int { return len(z) }
func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
func (z sortPriorityNodeSiblings) Less(i, k int) bool {
// Prefer the subtree that has sent fewer bytes relative to its weight.
// See sections 5.3.2 and 5.3.4.
wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
if bi == 0 && bk == 0 {
return wi >= wk
}
if bk == 0 {
return false
}
return bi/bk <= wi/wk
}
type priorityWriteScheduler struct {
// root is the root of the priority tree, where root.id = 0.
// The root queues control frames that are not associated with any stream.
root priorityNode
// nodes maps stream ids to priority tree nodes.
nodes map[uint32]*priorityNode
// maxID is the maximum stream id in nodes.
maxID uint32
// lists of nodes that have been closed or are idle, but are kept in
// the tree for improved prioritization. When the lengths exceed either
// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
closedNodes, idleNodes []*priorityNode
// From the config.
maxClosedNodesInTree int
maxIdleNodesInTree int
writeThrottleLimit int32
enableWriteThrottle bool
// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
tmp []*priorityNode
// pool of empty queues for reuse.
queuePool writeQueuePool
}
func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
// The stream may be currently idle but cannot be opened or closed.
if curr := ws.nodes[streamID]; curr != nil {
if curr.state != priorityNodeIdle {
panic(fmt.Sprintf("stream %d already opened", streamID))
}
curr.state = priorityNodeOpen
return
}
// RFC 7540, Section 5.3.5:
// "All streams are initially assigned a non-exclusive dependency on stream 0x0.
// Pushed streams initially depend on their associated stream. In both cases,
// streams are assigned a default weight of 16."
parent := ws.nodes[options.PusherID]
if parent == nil {
parent = &ws.root
}
n := &priorityNode{
q: *ws.queuePool.get(),
id: streamID,
weight: priorityDefaultWeight,
state: priorityNodeOpen,
}
n.setParent(parent)
ws.nodes[streamID] = n
if streamID > ws.maxID {
ws.maxID = streamID
}
}
func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
if streamID == 0 {
panic("violation of WriteScheduler interface: cannot close stream 0")
}
if ws.nodes[streamID] == nil {
panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
}
if ws.nodes[streamID].state != priorityNodeOpen {
panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
}
n := ws.nodes[streamID]
n.state = priorityNodeClosed
n.addBytes(-n.bytes)
q := n.q
ws.queuePool.put(&q)
n.q.s = nil
if ws.maxClosedNodesInTree > 0 {
ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
} else {
ws.removeNode(n)
}
}
func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
if streamID == 0 {
panic("adjustPriority on root")
}
// If streamID does not exist, there are two cases:
// - A closed stream that has been removed (this will have ID <= maxID)
// - An idle stream that is being used for "grouping" (this will have ID > maxID)
n := ws.nodes[streamID]
if n == nil {
if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
return
}
ws.maxID = streamID
n = &priorityNode{
q: *ws.queuePool.get(),
id: streamID,
weight: priorityDefaultWeight,
state: priorityNodeIdle,
}
n.setParent(&ws.root)
ws.nodes[streamID] = n
ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
}
// Section 5.3.1: A dependency on a stream that is not currently in the tree
// results in that stream being given a default priority (Section 5.3.5).
parent := ws.nodes[priority.StreamDep]
if parent == nil {
n.setParent(&ws.root)
n.weight = priorityDefaultWeight
return
}
// Ignore if the client tries to make a node its own parent.
if n == parent {
return
}
// Section 5.3.3:
// "If a stream is made dependent on one of its own dependencies, the
// formerly dependent stream is first moved to be dependent on the
// reprioritized stream's previous parent. The moved dependency retains
// its weight."
//
// That is: if parent depends on n, move parent to depend on n.parent.
for x := parent.parent; x != nil; x = x.parent {
if x == n {
parent.setParent(n.parent)
break
}
}
// Section 5.3.3: The exclusive flag causes the stream to become the sole
// dependency of its parent stream, causing other dependencies to become
// dependent on the exclusive stream.
if priority.Exclusive {
k := parent.kids
for k != nil {
next := k.next
if k != n {
k.setParent(n)
}
k = next
}
}
n.setParent(parent)
n.weight = priority.Weight
}
func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
var n *priorityNode
if id := wr.StreamID(); id == 0 {
n = &ws.root
} else {
n = ws.nodes[id]
if n == nil {
// id is an idle or closed stream. wr should not be a HEADERS or
// DATA frame. However, wr can be a RST_STREAM. In this case, we
// push wr onto the root, rather than creating a new priorityNode,
// since RST_STREAM is tiny and the stream's priority is unknown
// anyway. See issue #17919.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
n = &ws.root
}
}
n.q.push(wr)
}
func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
limit := int32(math.MaxInt32)
if openParent {
limit = ws.writeThrottleLimit
}
wr, ok = n.q.consume(limit)
if !ok {
return false
}
n.addBytes(int64(wr.DataSize()))
// If B depends on A and B continuously has data available but A
// does not, gradually increase the throttling limit to allow B to
// steal more and more bandwidth from A.
if openParent {
ws.writeThrottleLimit += 1024
if ws.writeThrottleLimit < 0 {
ws.writeThrottleLimit = math.MaxInt32
}
} else if ws.enableWriteThrottle {
ws.writeThrottleLimit = 1024
}
return true
})
return wr, ok
}
func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
if maxSize == 0 {
return
}
if len(*list) == maxSize {
// Remove the oldest node, then shift left.
ws.removeNode((*list)[0])
x := (*list)[1:]
copy(*list, x)
*list = (*list)[:len(x)]
}
*list = append(*list, n)
}
func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
for k := n.kids; k != nil; k = k.next {
k.setParent(n.parent)
}
n.setParent(nil)
delete(ws.nodes, n.id)
}

72
cmd/vendor/golang.org/x/net/http2/writesched_random.go generated vendored Normal file
View File

@ -0,0 +1,72 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import "math"
// NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2
// priorities. Control frames like SETTINGS and PING are written before DATA
// frames, but if no control frames are queued and multiple streams have queued
// HEADERS or DATA frames, Pop selects a ready stream arbitrarily.
func NewRandomWriteScheduler() WriteScheduler {
return &randomWriteScheduler{sq: make(map[uint32]*writeQueue)}
}
type randomWriteScheduler struct {
// zero are frames not associated with a specific stream.
zero writeQueue
// sq contains the stream-specific queues, keyed by stream ID.
// When a stream is idle or closed, it's deleted from the map.
sq map[uint32]*writeQueue
// pool of empty queues for reuse.
queuePool writeQueuePool
}
func (ws *randomWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
// no-op: idle streams are not tracked
}
func (ws *randomWriteScheduler) CloseStream(streamID uint32) {
q, ok := ws.sq[streamID]
if !ok {
return
}
delete(ws.sq, streamID)
ws.queuePool.put(q)
}
func (ws *randomWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
// no-op: priorities are ignored
}
func (ws *randomWriteScheduler) Push(wr FrameWriteRequest) {
id := wr.StreamID()
if id == 0 {
ws.zero.push(wr)
return
}
q, ok := ws.sq[id]
if !ok {
q = ws.queuePool.get()
ws.sq[id] = q
}
q.push(wr)
}
func (ws *randomWriteScheduler) Pop() (FrameWriteRequest, bool) {
// Control frames first.
if !ws.zero.empty() {
return ws.zero.shift(), true
}
// Iterate over all non-idle streams until finding one that can be consumed.
for _, q := range ws.sq {
if wr, ok := q.consume(math.MaxInt32); ok {
return wr, true
}
}
return FrameWriteRequest{}, false
}

68
cmd/vendor/golang.org/x/net/idna/idna.go generated vendored Normal file
View File

@ -0,0 +1,68 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package idna implements IDNA2008 (Internationalized Domain Names for
// Applications), defined in RFC 5890, RFC 5891, RFC 5892, RFC 5893 and
// RFC 5894.
package idna // import "golang.org/x/net/idna"
import (
"strings"
"unicode/utf8"
)
// TODO(nigeltao): specify when errors occur. For example, is ToASCII(".") or
// ToASCII("foo\x00") an error? See also http://www.unicode.org/faq/idn.html#11
// acePrefix is the ASCII Compatible Encoding prefix.
const acePrefix = "xn--"
// ToASCII converts a domain or domain label to its ASCII form. For example,
// ToASCII("bücher.example.com") is "xn--bcher-kva.example.com", and
// ToASCII("golang") is "golang".
func ToASCII(s string) (string, error) {
if ascii(s) {
return s, nil
}
labels := strings.Split(s, ".")
for i, label := range labels {
if !ascii(label) {
a, err := encode(acePrefix, label)
if err != nil {
return "", err
}
labels[i] = a
}
}
return strings.Join(labels, "."), nil
}
// ToUnicode converts a domain or domain label to its Unicode form. For example,
// ToUnicode("xn--bcher-kva.example.com") is "bücher.example.com", and
// ToUnicode("golang") is "golang".
func ToUnicode(s string) (string, error) {
if !strings.Contains(s, acePrefix) {
return s, nil
}
labels := strings.Split(s, ".")
for i, label := range labels {
if strings.HasPrefix(label, acePrefix) {
u, err := decode(label[len(acePrefix):])
if err != nil {
return "", err
}
labels[i] = u
}
}
return strings.Join(labels, "."), nil
}
func ascii(s string) bool {
for i := 0; i < len(s); i++ {
if s[i] >= utf8.RuneSelf {
return false
}
}
return true
}

200
cmd/vendor/golang.org/x/net/idna/punycode.go generated vendored Normal file
View File

@ -0,0 +1,200 @@
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package idna
// This file implements the Punycode algorithm from RFC 3492.
import (
"fmt"
"math"
"strings"
"unicode/utf8"
)
// These parameter values are specified in section 5.
//
// All computation is done with int32s, so that overflow behavior is identical
// regardless of whether int is 32-bit or 64-bit.
const (
base int32 = 36
damp int32 = 700
initialBias int32 = 72
initialN int32 = 128
skew int32 = 38
tmax int32 = 26
tmin int32 = 1
)
// decode decodes a string as specified in section 6.2.
func decode(encoded string) (string, error) {
if encoded == "" {
return "", nil
}
pos := 1 + strings.LastIndex(encoded, "-")
if pos == 1 {
return "", fmt.Errorf("idna: invalid label %q", encoded)
}
if pos == len(encoded) {
return encoded[:len(encoded)-1], nil
}
output := make([]rune, 0, len(encoded))
if pos != 0 {
for _, r := range encoded[:pos-1] {
output = append(output, r)
}
}
i, n, bias := int32(0), initialN, initialBias
for pos < len(encoded) {
oldI, w := i, int32(1)
for k := base; ; k += base {
if pos == len(encoded) {
return "", fmt.Errorf("idna: invalid label %q", encoded)
}
digit, ok := decodeDigit(encoded[pos])
if !ok {
return "", fmt.Errorf("idna: invalid label %q", encoded)
}
pos++
i += digit * w
if i < 0 {
return "", fmt.Errorf("idna: invalid label %q", encoded)
}
t := k - bias
if t < tmin {
t = tmin
} else if t > tmax {
t = tmax
}
if digit < t {
break
}
w *= base - t
if w >= math.MaxInt32/base {
return "", fmt.Errorf("idna: invalid label %q", encoded)
}
}
x := int32(len(output) + 1)
bias = adapt(i-oldI, x, oldI == 0)
n += i / x
i %= x
if n > utf8.MaxRune || len(output) >= 1024 {
return "", fmt.Errorf("idna: invalid label %q", encoded)
}
output = append(output, 0)
copy(output[i+1:], output[i:])
output[i] = n
i++
}
return string(output), nil
}
// encode encodes a string as specified in section 6.3 and prepends prefix to
// the result.
//
// The "while h < length(input)" line in the specification becomes "for
// remaining != 0" in the Go code, because len(s) in Go is in bytes, not runes.
func encode(prefix, s string) (string, error) {
output := make([]byte, len(prefix), len(prefix)+1+2*len(s))
copy(output, prefix)
delta, n, bias := int32(0), initialN, initialBias
b, remaining := int32(0), int32(0)
for _, r := range s {
if r < 0x80 {
b++
output = append(output, byte(r))
} else {
remaining++
}
}
h := b
if b > 0 {
output = append(output, '-')
}
for remaining != 0 {
m := int32(0x7fffffff)
for _, r := range s {
if m > r && r >= n {
m = r
}
}
delta += (m - n) * (h + 1)
if delta < 0 {
return "", fmt.Errorf("idna: invalid label %q", s)
}
n = m
for _, r := range s {
if r < n {
delta++
if delta < 0 {
return "", fmt.Errorf("idna: invalid label %q", s)
}
continue
}
if r > n {
continue
}
q := delta
for k := base; ; k += base {
t := k - bias
if t < tmin {
t = tmin
} else if t > tmax {
t = tmax
}
if q < t {
break
}
output = append(output, encodeDigit(t+(q-t)%(base-t)))
q = (q - t) / (base - t)
}
output = append(output, encodeDigit(q))
bias = adapt(delta, h+1, h == b)
delta = 0
h++
remaining--
}
delta++
n++
}
return string(output), nil
}
func decodeDigit(x byte) (digit int32, ok bool) {
switch {
case '0' <= x && x <= '9':
return int32(x - ('0' - 26)), true
case 'A' <= x && x <= 'Z':
return int32(x - 'A'), true
case 'a' <= x && x <= 'z':
return int32(x - 'a'), true
}
return 0, false
}
func encodeDigit(digit int32) byte {
switch {
case 0 <= digit && digit < 26:
return byte(digit + 'a')
case 26 <= digit && digit < 36:
return byte(digit + ('0' - 26))
}
panic("idna: internal error in punycode encoding")
}
// adapt is the bias adaptation function specified in section 6.1.
func adapt(delta, numPoints int32, firstTime bool) int32 {
if firstTime {
delta /= damp
} else {
delta /= 2
}
delta += delta / numPoints
k := int32(0)
for delta > ((base-tmin)*tmax)/2 {
delta /= base - tmin
k += base
}
return k + (base-tmin+1)*delta/(delta+skew)
}

351
cmd/vendor/golang.org/x/net/lex/httplex/httplex.go generated vendored Normal file
View File

@ -0,0 +1,351 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package httplex contains rules around lexical matters of various
// HTTP-related specifications.
//
// This package is shared by the standard library (which vendors it)
// and x/net/http2. It comes with no API stability promise.
package httplex
import (
"net"
"strings"
"unicode/utf8"
"golang.org/x/net/idna"
)
var isTokenTable = [127]bool{
'!': true,
'#': true,
'$': true,
'%': true,
'&': true,
'\'': true,
'*': true,
'+': true,
'-': true,
'.': true,
'0': true,
'1': true,
'2': true,
'3': true,
'4': true,
'5': true,
'6': true,
'7': true,
'8': true,
'9': true,
'A': true,
'B': true,
'C': true,
'D': true,
'E': true,
'F': true,
'G': true,
'H': true,
'I': true,
'J': true,
'K': true,
'L': true,
'M': true,
'N': true,
'O': true,
'P': true,
'Q': true,
'R': true,
'S': true,
'T': true,
'U': true,
'W': true,
'V': true,
'X': true,
'Y': true,
'Z': true,
'^': true,
'_': true,
'`': true,
'a': true,
'b': true,
'c': true,
'd': true,
'e': true,
'f': true,
'g': true,
'h': true,
'i': true,
'j': true,
'k': true,
'l': true,
'm': true,
'n': true,
'o': true,
'p': true,
'q': true,
'r': true,
's': true,
't': true,
'u': true,
'v': true,
'w': true,
'x': true,
'y': true,
'z': true,
'|': true,
'~': true,
}
func IsTokenRune(r rune) bool {
i := int(r)
return i < len(isTokenTable) && isTokenTable[i]
}
func isNotToken(r rune) bool {
return !IsTokenRune(r)
}
// HeaderValuesContainsToken reports whether any string in values
// contains the provided token, ASCII case-insensitively.
func HeaderValuesContainsToken(values []string, token string) bool {
for _, v := range values {
if headerValueContainsToken(v, token) {
return true
}
}
return false
}
// isOWS reports whether b is an optional whitespace byte, as defined
// by RFC 7230 section 3.2.3.
func isOWS(b byte) bool { return b == ' ' || b == '\t' }
// trimOWS returns x with all optional whitespace removes from the
// beginning and end.
func trimOWS(x string) string {
// TODO: consider using strings.Trim(x, " \t") instead,
// if and when it's fast enough. See issue 10292.
// But this ASCII-only code will probably always beat UTF-8
// aware code.
for len(x) > 0 && isOWS(x[0]) {
x = x[1:]
}
for len(x) > 0 && isOWS(x[len(x)-1]) {
x = x[:len(x)-1]
}
return x
}
// headerValueContainsToken reports whether v (assumed to be a
// 0#element, in the ABNF extension described in RFC 7230 section 7)
// contains token amongst its comma-separated tokens, ASCII
// case-insensitively.
func headerValueContainsToken(v string, token string) bool {
v = trimOWS(v)
if comma := strings.IndexByte(v, ','); comma != -1 {
return tokenEqual(trimOWS(v[:comma]), token) || headerValueContainsToken(v[comma+1:], token)
}
return tokenEqual(v, token)
}
// lowerASCII returns the ASCII lowercase version of b.
func lowerASCII(b byte) byte {
if 'A' <= b && b <= 'Z' {
return b + ('a' - 'A')
}
return b
}
// tokenEqual reports whether t1 and t2 are equal, ASCII case-insensitively.
func tokenEqual(t1, t2 string) bool {
if len(t1) != len(t2) {
return false
}
for i, b := range t1 {
if b >= utf8.RuneSelf {
// No UTF-8 or non-ASCII allowed in tokens.
return false
}
if lowerASCII(byte(b)) != lowerASCII(t2[i]) {
return false
}
}
return true
}
// isLWS reports whether b is linear white space, according
// to http://www.w3.org/Protocols/rfc2616/rfc2616-sec2.html#sec2.2
// LWS = [CRLF] 1*( SP | HT )
func isLWS(b byte) bool { return b == ' ' || b == '\t' }
// isCTL reports whether b is a control byte, according
// to http://www.w3.org/Protocols/rfc2616/rfc2616-sec2.html#sec2.2
// CTL = <any US-ASCII control character
// (octets 0 - 31) and DEL (127)>
func isCTL(b byte) bool {
const del = 0x7f // a CTL
return b < ' ' || b == del
}
// ValidHeaderFieldName reports whether v is a valid HTTP/1.x header name.
// HTTP/2 imposes the additional restriction that uppercase ASCII
// letters are not allowed.
//
// RFC 7230 says:
// header-field = field-name ":" OWS field-value OWS
// field-name = token
// token = 1*tchar
// tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
// "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
func ValidHeaderFieldName(v string) bool {
if len(v) == 0 {
return false
}
for _, r := range v {
if !IsTokenRune(r) {
return false
}
}
return true
}
// ValidHostHeader reports whether h is a valid host header.
func ValidHostHeader(h string) bool {
// The latest spec is actually this:
//
// http://tools.ietf.org/html/rfc7230#section-5.4
// Host = uri-host [ ":" port ]
//
// Where uri-host is:
// http://tools.ietf.org/html/rfc3986#section-3.2.2
//
// But we're going to be much more lenient for now and just
// search for any byte that's not a valid byte in any of those
// expressions.
for i := 0; i < len(h); i++ {
if !validHostByte[h[i]] {
return false
}
}
return true
}
// See the validHostHeader comment.
var validHostByte = [256]bool{
'0': true, '1': true, '2': true, '3': true, '4': true, '5': true, '6': true, '7': true,
'8': true, '9': true,
'a': true, 'b': true, 'c': true, 'd': true, 'e': true, 'f': true, 'g': true, 'h': true,
'i': true, 'j': true, 'k': true, 'l': true, 'm': true, 'n': true, 'o': true, 'p': true,
'q': true, 'r': true, 's': true, 't': true, 'u': true, 'v': true, 'w': true, 'x': true,
'y': true, 'z': true,
'A': true, 'B': true, 'C': true, 'D': true, 'E': true, 'F': true, 'G': true, 'H': true,
'I': true, 'J': true, 'K': true, 'L': true, 'M': true, 'N': true, 'O': true, 'P': true,
'Q': true, 'R': true, 'S': true, 'T': true, 'U': true, 'V': true, 'W': true, 'X': true,
'Y': true, 'Z': true,
'!': true, // sub-delims
'$': true, // sub-delims
'%': true, // pct-encoded (and used in IPv6 zones)
'&': true, // sub-delims
'(': true, // sub-delims
')': true, // sub-delims
'*': true, // sub-delims
'+': true, // sub-delims
',': true, // sub-delims
'-': true, // unreserved
'.': true, // unreserved
':': true, // IPv6address + Host expression's optional port
';': true, // sub-delims
'=': true, // sub-delims
'[': true,
'\'': true, // sub-delims
']': true,
'_': true, // unreserved
'~': true, // unreserved
}
// ValidHeaderFieldValue reports whether v is a valid "field-value" according to
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2 :
//
// message-header = field-name ":" [ field-value ]
// field-value = *( field-content | LWS )
// field-content = <the OCTETs making up the field-value
// and consisting of either *TEXT or combinations
// of token, separators, and quoted-string>
//
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec2.html#sec2.2 :
//
// TEXT = <any OCTET except CTLs,
// but including LWS>
// LWS = [CRLF] 1*( SP | HT )
// CTL = <any US-ASCII control character
// (octets 0 - 31) and DEL (127)>
//
// RFC 7230 says:
// field-value = *( field-content / obs-fold )
// obj-fold = N/A to http2, and deprecated
// field-content = field-vchar [ 1*( SP / HTAB ) field-vchar ]
// field-vchar = VCHAR / obs-text
// obs-text = %x80-FF
// VCHAR = "any visible [USASCII] character"
//
// http2 further says: "Similarly, HTTP/2 allows header field values
// that are not valid. While most of the values that can be encoded
// will not alter header field parsing, carriage return (CR, ASCII
// 0xd), line feed (LF, ASCII 0xa), and the zero character (NUL, ASCII
// 0x0) might be exploited by an attacker if they are translated
// verbatim. Any request or response that contains a character not
// permitted in a header field value MUST be treated as malformed
// (Section 8.1.2.6). Valid characters are defined by the
// field-content ABNF rule in Section 3.2 of [RFC7230]."
//
// This function does not (yet?) properly handle the rejection of
// strings that begin or end with SP or HTAB.
func ValidHeaderFieldValue(v string) bool {
for i := 0; i < len(v); i++ {
b := v[i]
if isCTL(b) && !isLWS(b) {
return false
}
}
return true
}
func isASCII(s string) bool {
for i := 0; i < len(s); i++ {
if s[i] >= utf8.RuneSelf {
return false
}
}
return true
}
// PunycodeHostPort returns the IDNA Punycode version
// of the provided "host" or "host:port" string.
func PunycodeHostPort(v string) (string, error) {
if isASCII(v) {
return v, nil
}
host, port, err := net.SplitHostPort(v)
if err != nil {
// The input 'v' argument was just a "host" argument,
// without a port. This error should not be returned
// to the caller.
host = v
port = ""
}
host, err = idna.ToASCII(host)
if err != nil {
// Non-UTF-8? Not representable in Punycode, in any
// case.
return "", err
}
if port == "" {
return host, nil
}
return net.JoinHostPort(host, port), nil
}

View File

@ -21,11 +21,6 @@ import (
"time"
)
var eventsTmpl = template.Must(template.New("events").Funcs(template.FuncMap{
"elapsed": elapsed,
"trimSpace": strings.TrimSpace,
}).Parse(eventsHTML))
const maxEventsPerLog = 100
type bucket struct {
@ -101,7 +96,7 @@ func RenderEvents(w http.ResponseWriter, req *http.Request, sensitive bool) {
famMu.RLock()
defer famMu.RUnlock()
if err := eventsTmpl.Execute(w, data); err != nil {
if err := eventsTmpl().Execute(w, data); err != nil {
log.Printf("net/trace: Failed executing template: %v", err)
}
}
@ -421,6 +416,19 @@ func freeEventLog(el *eventLog) {
}
}
var eventsTmplCache *template.Template
var eventsTmplOnce sync.Once
func eventsTmpl() *template.Template {
eventsTmplOnce.Do(func() {
eventsTmplCache = template.Must(template.New("events").Funcs(template.FuncMap{
"elapsed": elapsed,
"trimSpace": strings.TrimSpace,
}).Parse(eventsHTML))
})
return eventsTmplCache
}
const eventsHTML = `
<html>
<head>

View File

@ -12,6 +12,7 @@ import (
"html/template"
"log"
"math"
"sync"
"golang.org/x/net/internal/timeseries"
)
@ -320,15 +321,20 @@ func (h *histogram) newData() *data {
func (h *histogram) html() template.HTML {
buf := new(bytes.Buffer)
if err := distTmpl.Execute(buf, h.newData()); err != nil {
if err := distTmpl().Execute(buf, h.newData()); err != nil {
buf.Reset()
log.Printf("net/trace: couldn't execute template: %v", err)
}
return template.HTML(buf.String())
}
// Input: data
var distTmpl = template.Must(template.New("distTmpl").Parse(`
var distTmplCache *template.Template
var distTmplOnce sync.Once
func distTmpl() *template.Template {
distTmplOnce.Do(func() {
// Input: data
distTmplCache = template.Must(template.New("distTmpl").Parse(`
<table>
<tr>
<td style="padding:0.25em">Count: {{.Count}}</td>
@ -354,3 +360,6 @@ var distTmpl = template.Must(template.New("distTmpl").Parse(`
{{end}}
</table>
`))
})
return distTmplCache
}

View File

@ -91,15 +91,19 @@ var DebugUseAfterFinish = false
// It returns two bools; the first indicates whether the page may be viewed at all,
// and the second indicates whether sensitive events will be shown.
//
// AuthRequest may be replaced by a program to customise its authorisation requirements.
// AuthRequest may be replaced by a program to customize its authorization requirements.
//
// The default AuthRequest function returns (true, true) iff the request comes from localhost/127.0.0.1/[::1].
// The default AuthRequest function returns (true, true) if and only if the request
// comes from localhost/127.0.0.1/[::1].
var AuthRequest = func(req *http.Request) (any, sensitive bool) {
// RemoteAddr is commonly in the form "IP" or "IP:port".
// If it is in the form "IP:port", split off the port.
host, _, err := net.SplitHostPort(req.RemoteAddr)
switch {
case err != nil: // Badly formed address; fail closed.
return false, false
case host == "localhost" || host == "127.0.0.1" || host == "::1":
if err != nil {
host = req.RemoteAddr
}
switch host {
case "localhost", "127.0.0.1", "::1":
return true, true
default:
return false, false
@ -234,7 +238,7 @@ func Render(w io.Writer, req *http.Request, sensitive bool) {
completedMu.RLock()
defer completedMu.RUnlock()
if err := pageTmpl.ExecuteTemplate(w, "Page", data); err != nil {
if err := pageTmpl().ExecuteTemplate(w, "Page", data); err != nil {
log.Printf("net/trace: Failed executing template: %v", err)
}
}
@ -329,7 +333,8 @@ func New(family, title string) Trace {
tr.ref()
tr.Family, tr.Title = family, title
tr.Start = time.Now()
tr.events = make([]event, 0, maxEventsPerTrace)
tr.maxEvents = maxEventsPerTrace
tr.events = tr.eventsBuf[:0]
activeMu.RLock()
s := activeTraces[tr.Family]
@ -646,8 +651,8 @@ type event struct {
Elapsed time.Duration // since previous event in trace
NewDay bool // whether this event is on a different day to the previous event
Recyclable bool // whether this event was passed via LazyLog
What interface{} // string or fmt.Stringer
Sensitive bool // whether this event contains sensitive information
What interface{} // string or fmt.Stringer
}
// WhenString returns a string representation of the elapsed time of the event.
@ -688,14 +693,17 @@ type trace struct {
IsError bool
// Append-only sequence of events (modulo discards).
mu sync.RWMutex
events []event
mu sync.RWMutex
events []event
maxEvents int
refs int32 // how many buckets this is in
recycler func(interface{})
disc discarded // scratch space to avoid allocation
finishStack []byte // where finish was called, if DebugUseAfterFinish is set
eventsBuf [4]event // preallocated buffer in case we only log a few events
}
func (tr *trace) reset() {
@ -707,11 +715,15 @@ func (tr *trace) reset() {
tr.traceID = 0
tr.spanID = 0
tr.IsError = false
tr.maxEvents = 0
tr.events = nil
tr.refs = 0
tr.recycler = nil
tr.disc = 0
tr.finishStack = nil
for i := range tr.eventsBuf {
tr.eventsBuf[i] = event{}
}
}
// delta returns the elapsed time since the last event or the trace start,
@ -740,7 +752,7 @@ func (tr *trace) addEvent(x interface{}, recyclable, sensitive bool) {
and very unlikely to be the fault of this code.
The most likely scenario is that some code elsewhere is using
a requestz.Trace after its Finish method is called.
a trace.Trace after its Finish method is called.
You can temporarily set the DebugUseAfterFinish var
to help discover where that is; do not leave that var set,
since it makes this package much less efficient.
@ -749,11 +761,11 @@ func (tr *trace) addEvent(x interface{}, recyclable, sensitive bool) {
e := event{When: time.Now(), What: x, Recyclable: recyclable, Sensitive: sensitive}
tr.mu.Lock()
e.Elapsed, e.NewDay = tr.delta(e.When)
if len(tr.events) < cap(tr.events) {
if len(tr.events) < tr.maxEvents {
tr.events = append(tr.events, e)
} else {
// Discard the middle events.
di := int((cap(tr.events) - 1) / 2)
di := int((tr.maxEvents - 1) / 2)
if d, ok := tr.events[di].What.(*discarded); ok {
(*d)++
} else {
@ -773,7 +785,7 @@ func (tr *trace) addEvent(x interface{}, recyclable, sensitive bool) {
go tr.recycler(tr.events[di+1].What)
}
copy(tr.events[di+1:], tr.events[di+2:])
tr.events[cap(tr.events)-1] = e
tr.events[tr.maxEvents-1] = e
}
tr.mu.Unlock()
}
@ -799,7 +811,7 @@ func (tr *trace) SetTraceInfo(traceID, spanID uint64) {
func (tr *trace) SetMaxEvents(m int) {
// Always keep at least three events: first, discarded count, last.
if len(tr.events) == 0 && m > 3 {
tr.events = make([]event, 0, m)
tr.maxEvents = m
}
}
@ -890,10 +902,18 @@ func elapsed(d time.Duration) string {
return string(b)
}
var pageTmpl = template.Must(template.New("Page").Funcs(template.FuncMap{
"elapsed": elapsed,
"add": func(a, b int) int { return a + b },
}).Parse(pageHTML))
var pageTmplCache *template.Template
var pageTmplOnce sync.Once
func pageTmpl() *template.Template {
pageTmplOnce.Do(func() {
pageTmplCache = template.Must(template.New("Page").Funcs(template.FuncMap{
"elapsed": elapsed,
"add": func(a, b int) int { return a + b },
}).Parse(pageHTML))
})
return pageTmplCache
}
const pageHTML = `
{{template "Prolog" .}}

View File

@ -74,7 +74,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
shortHost := strings.TrimSuffix(srv.Target, ".")
urlHost := net.JoinHostPort(shortHost, port)
stringParts = append(stringParts, fmt.Sprintf("%s=%s://%s", n, scheme, urlHost))
plog.Noticef("got bootstrap from DNS for %s at %s%s", service, scheme, urlHost)
plog.Noticef("got bootstrap from DNS for %s at %s://%s", service, scheme, urlHost)
if ok && url.Scheme != scheme {
plog.Errorf("bootstrap at %s from DNS for %s has scheme mismatch with expected peer %s", scheme+"://"+urlHost, service, url.String())
}

View File

@ -16,15 +16,20 @@ package e2e
import "testing"
func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func TestCtlV3DefragWithAuth(t *testing.T) { testCtl(t, defragTestWithAuth) }
func defragTest(cx ctlCtx) {
func maintenanceInitKeys(cx ctlCtx) {
var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
for i := range kvs {
if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
cx.t.Fatal(err)
}
}
}
func defragTest(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := ctlV3Compact(cx, 4, cx.compactPhysical); err != nil {
cx.t.Fatal(err)
@ -35,6 +40,29 @@ func defragTest(cx ctlCtx) {
}
}
func defragTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
// ordinal user cannot defrag
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Defrag(cx); err == nil {
cx.t.Fatal("ordinal user should not be able to issue a defrag request")
}
// root can defrag
cx.user, cx.pass = "root", "root"
if err := ctlV3Defrag(cx); err != nil {
cx.t.Fatal(err)
}
}
func ctlV3Defrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "defrag")
lines := make([]string, cx.epc.cfg.clusterSize)

View File

@ -32,17 +32,20 @@ import (
func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }
func snapshotTest(cx ctlCtx) {
var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
for i := range kvs {
if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
cx.t.Fatal(err)
}
maintenanceInitKeys(cx)
leaseID, err := ctlV3LeaseGrant(cx, 100)
if err != nil {
cx.t.Fatalf("snapshot: ctlV3LeaseGrant error (%v)", err)
}
if err = ctlV3Put(cx, "withlease", "withlease", leaseID); err != nil {
cx.t.Fatalf("snapshot: ctlV3Put error (%v)", err)
}
fpath := "test.snapshot"
defer os.RemoveAll(fpath)
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
if err = ctlV3SnapshotSave(cx, fpath); err != nil {
cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
}
@ -50,11 +53,11 @@ func snapshotTest(cx ctlCtx) {
if err != nil {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
if st.Revision != 4 {
if st.Revision != 5 {
cx.t.Fatalf("expected 4, got %d", st.Revision)
}
if st.TotalKey < 3 {
cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
if st.TotalKey < 4 {
cx.t.Fatalf("expected at least 4, got %d", st.TotalKey)
}
}
@ -242,3 +245,42 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
}
func TestCtlV3SnapshotWithAuth(t *testing.T) { testCtl(t, snapshotTestWithAuth) }
func snapshotTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
fpath := "test.snapshot"
defer os.RemoveAll(fpath)
// ordinal user cannot save a snapshot
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3SnapshotSave(cx, fpath); err == nil {
cx.t.Fatal("ordinal user should not be able to save a snapshot")
}
// root can save a snapshot
cx.user, cx.pass = "root", "root"
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
}
st, err := getSnapshotStatus(cx, fpath)
if err != nil {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
if st.Revision != 4 {
cx.t.Fatalf("expected 4, got %d", st.Revision)
}
if st.TotalKey < 3 {
cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
}
}

View File

@ -55,20 +55,12 @@ var (
DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
DefaultAdvertiseClientURLs = "http://localhost:2379"
defaultHostname string = "localhost"
defaultHostname string
defaultHostStatus error
)
func init() {
ip, err := netutil.GetDefaultHost()
if err != nil {
defaultHostStatus = err
return
}
// found default host, advertise on it
DefaultInitialAdvertisePeerURLs = "http://" + ip + ":2380"
DefaultAdvertiseClientURLs = "http://" + ip + ":2379"
defaultHostname = ip
defaultHostname, defaultHostStatus = netutil.GetDefaultHost()
}
// Config holds the arguments for configuring an etcd server.
@ -237,6 +229,9 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.ACUrls = []url.URL(u)
}
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) {
cfg.InitialCluster = ""
}
if cfg.ClusterState == "" {
cfg.ClusterState = ClusterStateFlagNew
}
@ -346,34 +341,52 @@ func (cfg Config) InitialClusterFromName(name string) (ret string) {
func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
// IsDefaultHost returns the default hostname, if used, and the error, if any,
// from getting the machine's default host.
func (cfg Config) IsDefaultHost() (string, error) {
if len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs {
return defaultHostname, defaultHostStatus
}
if len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs {
return defaultHostname, defaultHostStatus
}
return "", defaultHostStatus
func (cfg Config) defaultPeerHost() bool {
return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs
}
// UpdateDefaultClusterFromName updates cluster advertise URLs with default host.
func (cfg Config) defaultClientHost() bool {
return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
}
// UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host,
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
// e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380
// then the advertise peer host would be updated with machine's default host,
// while keeping the listen URL's port.
// User can work around this by explicitly setting URL with 127.0.0.1.
// It returns the default hostname, if used, and the error, if any, from getting the machine's default host.
// TODO: check whether fields are set instead of whether fields have default value
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) {
defaultHost, defaultHostErr := cfg.IsDefaultHost()
defaultHostOverride := defaultHost == "" || defaultHostErr == nil
if (defaultHostOverride || cfg.Name != DefaultName) && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
ip, _, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
// if client-listen-url is 0.0.0.0, just use detected default host
// otherwise, rewrite advertise-client-url with localhost
if ip != "0.0.0.0" {
_, acPort, _ := net.SplitHostPort(cfg.ACUrls[0].Host)
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("localhost:%s", acPort)}
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (string, error) {
if defaultHostname == "" || defaultHostStatus != nil {
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
return "", defaultHostStatus
}
used := false
pip, pport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
if cfg.defaultPeerHost() && pip == "0.0.0.0" {
cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
used = true
}
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
}
cip, cport, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
if cfg.defaultClientHost() && cip == "0.0.0.0" {
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
used = true
}
dhost := defaultHostname
if !used {
dhost = ""
}
return dhost, defaultHostStatus
}
// checkBindURLs returns an error if any URL uses a domain name.

View File

@ -15,11 +15,15 @@
package embed
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"testing"
"github.com/coreos/etcd/pkg/transport"
"github.com/ghodss/yaml"
)
@ -61,6 +65,70 @@ func TestConfigFileOtherFields(t *testing.T) {
}
}
// TestUpdateDefaultClusterFromName ensures that etcd can start with 'etcd --name=abc'.
func TestUpdateDefaultClusterFromName(t *testing.T) {
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origpeer := cfg.APUrls[0].String()
origadvc := cfg.ACUrls[0].String()
cfg.Name = "abc"
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
// in case of 'etcd --name=abc'
exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport)
cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
if exp != cfg.InitialCluster {
t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster)
}
// advertise peer URL should not be affected
if origpeer != cfg.APUrls[0].String() {
t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String())
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
}
}
// TestUpdateDefaultClusterFromNameOverwrite ensures that machine's default host is only used
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) {
if defaultHostname == "" {
t.Skip("machine's default host not found")
}
cfg := NewConfig()
defaultInitialCluster := cfg.InitialCluster
oldscheme := cfg.APUrls[0].Scheme
origadvc := cfg.ACUrls[0].String()
cfg.Name = "abc"
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)}
dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
if dhost != defaultHostname {
t.Fatalf("expected default host %q, got %q", defaultHostname, dhost)
}
aphost, apport, _ := net.SplitHostPort(cfg.APUrls[0].Host)
if apport != lpport {
t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport)
}
if aphost != defaultHostname {
t.Fatalf("advertise peer url expected machine default host %q, got %q", defaultHostname, aphost)
}
expected := fmt.Sprintf("%s=%s://%s:%s", cfg.Name, oldscheme, defaultHostname, lpport)
if expected != cfg.InitialCluster {
t.Fatalf("initial-cluster expected %q, got %q", expected, cfg.InitialCluster)
}
// advertise client URL should not be affected
if origadvc != cfg.ACUrls[0].String() {
t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
}
}
func (s *securityConfig) equals(t *transport.TLSInfo) bool {
return s.CAFile == t.CAFile &&
s.CertFile == t.CertFile &&

View File

@ -464,7 +464,12 @@ Prints the member ID of the new member and the cluster ID.
```bash
./etcdctl member add newMember --peer-urls=https://127.0.0.1:12345
# Member 2be1eb8f84b7f63e added to cluster ef37ad9dc622a7c4
Member ced000fda4d05edf added to cluster 8c4281cc65c7b112
ETCD_NAME="newMember"
ETCD_INITIAL_CLUSTER="newMember=https://127.0.0.1:12345,default=http://10.0.0.30:2380"
ETCD_INITIAL_CLUSTER_STATE="existing"
```
### MEMBER UPDATE \<memberID\> [options]

View File

@ -125,18 +125,19 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
return rpctypes.ErrCompacted
}
var rev int64
var lastRev int64
ops := []clientv3.Op{}
for _, ev := range wr.Events {
nrev := ev.Kv.ModRevision
if rev != 0 && nrev > rev {
nextRev := ev.Kv.ModRevision
if lastRev != 0 && nextRev > lastRev {
_, err := dc.Txn(ctx).Then(ops...).Commit()
if err != nil {
return err
}
ops = []clientv3.Op{}
}
lastRev = nextRev
switch ev.Type {
case mvccpb.PUT:
ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))

View File

@ -99,6 +99,7 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("member name not provided."))
}
newMemberName := args[0]
if len(memberPeerURLs) == 0 {
ExitWithError(ExitBadArgs, fmt.Errorf("member peer urls not provided."))
@ -106,13 +107,53 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
urls := strings.Split(memberPeerURLs, ",")
ctx, cancel := commandCtx(cmd)
resp, err := mustClientFromCmd(cmd).MemberAdd(ctx, urls)
cli := mustClientFromCmd(cmd)
resp, err := cli.MemberAdd(ctx, urls)
cancel()
if err != nil {
ExitWithError(ExitError, err)
}
newID := resp.Member.ID
display.MemberAdd(*resp)
if _, ok := (display).(*simplePrinter); ok {
ctx, cancel = commandCtx(cmd)
listResp, err := cli.MemberList(ctx)
// get latest member list; if there's failover new member might have outdated list
for {
if err != nil {
ExitWithError(ExitError, err)
}
if listResp.Header.MemberId == resp.Header.MemberId {
break
}
// quorum get to sync cluster list
gresp, gerr := cli.Get(ctx, "_")
if gerr != nil {
ExitWithError(ExitError, err)
}
resp.Header.MemberId = gresp.Header.MemberId
listResp, err = cli.MemberList(ctx)
}
cancel()
conf := []string{}
for _, memb := range listResp.Members {
for _, u := range memb.PeerURLs {
n := memb.Name
if memb.ID == newID {
n = newMemberName
}
conf = append(conf, fmt.Sprintf("%s=%s", n, u))
}
}
fmt.Print("\n")
fmt.Printf("ETCD_NAME=%q\n", newMemberName)
fmt.Printf("ETCD_INITIAL_CLUSTER=%q\n", strings.Join(conf, ","))
fmt.Printf("ETCD_INITIAL_CLUSTER_STATE=\"existing\"\n")
}
}
// memberRemoveCommandFunc executes the "member remove" command.

View File

@ -21,6 +21,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path"
"reflect"
@ -30,6 +31,7 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/fileutil"
@ -371,7 +373,10 @@ func makeDB(snapdir, dbfile string, commit int) {
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)
s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
id := s.TxnBegin()
btx := be.BatchTx()
del := func(k, v []byte) error {

View File

@ -45,7 +45,7 @@ var (
func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, proto, simple, table)")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")

View File

@ -39,8 +39,6 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/proxy/httpproxy"
"github.com/coreos/etcd/version"
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
"github.com/coreos/pkg/capnslog"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
@ -85,7 +83,13 @@ func startEtcdOrProxyV2() {
GoMaxProcs := runtime.GOMAXPROCS(0)
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
(&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
defaultHost, dhErr := (&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
if defaultHost != "" {
plog.Infof("advertising using detected default host %q", defaultHost)
}
if dhErr != nil {
plog.Noticef("failed to detect default host (%v)", dhErr)
}
if cfg.Dir == "" {
cfg.Dir = fmt.Sprintf("%v.etcd", cfg.Name)
@ -157,20 +161,12 @@ func startEtcdOrProxyV2() {
osutil.HandleInterrupts()
if systemdutil.IsRunningSystemd() {
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
sent, err := daemon.SdNotify(false, "READY=1")
if err != nil {
plog.Errorf("failed to notify systemd for readiness: %v", err)
}
if !sent {
plog.Errorf("forgot to set Type=notify in systemd service file?")
}
}
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
notifySystemd()
select {
case lerr := <-errc:
@ -184,15 +180,6 @@ func startEtcdOrProxyV2() {
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
defaultHost, dhErr := cfg.IsDefaultHost()
if defaultHost != "" {
if dhErr == nil {
plog.Infof("advertising using detected default host %q", defaultHost)
} else {
plog.Noticef("failed to detect default host, advertise falling back to %q (%v)", defaultHost, dhErr)
}
}
if cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
}
@ -202,7 +189,10 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Server.Stop)
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
}

View File

@ -17,12 +17,14 @@ package etcdmain
import (
"fmt"
"net"
"net/url"
"os"
"time"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/proxy/tcpproxy"
"github.com/spf13/cobra"
)
@ -77,6 +79,20 @@ func newGatewayStartCommand() *cobra.Command {
return &cmd
}
func stripSchema(eps []string) []string {
var endpoints []string
for _, ep := range eps {
if u, err := url.Parse(ep); err == nil && u.Host != "" {
ep = u.Host
}
endpoints = append(endpoints, ep)
}
return endpoints
}
func startGateway(cmd *cobra.Command, args []string) {
endpoints := gatewayEndpoints
if gatewayDNSCluster != "" {
@ -101,6 +117,9 @@ func startGateway(cmd *cobra.Command, args []string) {
}
}
// Strip the schema from the endpoints because we start just a TCP proxy
endpoints = stripSchema(endpoints)
if len(endpoints) == 0 {
plog.Fatalf("no endpoints found")
}
@ -117,5 +136,8 @@ func startGateway(cmd *cobra.Command, args []string) {
MonitorInterval: getewayRetryDelay,
}
// At this point, etcd gateway listener is initialized
notifySystemd()
tp.Run()
}

View File

@ -103,7 +103,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
os.Exit(1)
}
kvp := grpcproxy.NewKvProxy(client)
kvp, _ := grpcproxy.NewKvProxy(client)
watchp, _ := grpcproxy.NewWatchProxy(client)
clusterp := grpcproxy.NewClusterProxy(client)
leasep := grpcproxy.NewLeaseProxy(client)
@ -144,6 +144,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
go func() { errc <- m.Serve() }()
// grpc-proxy is initialized, ready to serve
notifySystemd()
fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
}

View File

@ -17,6 +17,9 @@ package etcdmain
import (
"fmt"
"os"
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
)
func Main() {
@ -35,3 +38,16 @@ func Main() {
startEtcdOrProxyV2()
}
func notifySystemd() {
if !systemdutil.IsRunningSystemd() {
return
}
sent, err := daemon.SdNotify(false, "READY=1")
if err != nil {
plog.Errorf("failed to notify systemd for readiness: %v", err)
}
if !sent {
plog.Errorf("forgot to set Type=notify in systemd service file?")
}
}

View File

@ -18,6 +18,7 @@ import (
"crypto/sha256"
"io"
"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
@ -45,6 +46,10 @@ type RaftStatusGetter interface {
Leader() types.ID
}
type AuthGetter interface {
AuthStore() auth.AuthStore
}
type maintenanceServer struct {
rg RaftStatusGetter
kg KVGetter
@ -54,7 +59,8 @@ type maintenanceServer struct {
}
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
return &maintenanceServer{rg: s, kg: s, bg: s, a: s, hdr: newHeader(s)}
srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, hdr: newHeader(s)}
return &authMaintenanceServer{srv, s}
}
func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
@ -139,3 +145,49 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
ms.hdr.fill(resp.Header)
return resp, nil
}
type authMaintenanceServer struct {
*maintenanceServer
ag AuthGetter
}
func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
authInfo, err := ams.ag.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return err
}
return ams.ag.AuthStore().IsAdminPermitted(authInfo)
}
func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Defragment(ctx, sr)
}
func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
if err := ams.isAuthenticated(srv.Context()); err != nil {
return err
}
return ams.maintenanceServer.Snapshot(sr, srv)
}
func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Hash(ctx, r)
}
func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.Status(ctx, ar)
}

View File

@ -93,7 +93,7 @@ func togRPCError(err error) error {
return rpctypes.ErrGRPCPermissionNotGranted
case auth.ErrAuthNotEnabled:
return rpctypes.ErrGRPCAuthNotEnabled
case etcdserver.ErrInvalidAuthToken:
case auth.ErrInvalidAuthToken:
return rpctypes.ErrGRPCInvalidAuthToken
default:
return grpc.Errorf(codes.Unknown, err.Error())

View File

@ -31,7 +31,6 @@ var (
ErrNoLeader = errors.New("etcdserver: no leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrInvalidAuthToken = errors.New("etcdserver: invalid auth token")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
)

View File

@ -459,7 +459,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
srv.authStore = auth.NewAuthStore(srv.be)
srv.authStore = auth.NewAuthStore(srv.be,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
})
if h := cfg.AutoCompactionRetention; h != 0 {
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
srv.compactor.Run()
@ -591,6 +594,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
}
@ -663,6 +667,7 @@ func (s *EtcdServer) run() {
ep := etcdProgress{
confState: snap.Metadata.ConfState,
snapi: snap.Metadata.Index,
appliedt: snap.Metadata.Term,
appliedi: snap.Metadata.Index,
}
@ -762,7 +767,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
@ -864,6 +869,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}
plog.Info("finished adding peers from new cluster configuration into network...")
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
@ -885,7 +891,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
return
}
var shouldstop bool
if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
@ -1019,7 +1025,7 @@ func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) err
// in the state machine layer
// However, both of membership change and role management requires the root privilege.
// So careful operation by admins can prevent the problem.
authInfo, err := s.authInfoFromCtx(ctx)
authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return err
}
@ -1239,9 +1245,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
// apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
var applied uint64
var shouldstop bool
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
for i := range es {
e := es[i]
switch e.Type {
@ -1251,16 +1255,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState)
shouldstop = shouldstop || removedSelf
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, err)
default:
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
atomic.StoreUint64(&s.r.index, e.Index)
atomic.StoreUint64(&s.r.term, e.Term)
applied = e.Index
appliedt = e.Term
appliedi = e.Index
}
return applied, shouldstop
return appliedt, appliedi, shouldStop
}
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer

View File

@ -613,7 +613,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}
_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}

View File

@ -16,7 +16,6 @@ package etcdserver
import (
"io"
"log"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
@ -26,12 +25,7 @@ import (
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
// as ReadCloser.
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
snapt, err := s.r.raftStorage.Term(snapi)
if err != nil {
log.Panicf("get term should never fail: %v", err)
}
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
// get a snapshot of v2 store as []byte
clone := s.store.Clone()
d, err := clone.SaveNoCopy()

View File

@ -17,8 +17,6 @@ package etcdserver
import (
"bytes"
"encoding/binary"
"strconv"
"strings"
"time"
"github.com/coreos/etcd/auth"
@ -31,7 +29,6 @@ import (
"github.com/coreos/go-semver/semver"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
const (
@ -617,52 +614,10 @@ func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest
return result.resp.(*pb.AuthRoleDeleteResponse), nil
}
func (s *EtcdServer) isValidSimpleToken(token string) bool {
splitted := strings.Split(token, ".")
if len(splitted) != 2 {
return false
}
index, err := strconv.Atoi(splitted[1])
if err != nil {
return false
}
select {
case <-s.applyWait.Wait(uint64(index)):
return true
case <-s.stop:
return true
}
}
func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
md, ok := metadata.FromContext(ctx)
if !ok {
return nil, nil
}
ts, tok := md["token"]
if !tok {
return nil, nil
}
token := ts[0]
if !s.isValidSimpleToken(token) {
return nil, ErrInvalidAuthToken
}
authInfo, uok := s.AuthStore().AuthInfoFromToken(token)
if !uok {
plog.Warningf("invalid auth token: %s", token)
return nil, ErrInvalidAuthToken
}
return authInfo, nil
}
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
for {
ai, err := s.authInfoFromCtx(ctx)
ai, err := s.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return err
}
@ -697,7 +652,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
ID: s.reqIDGen.Next(),
}
authInfo, err := s.authInfoFromCtx(ctx)
authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return nil, err
}
@ -759,7 +714,6 @@ func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
internalTimeout := time.Second
for {
ctx := make([]byte, 8)
@ -778,7 +732,7 @@ func (s *EtcdServer) linearizableReadLoop() {
s.readNotifier = nextnr
s.readMu.Unlock()
cctx, cancel := context.WithTimeout(context.Background(), internalTimeout)
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if err := s.r.ReadIndex(cctx, ctx); err != nil {
cancel()
if err == raft.ErrStopped {
@ -803,7 +757,7 @@ func (s *EtcdServer) linearizableReadLoop() {
// continue waiting for the response of the current requests.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
}
case <-time.After(internalTimeout):
case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
nr.notify(ErrTimeout)
timeout = true

8
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: 1318c1dff08a83ee9e334c94d23c165fbef5508a96ef8782ca0529e332521538
updated: 2017-01-11T16:34:17.552765434-08:00
hash: ca3c895fa60c9ca9f53408202fb7643705f9960212d342967ed0da8e93606cc4
updated: 2017-01-18T10:26:48.990115455-08:00
imports:
- name: github.com/beorn7/perks
version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
@ -111,12 +111,14 @@ imports:
- bcrypt
- blowfish
- name: golang.org/x/net
version: 6acef71eb69611914f7a30939ea9f6e194c78172
version: f2499483f923065a842d38eb4c7f1927e6fc6e6d
subpackages:
- context
- http2
- http2/hpack
- idna
- internal/timeseries
- lex/httplex
- trace
- name: golang.org/x/sys
version: 478fcf54317e52ab69f40bb4c7a1520288d7f7ea

View File

@ -71,7 +71,7 @@ import:
- bcrypt
- blowfish
- package: golang.org/x/net
version: 6acef71eb69611914f7a30939ea9f6e194c78172
version: f2499483f923065a842d38eb4c7f1927e6fc6e6d
subpackages:
- context
- http2

View File

@ -449,6 +449,8 @@ type member struct {
grpcServer *grpc.Server
grpcAddr string
grpcBridge *bridge
keepDataDirTerminate bool
}
func (m *member) GRPCAddr() string { return m.grpcAddr }
@ -746,8 +748,10 @@ func (m *member) Restart(t *testing.T) error {
func (m *member) Terminate(t *testing.T) {
plog.Printf("terminating %s (%s)", m.Name, m.grpcAddr)
m.Close()
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
if !m.keepDataDirTerminate {
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
t.Fatal(err)
}
}
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
}

View File

@ -30,8 +30,9 @@ var (
)
type grpcClientProxy struct {
grpc grpcAPI
wdonec <-chan struct{}
grpc grpcAPI
wdonec <-chan struct{}
kvdonec <-chan struct{}
}
func toGRPC(c *clientv3.Client) grpcAPI {
@ -43,26 +44,30 @@ func toGRPC(c *clientv3.Client) grpcAPI {
}
wp, wpch := grpcproxy.NewWatchProxy(c)
kvp, kvpch := grpcproxy.NewKvProxy(c)
grpc := grpcAPI{
pb.NewClusterClient(c.ActiveConnection()),
grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
grpcproxy.KvServerToKvClient(kvp),
pb.NewLeaseClient(c.ActiveConnection()),
grpcproxy.WatchServerToWatchClient(wp),
pb.NewMaintenanceClient(c.ActiveConnection()),
pb.NewAuthClient(c.ActiveConnection()),
}
proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch}
proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch}
return grpc
}
type watchCloser struct {
type proxyCloser struct {
clientv3.Watcher
wdonec <-chan struct{}
wdonec <-chan struct{}
kvdonec <-chan struct{}
}
func (wc *watchCloser) Close() error {
err := wc.Watcher.Close()
<-wc.wdonec
func (pc *proxyCloser) Close() error {
// client ctx is canceled before calling close, so kv will close out
<-pc.kvdonec
err := pc.Watcher.Close()
<-pc.wdonec
return err
}
@ -74,9 +79,10 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
rpc := toGRPC(c)
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
pmu.Lock()
c.Watcher = &watchCloser{
c.Watcher = &proxyCloser{
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
wdonec: proxies[c].wdonec,
kvdonec: proxies[c].kvdonec,
}
pmu.Unlock()
return c, nil

View File

@ -27,6 +27,7 @@ import (
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
)
@ -441,6 +442,51 @@ func TestRejectUnhealthyRemove(t *testing.T) {
}
}
// TestRestartRemoved ensures that restarting removed member must exit
// if 'initial-cluster-state' is set 'new' and old data directory still exists
// (see https://github.com/coreos/etcd/issues/7512 for more).
func TestRestartRemoved(t *testing.T) {
defer testutil.AfterTest(t)
capnslog.SetGlobalLogLevel(capnslog.INFO)
// 1. start single-member cluster
c := NewCluster(t, 1)
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
c.Launch(t)
defer c.Terminate(t)
// 2. add a new member
c.AddMember(t)
c.WaitLeader(t)
oldm := c.Members[0]
oldm.keepDataDirTerminate = true
// 3. remove first member, shut down without deleting data
if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
t.Fatalf("expected to remove member, got error %v", err)
}
c.WaitLeader(t)
// 4. restart first member with 'initial-cluster-state=new'
// wrong config, expects exit within ReqTimeout
oldm.ServerConfig.NewCluster = false
if err := oldm.Restart(t); err != nil {
t.Fatalf("unexpected ForceRestart error: %v", err)
}
defer func() {
oldm.Close()
os.RemoveAll(oldm.ServerConfig.DataDir)
}()
select {
case <-oldm.s.StopNotify():
case <-time.After(time.Minute):
t.Fatalf("removed member didn't exit within %v", time.Minute)
}
}
// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.

View File

@ -581,6 +581,37 @@ func TestV3Hash(t *testing.T) {
}
}
// TestV3HashRestart ensures that hash stays the same after restart.
func TestV3HashRestart(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}
hash1 := resp.Hash
clus.Members[0].Stop(t)
clus.Members[0].Restart(t)
clus.waitLeader(t, clus.Members)
kvc := toGRPC(clus.Client(0)).KV
waitForRestart(t, kvc)
cli = clus.RandClient()
resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}
hash2 := resp.Hash
if hash1 != hash2 {
t.Fatalf("hash expected %d, got %d", hash1, hash2)
}
}
// TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
func TestV3StorageQuotaAPI(t *testing.T) {
defer testutil.AfterTest(t)

View File

@ -197,3 +197,50 @@ func TestSTMSerialize(t *testing.T) {
}
}
}
// TestSTMApplyOnConcurrentDeletion ensures that concurrent key deletion
// fails the first GET revision comparison within STM; trigger retry.
func TestSTMApplyOnConcurrentDeletion(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
etcdc := clus.RandClient()
if _, err := etcdc.Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
donec, readyc := make(chan struct{}), make(chan struct{})
go func() {
<-readyc
if _, err := etcdc.Delete(context.TODO(), "foo"); err != nil {
t.Fatal(err)
}
close(donec)
}()
try := 0
applyf := func(stm concurrency.STM) error {
try++
stm.Get("foo")
if try == 1 {
// trigger delete to make GET rev comparison outdated
close(readyc)
<-donec
}
stm.Put("foo2", "bar2")
return nil
}
if _, err := concurrency.NewSTMRepeatable(context.TODO(), etcdc, applyf); err != nil {
t.Fatalf("error on stm txn (%v)", err)
}
if try != 2 {
t.Fatalf("STM apply expected to run twice, got %d", try)
}
resp, err := etcdc.Get(context.TODO(), "foo2")
if err != nil {
t.Fatalf("error fetching key (%v)", err)
}
if string(resp.Kvs[0].Value) != "bar2" {
t.Fatalf("bad value. got %+v, expected 'bar2' value", resp)
}
}

View File

@ -252,10 +252,7 @@ func (le *lessor) Revoke(id LeaseID) error {
// sort keys so deletes are in same order among all members,
// otherwise the backened hashes will be different
keys := make([]string, 0, len(l.itemSet))
for item := range l.itemSet {
keys = append(keys, item.Key)
}
keys := l.Keys()
sort.StringSlice(keys).Sort()
for _, key := range keys {
_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
@ -367,10 +364,12 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
return ErrLeaseNotFound
}
l.mu.Lock()
for _, it := range items {
l.itemSet[it] = struct{}{}
le.itemMap[it] = id
}
l.mu.Unlock()
return nil
}
@ -392,10 +391,12 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
return ErrLeaseNotFound
}
l.mu.Lock()
for _, it := range items {
delete(l.itemSet, it)
delete(le.itemMap, it)
}
l.mu.Unlock()
return nil
}
@ -506,6 +507,8 @@ type Lease struct {
// expiry is time when lease should expire; must be 64-bit aligned.
expiry monotime.Time
// mu protects concurrent accesses to itemSet
mu sync.RWMutex
itemSet map[LeaseItem]struct{}
revokec chan struct{}
}
@ -544,10 +547,12 @@ func (l *Lease) forever() { atomic.StoreUint64((*uint64)(&l.expiry), uint64(fore
// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()
keys := make([]string, 0, len(l.itemSet))
for k := range l.itemSet {
keys = append(keys, k.Key)
}
l.mu.RUnlock()
return keys
}

View File

@ -15,11 +15,13 @@
package lease
import (
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"sort"
"sync"
"testing"
"time"
@ -76,6 +78,53 @@ func TestLessorGrant(t *testing.T) {
be.BatchTx().Unlock()
}
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
// from concurrent map writes on 'itemSet'.
func TestLeaseConcurrentKeys(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
fd := &fakeDeleter{}
le := newLessor(be, minLeaseTTL)
le.SetRangeDeleter(fd)
// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l, err := le.Grant(1, 100)
if err != nil {
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
}
itemn := 10
items := make([]LeaseItem, itemn)
for i := 0; i < itemn; i++ {
items[i] = LeaseItem{Key: fmt.Sprintf("foo%d", i)}
}
if err = le.Attach(l.ID, items); err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}
donec := make(chan struct{})
go func() {
le.Detach(l.ID, items)
close(donec)
}()
var wg sync.WaitGroup
wg.Add(itemn)
for i := 0; i < itemn; i++ {
go func() {
defer wg.Done()
l.Keys()
}()
}
<-donec
wg.Wait()
}
// TestLessorRevoke ensures Lessor can revoke a lease.
// The items in the revoked lease should be removed from
// the backend.

View File

@ -303,6 +303,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for seq write in for each
if berr != nil {
return berr
}
@ -319,6 +320,8 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for seq write in for each
count = 0
}
return tmpb.Put(k, v)

16
pkg/cpuutil/doc.go Normal file
View File

@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cpuutil provides facilities for detecting cpu-specific features.
package cpuutil

36
pkg/cpuutil/endian.go Normal file
View File

@ -0,0 +1,36 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cpuutil
import (
"encoding/binary"
"unsafe"
)
const intWidth int = int(unsafe.Sizeof(0))
var byteOrder binary.ByteOrder
// ByteOrder returns the byte order for the CPU's native endianness.
func ByteOrder() binary.ByteOrder { return byteOrder }
func init() {
var i int = 0x1
if v := (*[intWidth]byte)(unsafe.Pointer(&i)); v[0] == 0 {
byteOrder = binary.BigEndian
} else {
byteOrder = binary.LittleEndian
}
}

View File

@ -121,7 +121,7 @@ func verifyEnv(prefix string, usedEnvKey, alreadySet map[string]bool) {
plog.Infof("recognized environment variable %s, but unused: shadowed by corresponding flag ", kv[0])
continue
}
if strings.HasPrefix(env, prefix) {
if strings.HasPrefix(env, prefix+"_") {
plog.Warningf("unrecognized environment variable %s", env)
}
}

View File

@ -43,7 +43,7 @@ func RecoverPort(port int) error {
// SetLatency adds latency in millisecond scale with random variations.
func SetLatency(ms, rv int) error {
ifce, err := GetDefaultInterface()
ifces, err := GetDefaultInterfaces()
if err != nil {
return err
}
@ -51,14 +51,16 @@ func SetLatency(ms, rv int) error {
if rv > ms {
rv = 1
}
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
// the rule has already been added. Overwrite it.
cmdStr = fmt.Sprintf("sudo tc qdisc change dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
for ifce := range ifces {
cmdStr := fmt.Sprintf("sudo tc qdisc add dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
// the rule has already been added. Overwrite it.
cmdStr = fmt.Sprintf("sudo tc qdisc change dev %s root netem delay %dms %dms distribution normal", ifce, ms, rv)
_, err = exec.Command("/bin/sh", "-c", cmdStr).Output()
if err != nil {
return err
}
}
}
return nil
@ -66,10 +68,15 @@ func SetLatency(ms, rv int) error {
// RemoveLatency resets latency configurations.
func RemoveLatency() error {
ifce, err := GetDefaultInterface()
ifces, err := GetDefaultInterfaces()
if err != nil {
return err
}
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
return err
for ifce := range ifces {
_, err = exec.Command("/bin/sh", "-c", fmt.Sprintf("sudo tc qdisc del dev %s root netem", ifce)).Output()
if err != nil {
return err
}
}
return nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !linux !386,!amd64
// +build !linux
package netutil
@ -27,7 +27,7 @@ func GetDefaultHost() (string, error) {
return "", fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
}
// GetDefaultInterface fetches the device name of default routable interface.
func GetDefaultInterface() (string, error) {
return "", fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
// GetDefaultInterfaces fetches the device name of default routable interface.
func GetDefaultInterfaces() (map[string]uint8, error) {
return nil, fmt.Errorf("default host not supported on %s_%s", runtime.GOOS, runtime.GOARCH)
}

View File

@ -13,9 +13,6 @@
// limitations under the License.
// +build linux
// +build 386 amd64
// TODO support native endian but without using "unsafe"
package netutil
@ -24,27 +21,57 @@ import (
"encoding/binary"
"fmt"
"net"
"sort"
"syscall"
"github.com/coreos/etcd/pkg/cpuutil"
)
var errNoDefaultRoute = fmt.Errorf("could not find default route")
var errNoDefaultHost = fmt.Errorf("could not find default host")
var errNoDefaultInterface = fmt.Errorf("could not find default interface")
// GetDefaultHost obtains the first IP address of machine from the routing table and returns the IP address as string.
// An IPv4 address is preferred to an IPv6 address for backward compatibility.
func GetDefaultHost() (string, error) {
rmsg, rerr := getDefaultRoute()
rmsgs, rerr := getDefaultRoutes()
if rerr != nil {
return "", rerr
}
host, oif, err := parsePREFSRC(rmsg)
if err != nil {
return "", err
// prioritize IPv4
if rmsg, ok := rmsgs[syscall.AF_INET]; ok {
if host, err := chooseHost(syscall.AF_INET, rmsg); host != "" || err != nil {
return host, err
}
delete(rmsgs, syscall.AF_INET)
}
if host != "" {
return host, nil
// sort so choice is deterministic
var families []int
for family := range rmsgs {
families = append(families, int(family))
}
sort.Ints(families)
for _, f := range families {
family := uint8(f)
if host, err := chooseHost(family, rmsgs[family]); host != "" || err != nil {
return host, err
}
}
return "", errNoDefaultHost
}
func chooseHost(family uint8, rmsg *syscall.NetlinkMessage) (string, error) {
host, oif, err := parsePREFSRC(rmsg)
if host != "" || err != nil {
return host, err
}
// prefsrc not detected, fall back to getting address from iface
ifmsg, ierr := getIface(oif)
ifmsg, ierr := getIfaceAddr(oif, family)
if ierr != nil {
return "", ierr
}
@ -55,15 +82,16 @@ func GetDefaultHost() (string, error) {
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.RTA_SRC {
// search for RTA_DST because ipv6 doesn't have RTA_SRC
if attr.Attr.Type == syscall.RTA_DST {
return net.IP(attr.Value).String(), nil
}
}
return "", errNoDefaultRoute
return "", nil
}
func getDefaultRoute() (*syscall.NetlinkMessage, error) {
func getDefaultRoutes() (map[uint8]*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETROUTE, syscall.AF_UNSPEC)
if err != nil {
return nil, err
@ -74,26 +102,33 @@ func getDefaultRoute() (*syscall.NetlinkMessage, error) {
return nil, msgErr
}
routes := make(map[uint8]*syscall.NetlinkMessage)
rtmsg := syscall.RtMsg{}
for _, m := range msgs {
if m.Header.Type != syscall.RTM_NEWROUTE {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofRtMsg])
if rerr := binary.Read(buf, binary.LittleEndian, &rtmsg); rerr != nil {
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &rtmsg); rerr != nil {
continue
}
if rtmsg.Dst_len == 0 {
if rtmsg.Dst_len == 0 && rtmsg.Table == syscall.RT_TABLE_MAIN {
// zero-length Dst_len implies default route
return &m, nil
msg := m
routes[rtmsg.Family] = &msg
}
}
if len(routes) > 0 {
return routes, nil
}
return nil, errNoDefaultRoute
}
func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETADDR, syscall.AF_UNSPEC)
// Used to get an address of interface.
func getIfaceAddr(idx uint32, family uint8) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETADDR, int(family))
if err != nil {
return nil, err
}
@ -109,7 +144,7 @@ func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofIfAddrmsg])
if rerr := binary.Read(buf, binary.LittleEndian, &ifaddrmsg); rerr != nil {
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &ifaddrmsg); rerr != nil {
continue
}
if ifaddrmsg.Index == idx {
@ -117,38 +152,75 @@ func getIface(idx uint32) (*syscall.NetlinkMessage, error) {
}
}
return nil, errNoDefaultRoute
return nil, fmt.Errorf("could not find address for interface index %v", idx)
}
var errNoDefaultInterface = fmt.Errorf("could not find default interface")
func GetDefaultInterface() (string, error) {
rmsg, rerr := getDefaultRoute()
if rerr != nil {
return "", rerr
}
_, oif, err := parsePREFSRC(rmsg)
// Used to get a name of interface.
func getIfaceLink(idx uint32) (*syscall.NetlinkMessage, error) {
dat, err := syscall.NetlinkRIB(syscall.RTM_GETLINK, syscall.AF_UNSPEC)
if err != nil {
return "", err
return nil, err
}
ifmsg, ierr := getIface(oif)
if ierr != nil {
return "", ierr
msgs, msgErr := syscall.ParseNetlinkMessage(dat)
if msgErr != nil {
return nil, msgErr
}
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
if aerr != nil {
return "", aerr
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.IFLA_IFNAME {
return string(attr.Value[:len(attr.Value)-1]), nil
ifinfomsg := syscall.IfInfomsg{}
for _, m := range msgs {
if m.Header.Type != syscall.RTM_NEWLINK {
continue
}
buf := bytes.NewBuffer(m.Data[:syscall.SizeofIfInfomsg])
if rerr := binary.Read(buf, cpuutil.ByteOrder(), &ifinfomsg); rerr != nil {
continue
}
if ifinfomsg.Index == int32(idx) {
return &m, nil
}
}
return "", errNoDefaultInterface
return nil, fmt.Errorf("could not find link for interface index %v", idx)
}
// GetDefaultInterfaces gets names of interfaces and returns a map[interface]families.
func GetDefaultInterfaces() (map[string]uint8, error) {
interfaces := make(map[string]uint8)
rmsgs, rerr := getDefaultRoutes()
if rerr != nil {
return interfaces, rerr
}
for family, rmsg := range rmsgs {
_, oif, err := parsePREFSRC(rmsg)
if err != nil {
return interfaces, err
}
ifmsg, ierr := getIfaceLink(oif)
if ierr != nil {
return interfaces, ierr
}
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
if aerr != nil {
return interfaces, aerr
}
for _, attr := range attrs {
if attr.Attr.Type == syscall.IFLA_IFNAME {
// key is an interface name
// possible values: 2 - AF_INET, 10 - AF_INET6, 12 - dualstack
interfaces[string(attr.Value[:len(attr.Value)-1])] += family
}
}
}
if len(interfaces) > 0 {
return interfaces, nil
}
return interfaces, errNoDefaultInterface
}
// parsePREFSRC returns preferred source address and output interface index (RTA_OIF).
@ -164,7 +236,7 @@ func parsePREFSRC(m *syscall.NetlinkMessage) (host string, oif uint32, err error
host = net.IP(attr.Value).String()
}
if attr.Attr.Type == syscall.RTA_OIF {
oif = binary.LittleEndian.Uint32(attr.Value)
oif = cpuutil.ByteOrder().Uint32(attr.Value)
}
if host != "" && oif != uint32(0) {
break

View File

@ -19,9 +19,17 @@ package netutil
import "testing"
func TestGetDefaultInterface(t *testing.T) {
ifc, err := GetDefaultInterface()
ifc, err := GetDefaultInterfaces()
if err != nil {
t.Fatal(err)
}
t.Logf("default network interface: %q\n", ifc)
t.Logf("default network interfaces: %+v\n", ifc)
}
func TestGetDefaultHost(t *testing.T) {
ip, err := GetDefaultHost()
if err != nil {
t.Fatal(err)
}
t.Logf("default ip: %v", ip)
}

View File

@ -112,6 +112,10 @@ func (r *report) Stats() <-chan Stats {
go func() {
defer close(donec)
r.processResults()
var ts TimeSeries
if r.sps != nil {
ts = r.sps.getTimeSeries()
}
donec <- Stats{
AvgTotal: r.avgTotal,
Fastest: r.fastest,
@ -122,7 +126,7 @@ func (r *report) Stats() <-chan Stats {
Total: r.total,
ErrorDist: copyMap(r.errorDist),
Lats: copyFloats(r.lats),
TimeSeries: r.sps.getTimeSeries(),
TimeSeries: ts,
}
}()
return donec

View File

@ -39,6 +39,7 @@ type Cache interface {
Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
Compact(revision int64)
Invalidate(key []byte, endkey []byte)
Close()
}
// keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
@ -58,6 +59,8 @@ func NewCache(maxCacheEntries int) Cache {
}
}
func (c *cache) Close() { c.lru.Stop() }
// cache implements Cache
type cache struct {
mu sync.RWMutex

View File

@ -27,11 +27,18 @@ type kvProxy struct {
cache cache.Cache
}
func NewKvProxy(c *clientv3.Client) pb.KVServer {
return &kvProxy{
func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
kv := &kvProxy{
kv: c.KV,
cache: cache.NewCache(cache.DefaultMaxEntries),
}
donec := make(chan struct{})
go func() {
defer close(donec)
<-c.Ctx().Done()
kv.cache.Close()
}()
return kv, donec
}
func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {

View File

@ -76,7 +76,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
t.Fatal(err)
}
kvp := NewKvProxy(client)
kvp, _ := NewKvProxy(client)
kvts := &kvproxyTestServer{
kp: kvp,

View File

@ -147,16 +147,17 @@ func (tp *TCPProxy) runMonitor() {
select {
case <-time.After(tp.MonitorInterval):
tp.mu.Lock()
for _, r := range tp.remotes {
if !r.isActive() {
go func() {
if err := r.tryReactivate(); err != nil {
plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
} else {
plog.Printf("activated %s", r.addr)
}
}()
for _, rem := range tp.remotes {
if rem.isActive() {
continue
}
go func(r *remote) {
if err := r.tryReactivate(); err != nil {
plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
} else {
plog.Printf("activated %s", r.addr)
}
}(rem)
}
tp.mu.Unlock()
case <-tp.donec:

View File

@ -823,6 +823,11 @@ func stepLeader(r *raft, m pb.Message) {
return
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return
}
// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.

View File

@ -1856,6 +1856,77 @@ func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) {
}
}
// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message
// when it commits at least one log entry at it term.
func TestReadOnlyForNewLeader(t *testing.T) {
cfg := newTestConfig(1, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 1, Term: 1},
})
cfg.Applied = 1
a := newRaft(cfg)
cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 2, Term: 1},
})
cfg.Applied = 2
b := newRaft(cfg)
cfg = newTestConfig(2, []uint64{1, 2, 3}, 10, 1,
&MemoryStorage{
ents: []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}},
hardState: pb.HardState{Commit: 2, Term: 1},
})
cfg.Applied = 2
c := newRaft(cfg)
nt := newNetwork(a, b, c)
// Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader.
nt.ignore(pb.MsgApp)
// Force peer a to become leader.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if a.state != StateLeader {
t.Fatalf("state = %s, want %s", a.state, StateLeader)
}
// Ensure peer a drops read only request.
var windex uint64 = 4
wctx := []byte("ctx")
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(a.readStates) != 0 {
t.Fatalf("len(readStates) = %d, want zero", len(a.readStates))
}
nt.recover()
// Force peer a to commit a log entry at its term
for i := 0; i < a.heartbeatTimeout; i++ {
a.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
if a.raftLog.committed != 4 {
t.Fatalf("committed = %d, want 4", a.raftLog.committed)
}
lastLogTerm := a.raftLog.zeroTermOnErrCompacted(a.raftLog.term(a.raftLog.committed))
if lastLogTerm != a.Term {
t.Fatalf("last log term = %d, want %d", lastLogTerm, a.Term)
}
// Ensure peer a accepts read only request after it commits a entry at its term.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(a.readStates) != 1 {
t.Fatalf("len(readStates) = %d, want 1", len(a.readStates))
}
rs := a.readStates[0]
if rs.Index != windex {
t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
}
if !bytes.Equal(rs.RequestCtx, wctx) {
t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
}
}
func TestLeaderAppResp(t *testing.T) {
// initial progress: match = 0; next = 3
tests := []struct {

2
test
View File

@ -293,7 +293,7 @@ function build_pass {
}
for pass in $PASSES; do
${pass}_pass
${pass}_pass $@
done
echo "Success"

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.1.0-rc.1+git"
Version = "3.1.4"
APIVersion = "unknown"
// Git SHA Value will be set during build