Compare commits
14 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 121edf0467 | |
| | b5abfe1858 | |
| | 33633da64c | |
| | e08abbeae4 | |
| | bdc3ed1970 | |
| | 1b3ac99e8a | |
| | fd4595aa04 | |
| | e5f63b64c3 | |
| | 68d27b2d84 | |
| | 7c4274be05 | |
| | fb5cd6f1c7 | |
| | 6999bbb47b | |
| | df4036ab73 | |
| | 848590e99e | |

@@ -16,6 +16,7 @@ package clientv3

import (
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/pkg/types"

	"golang.org/x/net/context"
	"google.golang.org/grpc"

@@ -65,6 +66,11 @@ func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
}

func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
	// fail-fast before panic in rafthttp
	if _, err := types.NewURLs(peerAddrs); err != nil {
		return nil, err
	}

	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
	resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
	if err != nil {

@@ -83,6 +89,11 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
}

func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
	// fail-fast before panic in rafthttp
	if _, err := types.NewURLs(peerAddrs); err != nil {
		return nil, err
	}

	// it is safe to retry on update.
	r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
	resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)

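Both MemberAdd and MemberUpdate now validate the peer URLs up front with `types.NewURLs`, so a malformed address is rejected in the client instead of panicking later inside rafthttp. As a rough standalone sketch (not part of this change set; the URLs are illustrative and mirror cases from the new integration test below):

```go
package main

import (
	"fmt"

	"github.com/coreos/etcd/pkg/types"
)

func main() {
	// A well-formed peer URL passes validation.
	if _, err := types.NewURLs([]string{"http://127.0.0.1:2380"}); err != nil {
		fmt.Println("unexpected error:", err)
	}

	// A URL that carries a path is rejected, which is what lets
	// MemberAdd/MemberUpdate fail fast on the client side.
	if _, err := types.NewURLs([]string{"http://127.0.0.1:2380/path"}); err != nil {
		fmt.Println("rejected:", err)
	}
}
```
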
@@ -127,3 +127,36 @@ func TestMemberUpdate(t *testing.T) {
		t.Errorf("urls = %v, want %v", urls, resp.Members[0].PeerURLs)
	}
}

func TestMemberAddUpdateWrongURLs(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	capi := clus.RandClient()
	tt := [][]string{
		// missing protocol scheme
		{"://127.0.0.1:2379"},
		// unsupported scheme
		{"mailto://127.0.0.1:2379"},
		// not conform to host:port
		{"http://127.0.0.1"},
		// contain a path
		{"http://127.0.0.1:2379/path"},
		// first path segment in URL cannot contain colon
		{"127.0.0.1:1234"},
		// URL scheme must be http, https, unix, or unixs
		{"localhost:1234"},
	}
	for i := range tt {
		_, err := capi.MemberAdd(context.Background(), tt[i])
		if err == nil {
			t.Errorf("#%d: MemberAdd err = nil, but error", i)
		}
		_, err = capi.MemberUpdate(context.Background(), 0, tt[i])
		if err == nil {
			t.Errorf("#%d: MemberUpdate err = nil, but error", i)
		}
	}
}

@@ -92,7 +92,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
			return nil
		}
		if err != nil {
			if isClientCtxErr(stream.Context().Err(), err) {
				plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
			} else {
				plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
			}
			return err
		}

@@ -118,7 +122,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
		resp.TTL = ttl
		err = stream.Send(resp)
		if err != nil {
			if isClientCtxErr(stream.Context().Err(), err) {
				plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
			} else {
				plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
			}
			return err
		}
	}

@@ -15,14 +15,18 @@
package v3rpc

import (
	"strings"

	"github.com/coreos/etcd/auth"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func togRPCError(err error) error {

@@ -101,3 +105,35 @@ func togRPCError(err error) error {
		return grpc.Errorf(codes.Unknown, err.Error())
	}
}

func isClientCtxErr(ctxErr error, err error) bool {
	if ctxErr != nil {
		return true
	}

	ev, ok := status.FromError(err)
	if !ok {
		return false
	}

	switch ev.Code() {
	case codes.Canceled, codes.DeadlineExceeded:
		// client-side context cancel or deadline exceeded
		// "rpc error: code = Canceled desc = context canceled"
		// "rpc error: code = DeadlineExceeded desc = context deadline exceeded"
		return true
	case codes.Unavailable:
		msg := ev.Message()
		// client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
		// "rpc error: code = Unavailable desc = client disconnected"
		if msg == "client disconnected" {
			return true
		}
		// "grpc/transport.ClientTransport.CloseStream" on canceled streams
		// "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
		if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
			return true
		}
	}
	return false
}

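For reference, a minimal sketch of the classification `isClientCtxErr` performs on the gRPC status code; the `status.Error` value below is a stand-in (assumed for illustration, not taken from this change) for what a server-side stream returns after a client cancels its context:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// Stand-in for "rpc error: code = Canceled desc = context canceled".
	err := status.Error(codes.Canceled, "context canceled")

	ev, ok := status.FromError(err)
	if ok && (ev.Code() == codes.Canceled || ev.Code() == codes.DeadlineExceeded) {
		fmt.Println("client-side cancel/deadline: log at debug level")
	} else {
		fmt.Println("server-side failure: log at warning level")
	}
}
```

The lease and watch handlers in the hunks above and below use exactly this distinction to pick between plog.Debugf and plog.Warningf.
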
@@ -141,7 +141,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
	// deadlock when calling sws.close().
	go func() {
		if rerr := sws.recvLoop(); rerr != nil {
			if isClientCtxErr(stream.Context().Err(), rerr) {
				plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
			} else {
				plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
			}
			errc <- rerr
		}
	}()

@@ -338,7 +342,11 @@ func (sws *serverWatchStream) sendLoop() {

			mvcc.ReportEventReceived(len(evs))
			if err := sws.gRPCStream.Send(wr); err != nil {
				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
					plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
				} else {
					plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
				}
				return
			}

@@ -355,7 +363,11 @@ func (sws *serverWatchStream) sendLoop() {
			}

			if err := sws.gRPCStream.Send(c); err != nil {
				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
					plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
				} else {
					plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
				}
				return
			}

@@ -371,7 +383,11 @@ func (sws *serverWatchStream) sendLoop() {
			for _, v := range pending[wid] {
				mvcc.ReportEventReceived(len(v.Events))
				if err := sws.gRPCStream.Send(v); err != nil {
					if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
						plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
					} else {
						plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
					}
					return
				}
			}

@@ -370,10 +370,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 		}

 		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
-		tmpb.FillPercent = 0.9 // for seq write in for each
 		if berr != nil {
 			return berr
 		}
+		tmpb.FillPercent = 0.9 // for seq write in for each

 		b.ForEach(func(k, v []byte) error {
 			count++

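The defragdb change only moves the FillPercent assignment below the error check: `CreateBucketIfNotExists` returns a nil bucket when it fails, so assigning the field first risks a nil-pointer dereference. A standalone sketch of the corrected pattern, assuming the upstream `github.com/boltdb/bolt` package and made-up `example.db` / `demo` names:

```go
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("example.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		// Check the error before touching the bucket: on failure the
		// returned *bolt.Bucket is nil.
		b, berr := tx.CreateBucketIfNotExists([]byte("demo"))
		if berr != nil {
			return berr
		}
		b.FillPercent = 0.9 // favor mostly-sequential, append-style writes
		return b.Put([]byte("k"), []byte("v"))
	}); err != nil {
		log.Fatal(err)
	}
}
```
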
@@ -188,7 +188,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
 	}

 	for wa := range s.synced.watchers {
-		s.unsynced.watchers.add(wa)
+		s.unsynced.add(wa)
 	}
 	s.synced = newWatcherGroup()
 	return nil

@@ -295,6 +295,8 @@ func TestWatchFutureRev(t *testing.T) {
}

func TestWatchRestore(t *testing.T) {
	test := func(delay time.Duration) func(t *testing.T) {
		return func(t *testing.T) {
			b, tmpPath := backend.NewDefaultTmpBackend()
			s := newWatchableStore(b, &lease.FakeLessor{}, nil)
			defer cleanup(s, b, tmpPath)

@@ -310,6 +312,8 @@ func TestWatchRestore(t *testing.T) {
			w := newStore.NewWatchStream()
			w.Watch(testKey, nil, rev-1)

			time.Sleep(delay)

			newStore.Restore(b)
			select {
			case resp := <-w.Chan():

@@ -326,6 +330,11 @@ func TestWatchRestore(t *testing.T) {
				t.Fatal("failed to receive event in 1 second.")
			}
		}
	}

	t.Run("Normal", test(0))
	t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {

@@ -26,7 +26,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "3.0.0"
-	Version = "3.2.13"
+	Version = "3.2.16"
 	APIVersion = "unknown"

 	// Git SHA Value will be set during build