Compare commits

...

12 Commits

Author SHA1 Message Date
c31bec0f29 version: bump up to 3.2.4
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-07-19 08:37:30 -07:00
19fe4b0cac grpcproxy: return nil on receiving snapshot EOF
Gets "code = OutOfRange desc = EOF" errors otherwise.
2017-07-19 08:33:44 -07:00
a5d94fe229 integration: test embed.Etcd.Close with watch
Ensure 'Close' returns in time when there are open
connections (watch streams).

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-07-14 18:52:20 -07:00
e8f3cbf1c6 embed: wait up to request timeout for pending RPCs when closing
Both grpc.Server.Stop and grpc.Server.GracefulStop close the listeners
first, to stop accepting the new connections. GracefulStop blocks until
all clients close their open transports(connections). Unary RPCs
only take a few seconds to finish. Stream RPCs, like watch, might never
close the connections from client side, thus making gRPC server wait
forever.

This patch still calls GracefulStop, but waits up to 10s before manually
closing the open transports.

Address https://github.com/coreos/etcd/issues/8224.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-07-14 18:52:20 -07:00
856502f788 version: bump up to 3.2.3+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-07-14 16:04:54 -07:00
ae23b0ef2f version: bump up to 3.2.3
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-07-13 12:09:48 -07:00
5ee89be616 testutil: whitelist WaitGroup.Done
Calling a WaitGroup.Done() in a defer will sometimes trigger the leak
detector since the WaitGroup.Wait() will unblock before the defer
block completes. If the leak detector runs before the Done() is
rescheduled, it will spuriously report the finishing Done() as a leak.
This happens enough in CI to be irritating; whitelist it and ignore.
2017-07-13 11:14:12 -07:00
38373b342d test: sync with etcd-agent start in functional_pass
Fix https://github.com/coreos/etcd/issues/8211.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-07-13 11:14:03 -07:00
536a5f594b v3rpc: Let clients establish unlimited streams
From go-grpc v1.2.0, the number of max streams per client is set to 100
by default by the server side. This change makes it impossible
for third party proxies and custom clients to establish many streams.
2017-07-12 10:46:33 -07:00
49e6916e66 dev-guide: document using range_end for prefixes with json
Lack of a range_end example has caused some confusion.
2017-07-12 10:40:37 -07:00
b9b6f6f7c4 Documentation: refer to LeaseKeepAliveRequest for lease refresh 2017-07-12 10:40:26 -07:00
6ecbb3bbc5 version: bump up to 3.2.2+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-07-12 10:36:16 -07:00
9 changed files with 92 additions and 7 deletions

View File

@ -24,6 +24,11 @@ curl -L http://localhost:2379/v3alpha/kv/put \
curl -L http://localhost:2379/v3alpha/kv/range \
-X POST -d '{"key": "Zm9v"}'
# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
# get all keys prefixed with "foo"
curl -L http://localhost:2379/v3alpha/kv/range \
-X POST -d '{"key": "Zm9v", "range_end": "Zm9w"}'
# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
```
Use `curl` to watch a key:

View File

@ -449,7 +449,7 @@ message LeaseRevokeRequest {
### Keep alives
Leases are refreshed using a bi-directional stream created with the `LeaseKeepAlive` API call. When the client wishes to refresh a lease, it sends a `LeaseGrantRequest` over the stream:
Leases are refreshed using a bi-directional stream created with the `LeaseKeepAlive` API call. When the client wishes to refresh a lease, it sends a `LeaseKeepAliveRequest` over the stream:
```protobuf
message LeaseKeepAliveRequest {

View File

@ -186,11 +186,29 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })
// (gRPC server) stops accepting new connections,
// RPCs, and blocks until all pending RPCs are finished
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
gs.GracefulStop()
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}
}

View File

@ -16,6 +16,7 @@ package v3rpc
import (
"crypto/tls"
"math"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -24,6 +25,8 @@ import (
"google.golang.org/grpc/grpclog"
)
const maxStreams = math.MaxUint32
func init() {
grpclog.SetLogger(plog)
}
@ -36,8 +39,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
}
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
grpcServer := grpc.NewServer(opts...)
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))

View File

@ -15,13 +15,16 @@
package integration
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
)
@ -102,6 +105,47 @@ func TestEmbedEtcd(t *testing.T) {
}
}
// TestEmbedEtcdGracefulStop ensures embedded server stops
// cutting existing transports.
func TestEmbedEtcdGracefulStop(t *testing.T) {
cfg := embed.NewConfig()
urls := newEmbedURLs(2)
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
os.RemoveAll(cfg.Dir)
defer os.RemoveAll(cfg.Dir)
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{urls[0].String()}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()
// open watch connection
cli.Watch(context.Background(), "foo")
donec := make(chan struct{})
go func() {
e.Close()
close(donec)
}()
select {
case err := <-e.Err():
t.Fatal(err)
case <-donec:
case <-time.After(2*time.Second + e.Server.Cfg.ReqTimeout()):
t.Fatalf("took too long to close server")
}
}
func newEmbedURLs(n int) (urls []url.URL) {
for i := 0; i < n; i++ {
u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))

View File

@ -118,6 +118,7 @@ func interestingGoroutines() (gs []string) {
}
stack := strings.TrimSpace(sl[1])
if stack == "" ||
strings.Contains(stack, "sync.(*WaitGroup).Done") ||
strings.Contains(stack, "created by os/signal.init") ||
strings.Contains(stack, "runtime/panic.go") ||
strings.Contains(stack, "created by testing.RunTests") ||

View File

@ -15,6 +15,8 @@
package grpcproxy
import (
"io"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
@ -49,6 +51,9 @@ func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenan
for {
rr, err := sc.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
err = stream.Send(rr)

10
test
View File

@ -100,13 +100,21 @@ function functional_pass {
agent_pids="${agent_pids} $pid"
done
for a in 1 2 3; do
echo "Waiting for 'etcd-agent' on ${a}9027..."
while ! nc -z localhost ${a}9027; do
sleep 1
done
done
echo "Starting 'etcd-tester'"
./bin/etcd-tester \
-agent-endpoints "127.0.0.1:19027,127.0.0.1:29027,127.0.0.1:39027" \
-client-ports 12379,22379,32379 \
-peer-ports 12380,22380,32380 \
-limit 1 \
-schedule-cases "0 1 2 3 4 5" \
-exit-on-failure
-exit-on-failure && echo "'etcd-tester' succeeded"
ETCD_TESTER_EXIT_CODE=$?
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.2.2"
Version = "3.2.4"
APIVersion = "unknown"
// Git SHA Value will be set during build