Compare commits
12 Commits
Author | SHA1 | Date | |
---|---|---|---|
c31bec0f29 | |||
19fe4b0cac | |||
a5d94fe229 | |||
e8f3cbf1c6 | |||
856502f788 | |||
ae23b0ef2f | |||
5ee89be616 | |||
38373b342d | |||
536a5f594b | |||
49e6916e66 | |||
b9b6f6f7c4 | |||
6ecbb3bbc5 |
@ -24,6 +24,11 @@ curl -L http://localhost:2379/v3alpha/kv/put \
|
||||
curl -L http://localhost:2379/v3alpha/kv/range \
|
||||
-X POST -d '{"key": "Zm9v"}'
|
||||
# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
|
||||
|
||||
# get all keys prefixed with "foo"
|
||||
curl -L http://localhost:2379/v3alpha/kv/range \
|
||||
-X POST -d '{"key": "Zm9v", "range_end": "Zm9w"}'
|
||||
# {"header":{"cluster_id":"12585971608760269493","member_id":"13847567121247652255","revision":"2","raft_term":"3"},"kvs":[{"key":"Zm9v","create_revision":"2","mod_revision":"2","version":"1","value":"YmFy"}],"count":"1"}
|
||||
```
|
||||
|
||||
Use `curl` to watch a key:
|
||||
|
@ -449,7 +449,7 @@ message LeaseRevokeRequest {
|
||||
|
||||
### Keep alives
|
||||
|
||||
Leases are refreshed using a bi-directional stream created with the `LeaseKeepAlive` API call. When the client wishes to refresh a lease, it sends a `LeaseGrantRequest` over the stream:
|
||||
Leases are refreshed using a bi-directional stream created with the `LeaseKeepAlive` API call. When the client wishes to refresh a lease, it sends a `LeaseKeepAliveRequest` over the stream:
|
||||
|
||||
```protobuf
|
||||
message LeaseKeepAliveRequest {
|
||||
|
@ -186,11 +186,29 @@ func (e *Etcd) Config() Config {
|
||||
func (e *Etcd) Close() {
|
||||
e.closeOnce.Do(func() { close(e.stopc) })
|
||||
|
||||
// (gRPC server) stops accepting new connections,
|
||||
// RPCs, and blocks until all pending RPCs are finished
|
||||
timeout := 2 * time.Second
|
||||
if e.Server != nil {
|
||||
timeout = e.Server.Cfg.ReqTimeout()
|
||||
}
|
||||
for _, sctx := range e.sctxs {
|
||||
for gs := range sctx.grpcServerC {
|
||||
gs.GracefulStop()
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(ch)
|
||||
// close listeners to stop accepting new connections,
|
||||
// will block on any existing transports
|
||||
gs.GracefulStop()
|
||||
}()
|
||||
// wait until all pending RPCs are finished
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(timeout):
|
||||
// took too long, manually close open transports
|
||||
// e.g. watch streams
|
||||
gs.Stop()
|
||||
// concurrent GracefulStop should be interrupted
|
||||
<-ch
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@ package v3rpc
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"math"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -24,6 +25,8 @@ import (
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
const maxStreams = math.MaxUint32
|
||||
|
||||
func init() {
|
||||
grpclog.SetLogger(plog)
|
||||
}
|
||||
@ -36,8 +39,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
|
||||
}
|
||||
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
|
||||
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
|
||||
|
||||
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
|
||||
grpcServer := grpc.NewServer(opts...)
|
||||
|
||||
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
|
||||
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
|
||||
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
|
||||
|
@ -15,13 +15,16 @@
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/embed"
|
||||
)
|
||||
|
||||
@ -102,6 +105,47 @@ func TestEmbedEtcd(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestEmbedEtcdGracefulStop ensures embedded server stops
|
||||
// cutting existing transports.
|
||||
func TestEmbedEtcdGracefulStop(t *testing.T) {
|
||||
cfg := embed.NewConfig()
|
||||
|
||||
urls := newEmbedURLs(2)
|
||||
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
|
||||
|
||||
cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprintf("embed-etcd"))
|
||||
os.RemoveAll(cfg.Dir)
|
||||
defer os.RemoveAll(cfg.Dir)
|
||||
|
||||
e, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
|
||||
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{urls[0].String()}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
// open watch connection
|
||||
cli.Watch(context.Background(), "foo")
|
||||
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
e.Close()
|
||||
close(donec)
|
||||
}()
|
||||
select {
|
||||
case err := <-e.Err():
|
||||
t.Fatal(err)
|
||||
case <-donec:
|
||||
case <-time.After(2*time.Second + e.Server.Cfg.ReqTimeout()):
|
||||
t.Fatalf("took too long to close server")
|
||||
}
|
||||
}
|
||||
|
||||
func newEmbedURLs(n int) (urls []url.URL) {
|
||||
for i := 0; i < n; i++ {
|
||||
u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))
|
||||
|
@ -118,6 +118,7 @@ func interestingGoroutines() (gs []string) {
|
||||
}
|
||||
stack := strings.TrimSpace(sl[1])
|
||||
if stack == "" ||
|
||||
strings.Contains(stack, "sync.(*WaitGroup).Done") ||
|
||||
strings.Contains(stack, "created by os/signal.init") ||
|
||||
strings.Contains(stack, "runtime/panic.go") ||
|
||||
strings.Contains(stack, "created by testing.RunTests") ||
|
||||
|
@ -15,6 +15,8 @@
|
||||
package grpcproxy
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
@ -49,6 +51,9 @@ func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenan
|
||||
for {
|
||||
rr, err := sc.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
err = stream.Send(rr)
|
||||
|
10
test
10
test
@ -100,13 +100,21 @@ function functional_pass {
|
||||
agent_pids="${agent_pids} $pid"
|
||||
done
|
||||
|
||||
for a in 1 2 3; do
|
||||
echo "Waiting for 'etcd-agent' on ${a}9027..."
|
||||
while ! nc -z localhost ${a}9027; do
|
||||
sleep 1
|
||||
done
|
||||
done
|
||||
|
||||
echo "Starting 'etcd-tester'"
|
||||
./bin/etcd-tester \
|
||||
-agent-endpoints "127.0.0.1:19027,127.0.0.1:29027,127.0.0.1:39027" \
|
||||
-client-ports 12379,22379,32379 \
|
||||
-peer-ports 12380,22380,32380 \
|
||||
-limit 1 \
|
||||
-schedule-cases "0 1 2 3 4 5" \
|
||||
-exit-on-failure
|
||||
-exit-on-failure && echo "'etcd-tester' succeeded"
|
||||
ETCD_TESTER_EXIT_CODE=$?
|
||||
echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}
|
||||
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.2.2"
|
||||
Version = "3.2.4"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user