Merge pull request #16278 from chaochn47/grpc_health_check_poc
gRPC health server sets serving status to NOT_SERVING on defrag
Commit: 4d77fd1efa
@@ -76,6 +76,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).

- Add [`etcd --experimental-snapshot-catch-up-entries`](https://github.com/etcd-io/etcd/pull/15033) flag to configure number of entries for a slow follower to catch up after compacting the raft storage entries and defaults to 5k.
- Decreased [`--snapshot-count` default value from 100,000 to 10,000](https://github.com/etcd-io/etcd/pull/15408)
- Add [`etcd --tls-min-version --tls-max-version`](https://github.com/etcd-io/etcd/pull/15156) to enable support for TLS 1.3.
- Add [`etcd --experimental-stop-grpc-service-on-defrag`](https://github.com/etcd-io/etcd/pull/16278) to enable client failover on defrag.

### etcd grpc-proxy
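Client failover here relies on the standard gRPC client-side health checking protocol: the e2e test added later in this commit opts in with a `round_robin` load balancer and a health check on the empty service name. Below is a minimal standalone sketch of that client configuration, mirroring the test's dial options; the endpoints are placeholders, not part of the commit.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	_ "google.golang.org/grpc/health" // registers the client-side health checking implementation

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"}, // placeholder endpoints
		DialTimeout: 20 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithDisableServiceConfig(),
			// round_robin plus health checking lets the balancer skip a member whose
			// health server reports NOT_SERVING (e.g. while it is defragmenting).
			grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
		},
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	resp, err := cli.Get(ctx, "health")
	fmt.Println(resp, err)
}
```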
@@ -192,6 +192,9 @@ type ServerConfig struct {
	// a shared buffer in its readonly check operations.
	ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

	// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
	ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

	// ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to
	// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
	ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`
@@ -429,6 +429,9 @@ type Config struct {
	// ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations.
	ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

	// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
	ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

	// V2Deprecation describes phase of API & Storage V2 support
	V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"`
}
@@ -529,6 +532,7 @@ func NewConfig() *Config {
		ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
		ExperimentalMemoryMlock: false,
		ExperimentalTxnModeWriteWithSharedBuffer: true,
		ExperimentalStopGRPCServiceOnDefrag: false,
		ExperimentalMaxLearners: membership.DefaultMaxLearners,

		ExperimentalCompactHashCheckEnabled: false,
@@ -725,6 +729,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
	fs.DurationVar(&cfg.ExperimentalWarningUnaryRequestDuration, "experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration, "Time duration after which a warning is generated if a unary request takes more time. It's deprecated, and will be decommissioned in v3.7. Use --warning-unary-request-duration instead.")
	fs.BoolVar(&cfg.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
	fs.BoolVar(&cfg.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
	fs.BoolVar(&cfg.ExperimentalStopGRPCServiceOnDefrag, "experimental-stop-grpc-service-on-defrag", cfg.ExperimentalStopGRPCServiceOnDefrag, "Enable etcd gRPC service to stop serving client requests on defragmentation.")
	fs.UintVar(&cfg.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
	fs.IntVar(&cfg.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
	fs.Uint64Var(&cfg.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the raft storage entries.")
@@ -223,6 +223,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
		WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
		ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
		ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
		ExperimentalStopGRPCServiceOnDefrag: cfg.ExperimentalStopGRPCServiceOnDefrag,
		ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
		ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
		V2Deprecation: cfg.V2DeprecationEffective(),
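The same knob is surfaced on embed.Config for embedded servers. A rough sketch of enabling it through the embed package follows; the data directory value is a placeholder and the rest uses embed defaults, none of which come from this commit.

```go
package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // placeholder data directory
	// Report NOT_SERVING on the gRPC health service while this member defragments,
	// so health-checking clients fail over to other members.
	cfg.ExperimentalStopGRPCServiceOnDefrag = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify()
	log.Println("embedded etcd is ready")
}
```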
@@ -305,6 +305,8 @@ Experimental feature:
    Enable to enforce etcd pages (in particular bbolt) to stay in RAM.
  --experimental-snapshot-catchup-entries
    Number of entries for a slow follower to catch up after compacting the raft storage entries.
  --experimental-stop-grpc-service-on-defrag
    Enable etcd gRPC service to stop serving client requests on defragmentation.

Unsafe feature:
  --force-new-cluster 'false'
@@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
	wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
	c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

	mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
	mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
	c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

	clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
@@ -75,13 +75,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
	pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

	// server should register all the services manually
	// use empty service name for all etcd services' health status,
	// see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
	hsrv := health.NewServer()
	hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
	healthpb.RegisterHealthServer(grpcServer, hsrv)

	// set zero values for metrics registered for this grpc server
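Because the health server is registered under the empty service name, any standard gRPC health client can observe a member's serving status directly. A rough probe sketch is shown below; the endpoint and the use of insecure credentials are placeholders for illustration, not something this commit prescribes.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Placeholder endpoint; a real deployment would use the member's client URL and TLS credentials.
	conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Service "" covers all etcd gRPC services, matching how the server registers its status.
	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Status) // SERVING, or NOT_SERVING while a defrag is in progress with the flag enabled
}
```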
server/etcdserver/api/v3rpc/health.go (new file, 68 lines)
@@ -0,0 +1,68 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3rpc

import (
	"go.uber.org/zap"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

const (
	allGRPCServices = ""
)

type HealthNotifier interface {
	StartServe()
	StopServe(reason string)
}

func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
	if hs == nil {
		panic("unexpected nil gRPC health server")
	}
	if lg == nil {
		lg = zap.NewNop()
	}
	hc := &healthChecker{hs: hs, lg: lg}
	// set grpc health server as serving status blindly since
	// the grpc server will serve iff s.ReadyNotify() is closed.
	hc.StartServe()
	return hc
}

type healthChecker struct {
	hs *health.Server
	lg *zap.Logger
}

func (hc *healthChecker) StartServe() {
	hc.lg.Info(
		"grpc service status changed",
		zap.String("service", allGRPCServices),
		zap.String("status", healthpb.HealthCheckResponse_SERVING.String()),
	)
	hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
}

func (hc *healthChecker) StopServe(reason string) {
	hc.lg.Warn(
		"grpc service status changed",
		zap.String("service", allGRPCServices),
		zap.String("status", healthpb.HealthCheckResponse_NOT_SERVING.String()),
		zap.String("reason", reason),
	)
	hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_NOT_SERVING)
}
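For intuition, the notifier ultimately just toggles the grpc-go health.Server that health-checking clients poll. A self-contained sketch of the same toggle, read back through the server's in-process Check method, is below; it is illustrative only and not part of the commit.

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	hs := health.NewServer()

	// Equivalent of StartServe: the empty service name covers every registered service.
	hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	resp, _ := hs.Check(context.Background(), &healthpb.HealthCheckRequest{Service: ""})
	fmt.Println(resp.Status) // SERVING

	// Equivalent of StopServe("defrag is active"): health-checking clients stop picking this member.
	hs.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
	resp, _ = hs.Check(context.Background(), &healthpb.HealthCheckRequest{Service: ""})
	fmt.Println(resp.Status) // NOT_SERVING
}
```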
@@ -74,10 +74,13 @@ type maintenanceServer struct {
	cs ClusterStatusGetter
	d Downgrader
	vs serverversion.Server
	hn HealthNotifier

	stopServingOnDefrag bool
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
	srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
	if srv.lg == nil {
		srv.lg = zap.NewNop()
	}
@@ -86,6 +89,10 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
	ms.lg.Info("starting defragment")
	if ms.stopServingOnDefrag {
		ms.hn.StopServe("defrag is active")
		defer ms.hn.StartServe()
	}
	err := ms.bg.Backend().Defrag()
	if err != nil {
		ms.lg.Warn("failed to defragment", zap.Error(err))
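The guard-and-defer shape in Defragment is easy to exercise in isolation with a fake notifier. The sketch below uses illustrative names (fakeNotifier, runDefrag) that are not part of the commit; it only demonstrates the same pattern of flipping status around a blocking operation.

```go
package main

import "fmt"

// fakeNotifier records status transitions, mirroring the HealthNotifier interface in health.go.
type fakeNotifier struct{ events []string }

func (f *fakeNotifier) StartServe()             { f.events = append(f.events, "SERVING") }
func (f *fakeNotifier) StopServe(reason string) { f.events = append(f.events, "NOT_SERVING: "+reason) }

// runDefrag follows the same guard/defer shape as maintenanceServer.Defragment.
func runDefrag(hn interface {
	StartServe()
	StopServe(string)
}, stopServingOnDefrag bool, defrag func() error) error {
	if stopServingOnDefrag {
		hn.StopServe("defrag is active")
		defer hn.StartServe()
	}
	return defrag()
}

func main() {
	f := &fakeNotifier{}
	_ = runDefrag(f, true, func() error { return nil })
	fmt.Println(f.events) // [NOT_SERVING: defrag is active SERVING]
}
```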
tests/e2e/failover_test.go (new file, 163 lines)
@@ -0,0 +1,163 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	_ "google.golang.org/grpc/health"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/tests/v3/framework/config"
	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
	// in sync with how kubernetes uses etcd
	// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71
	keepaliveTime    = 30 * time.Second
	keepaliveTimeout = 10 * time.Second
	dialTimeout      = 20 * time.Second

	clientRuntime  = 10 * time.Second
	requestTimeout = 100 * time.Millisecond
)

func TestFailoverOnDefrag(t *testing.T) {
	tcs := []struct {
		name            string
		clusterOptions  []e2e.EPClusterOption
		gRPCDialOptions []grpc.DialOption

		// common assertion
		expectedMinTotalRequestsCount int
		// happy case assertion
		expectedMaxFailedRequestsCount int
		// negative case assertion
		expectedMinFailedRequestsCount int
	}{
		{
			name: "defrag failover happy case",
			clusterOptions: []e2e.EPClusterOption{
				e2e.WithClusterSize(3),
				e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
				e2e.WithGoFailEnabled(true),
			},
			gRPCDialOptions: []grpc.DialOption{
				grpc.WithDisableServiceConfig(),
				grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
			},
			expectedMinTotalRequestsCount:  300,
			expectedMaxFailedRequestsCount: 5,
		},
		{
			name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
			clusterOptions: []e2e.EPClusterOption{
				e2e.WithClusterSize(3),
				e2e.WithExperimentalStopGRPCServiceOnDefrag(false),
				e2e.WithGoFailEnabled(true),
			},
			gRPCDialOptions: []grpc.DialOption{
				grpc.WithDisableServiceConfig(),
				grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
			},
			expectedMinTotalRequestsCount:  300,
			expectedMinFailedRequestsCount: 90,
		},
		{
			name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
			clusterOptions: []e2e.EPClusterOption{
				e2e.WithClusterSize(3),
				e2e.WithExperimentalStopGRPCServiceOnDefrag(true),
				e2e.WithGoFailEnabled(true),
			},
			expectedMinTotalRequestsCount:  300,
			expectedMinFailedRequestsCount: 90,
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			e2e.BeforeTest(t)
			clus, cerr := e2e.NewEtcdProcessCluster(context.TODO(), t, tc.clusterOptions...)
			require.NoError(t, cerr)
			t.Cleanup(func() { clus.Stop() })

			endpoints := clus.EndpointsGRPC()

			requestVolume, successfulRequestCount := 0, 0
			g := new(errgroup.Group)
			g.Go(func() (lastErr error) {
				clusterClient, cerr := clientv3.New(clientv3.Config{
					DialTimeout:          dialTimeout,
					DialKeepAliveTime:    keepaliveTime,
					DialKeepAliveTimeout: keepaliveTimeout,
					Endpoints:            endpoints,
					DialOptions:          tc.gRPCDialOptions,
				})
				if cerr != nil {
					return cerr
				}
				defer clusterClient.Close()

				timeout := time.After(clientRuntime)
				for {
					select {
					case <-timeout:
						return lastErr
					default:
					}
					getContext, cancel := context.WithTimeout(context.Background(), requestTimeout)
					_, err := clusterClient.Get(getContext, "health")
					cancel()
					requestVolume++
					if err != nil {
						lastErr = err
						continue
					}
					successfulRequestCount++
				}
			})
			triggerDefrag(t, clus.Procs[0])

			err := g.Wait()
			if err != nil {
				t.Logf("etcd client failed to fail over, error (%v)", err)
			}
			t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)

			require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
			failedRequestCount := requestVolume - successfulRequestCount
			if tc.expectedMaxFailedRequestsCount != 0 {
				require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
			}
			if tc.expectedMinFailedRequestsCount != 0 {
				require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
			}
		})
	}
}

func triggerDefrag(t *testing.T, member e2e.EtcdProcess) {
	require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragBeforeCopy", `sleep("10s")`))
	require.NoError(t, member.Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute}))
}
@@ -340,6 +340,12 @@ func WithExperimentalWarningUnaryRequestDuration(time time.Duration) EPClusterOp
	return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalWarningUnaryRequestDuration = time }
}

func WithExperimentalStopGRPCServiceOnDefrag(stopGRPCServiceOnDefrag bool) EPClusterOption {
	return func(c *EtcdProcessClusterConfig) {
		c.ServerConfig.ExperimentalStopGRPCServiceOnDefrag = stopGRPCServiceOnDefrag
	}
}

func WithCompactionBatchLimit(limit int) EPClusterOption {
	return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalCompactionBatchLimit = limit }
}
@@ -172,6 +172,8 @@ type ClusterConfig struct {
	ExperimentalMaxLearners int
	DisableStrictReconfigCheck bool
	CorruptCheckTime time.Duration

	ExperimentalStopGRPCServiceOnDefrag bool
}

type Cluster struct {
@@ -288,6 +290,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
		ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners,
		DisableStrictReconfigCheck: c.Cfg.DisableStrictReconfigCheck,
		CorruptCheckTime: c.Cfg.CorruptCheckTime,
		ExperimentalStopGRPCServiceOnDefrag: c.Cfg.ExperimentalStopGRPCServiceOnDefrag,
	})
	m.DiscoveryURL = c.Cfg.DiscoveryURL
	return m
@@ -614,6 +617,8 @@ type MemberConfig struct {
	ExperimentalMaxLearners int
	DisableStrictReconfigCheck bool
	CorruptCheckTime time.Duration

	ExperimentalStopGRPCServiceOnDefrag bool
}

// MustNewMember return an inited member with the given name. If peerTLS is
@@ -723,6 +728,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
	if mcfg.CorruptCheckTime > time.Duration(0) {
		m.CorruptCheckTime = mcfg.CorruptCheckTime
	}
	m.ExperimentalStopGRPCServiceOnDefrag = mcfg.ExperimentalStopGRPCServiceOnDefrag
	m.WarningApplyDuration = embed.DefaultWarningApplyDuration
	m.WarningUnaryRequestDuration = embed.DefaultWarningUnaryRequestDuration
	m.ExperimentalMaxLearners = membership.DefaultMaxLearners