Compare commits

...

18 Commits

SHA1 Message Date
99018a77be version: bump up to 3.5.2 2022-02-01 12:28:34 +01:00
a624446907 Merge pull request #13616 from ptabor/20220117-update-yaml
Update dep: gopkg.in/yaml.v2 v2.2.8 -> v2.4.0 due to: CVE-2019-11254 [release 3.5]
2022-01-17 20:07:16 +01:00
74f33d6665 Update dep: require gopkg.in/yaml.v2 v2.2.8 -> v2.4.0 due to: CVE-2019-11254. 2022-01-17 17:10:50 +01:00
7291ed3c4a Merge pull request #13541 from michaljasionowski/backport-runlock-fix
Backport watchablestore runlock bug fix to release-3.5
2021-12-21 11:03:31 +01:00
55c16df997 fix runlock bug 2021-12-16 15:58:41 +00:00
73080a7166 Merge pull request #13501 from ahrtr/reset_ci_after_reload_db_3.5
[3.5] Set the backend again after recovering v3 backend from snapshot
2021-12-06 13:22:22 +01:00
e84c61104c Merge pull request #13515 from serathius/checkpoints-fix-3.5
Backport Lease Checkpoints fix to release-3.5
2021-12-03 12:21:02 +01:00
d00e89db2e server: Require either cluster version v3.6 or --experimental-enable-lease-checkpoint-persist to persist lease remainingTTL
To avoid inconsistent behavior during cluster upgrades we are feature-gating
persistence behind the cluster version. This should ensure that
all cluster members are upgraded to v3.6 before changing behavior.

To allow backporting this fix to v3.5 we are also introducing the flag
--experimental-enable-lease-checkpoint-persist, which will allow a
smooth upgrade in v3.5 clusters with this feature enabled.
2021-12-02 16:54:10 +01:00
eddfb4232f etcdserver,integration: Store remaining TTL on checkpoint
To extend the lease checkpointing mechanism to cases where the whole etcd
cluster is restarted.
2021-12-02 16:42:20 +01:00
21634a98c6 lease,integration: add checkpoint scheduling after leader change
The current checkpointing mechanism is buggy: new checkpoints for any lease
are scheduled only until the first leader change. This adds a fix for that
and a test that checks it.
2021-12-02 16:40:14 +01:00
8c81598455 set the backend again after recovering v3 backend from snapshot 2021-11-25 05:45:20 +08:00
eac7f98699 Merge pull request #13477 from mitake/backport-13308-to-3.5
Backport PR 13308 to release-3.5
2021-11-21 14:45:15 -05:00
dec6f72d68 *: implement a retry logic for auth old revision in the client 2021-11-15 00:09:16 +09:00
79bbc8fdb7 client/v3: refresh the token when ErrUserEmpty is received while retrying
Fixes a bug in the retry logic that occurs when the auth token is cleared after receiving `ErrInvalidAuthToken` from the server and the subsequent call to `getToken` also fails for some reason (e.g. context deadline exceeded).
This leaves the client without a token, and retries will continue to fail with `ErrUserEmpty` unless the token is refreshed.
2021-11-15 00:09:09 +09:00
77d760bf1b Merge pull request #13476 from chaochn47/backport-release-3.5
cherry-pick to 3.5 from #13467 exclude the same alarm type activated by multiple peers
2021-11-13 22:10:19 -05:00
7d44a7cd6e server/etcdserver/api/etcdhttp: exclude the same alarm type activated by multiple peers 2021-11-12 14:21:14 -08:00
e8732fb5f3 Merge pull request #13395 from geetasg/release-3.5
storage/backend: Add a gauge to indicate if defrag is active (backport)
2021-10-07 18:16:21 +02:00
446f7d6b6e storage/backend: Add a gauge to indicate if defrag is active (backport from 3.6) 2021-10-06 11:01:31 -07:00
35 changed files with 554 additions and 135 deletions

View File

@@ -9,6 +9,7 @@ require (
 	github.com/grpc-ecosystem/grpc-gateway v1.16.0
 	google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
 	google.golang.org/grpc v1.38.0
+	gopkg.in/yaml.v2 v2.4.0 // indirect
 )

 // Bad imports are sometimes causing attempts to pull that code.

View File

@@ -143,7 +143,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=
 gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@@ -65,6 +65,7 @@ var (
 	ErrGRPCAuthNotEnabled   = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
 	ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
 	ErrGRPCInvalidAuthMgmt  = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
+	ErrGRPCAuthOldRevision  = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err()

 	ErrGRPCNoLeader  = status.New(codes.Unavailable, "etcdserver: no leader").Err()
 	ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
@@ -131,6 +132,7 @@ var (
 	ErrorDesc(ErrGRPCAuthNotEnabled):   ErrGRPCAuthNotEnabled,
 	ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
 	ErrorDesc(ErrGRPCInvalidAuthMgmt):  ErrGRPCInvalidAuthMgmt,
+	ErrorDesc(ErrGRPCAuthOldRevision):  ErrGRPCAuthOldRevision,

 	ErrorDesc(ErrGRPCNoLeader):  ErrGRPCNoLeader,
 	ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
@@ -195,6 +197,7 @@ var (
 	ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
 	ErrAuthNotEnabled       = Error(ErrGRPCAuthNotEnabled)
 	ErrInvalidAuthToken     = Error(ErrGRPCInvalidAuthToken)
+	ErrAuthOldRevision      = Error(ErrGRPCAuthOldRevision)
 	ErrInvalidAuthMgmt      = Error(ErrGRPCInvalidAuthMgmt)

 	ErrNoLeader = Error(ErrGRPCNoLeader)

View File

@@ -26,7 +26,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "3.0.0"
-	Version           = "3.5.1"
+	Version           = "3.5.2"
 	APIVersion        = "unknown"

 	// Git SHA Value will be set during build

View File

@@ -5,8 +5,8 @@ go 1.16
 require (
 	github.com/json-iterator/go v1.1.11
 	github.com/modern-go/reflect2 v1.0.1
-	go.etcd.io/etcd/api/v3 v3.5.1
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
+	go.etcd.io/etcd/api/v3 v3.5.2
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
 )

 replace (

View File

@@ -154,8 +154,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -6,8 +6,8 @@ require (
 	github.com/dustin/go-humanize v1.0.0
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 	github.com/prometheus/client_golang v1.11.0
-	go.etcd.io/etcd/api/v3 v3.5.1
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
+	go.etcd.io/etcd/api/v3 v3.5.2
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
 	go.uber.org/zap v1.17.0
 	google.golang.org/grpc v1.38.0
 	sigs.k8s.io/yaml v1.2.0

View File

@@ -258,8 +258,9 @@ gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -73,7 +73,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien
 				// its the callCtx deadline or cancellation, in which case try again.
 				continue
 			}
-			if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
+			if c.shouldRefreshToken(lastErr, callOpts) {
 				// clear auth token before refreshing it.
 				// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
 				// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
@@ -148,6 +148,19 @@ func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamCli
 	}
 }

+// shouldRefreshToken checks whether there's a need to refresh the token based on the error and callOptions,
+// and returns a boolean value.
+func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
+	if rpctypes.Error(err) == rpctypes.ErrUserEmpty {
+		// refresh the token when username, password is present but the server returns ErrUserEmpty
+		// which is possible when the client token is cleared somehow
+		return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != ""
+	}
+	return callOpts.retryAuth &&
+		(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
+}
+
 // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
 // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
 // a new ClientStream according to the retry policy.
@@ -245,7 +258,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
 		// its the callCtx deadline or cancellation, in which case try again.
 		return true, err
 	}
-	if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
+	if s.client.shouldRefreshToken(err, s.callOpts) {
 		// clear auth token to avoid failure when call getToken
 		s.client.authTokenBundle.UpdateAuthToken("")
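
The refresh path above only fires for clients constructed with credentials (the authTokenBundle check). As a hedged illustration, a client that would exercise it might be built like this; the endpoint and credentials are placeholder values, not part of this change:

package main

import (
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Username/Password populate the client's auth token bundle, which
	// shouldRefreshToken checks before retrying on ErrUserEmpty.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"}, // placeholder endpoint
		Username:  "root",                     // placeholder credentials
		Password:  "changeme",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
}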

View File

@@ -0,0 +1,124 @@
package clientv3
import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3/credentials"
grpccredentials "google.golang.org/grpc/credentials"
"testing"
)
type dummyAuthTokenBundle struct{}
func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials {
return nil
}
func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
return nil
}
func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
return nil, nil
}
func (d dummyAuthTokenBundle) UpdateAuthToken(token string) {
}
func TestClientShouldRefreshToken(t *testing.T) {
type fields struct {
authTokenBundle credentials.Bundle
}
type args struct {
err error
callOpts *options
}
optsWithTrue := &options{
retryAuth: true,
}
optsWithFalse := &options{
retryAuth: false,
}
tests := []struct {
name string
fields fields
args args
want bool
}{
{
name: "ErrUserEmpty and non nil authTokenBundle",
fields: fields{
authTokenBundle: &dummyAuthTokenBundle{},
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: true,
},
{
name: "ErrUserEmpty and nil authTokenBundle",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: false,
},
{
name: "ErrGRPCInvalidAuthToken and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue},
want: true,
},
{
name: "ErrGRPCInvalidAuthToken and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse},
want: false,
},
{
name: "ErrGRPCAuthOldRevision and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue},
want: true,
},
{
name: "ErrGRPCAuthOldRevision and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse},
want: false,
},
{
name: "Other error and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue},
want: false,
},
{
name: "Other error and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
authTokenBundle: tt.fields.authTokenBundle,
}
if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want {
t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -9,12 +9,12 @@ require (
 	github.com/spf13/cobra v1.1.3
 	github.com/spf13/pflag v1.0.5
 	github.com/urfave/cli v1.22.4
-	go.etcd.io/etcd/api/v3 v3.5.1
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
-	go.etcd.io/etcd/client/v2 v2.305.1
-	go.etcd.io/etcd/client/v3 v3.5.1
-	go.etcd.io/etcd/etcdutl/v3 v3.5.1
-	go.etcd.io/etcd/pkg/v3 v3.5.1
+	go.etcd.io/etcd/api/v3 v3.5.2
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
+	go.etcd.io/etcd/client/v2 v2.305.2
+	go.etcd.io/etcd/client/v3 v3.5.2
+	go.etcd.io/etcd/etcdutl/v3 v3.5.2
+	go.etcd.io/etcd/pkg/v3 v3.5.2
 	go.uber.org/zap v1.17.0
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
 	google.golang.org/grpc v1.38.0

View File

@@ -25,11 +25,11 @@ require (
 	github.com/olekukonko/tablewriter v0.0.5
 	github.com/spf13/cobra v1.1.3
 	go.etcd.io/bbolt v1.3.6
-	go.etcd.io/etcd/api/v3 v3.5.1
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
-	go.etcd.io/etcd/client/v3 v3.5.1
-	go.etcd.io/etcd/pkg/v3 v3.5.1
-	go.etcd.io/etcd/raft/v3 v3.5.1
-	go.etcd.io/etcd/server/v3 v3.5.1
+	go.etcd.io/etcd/api/v3 v3.5.2
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
+	go.etcd.io/etcd/client/v3 v3.5.2
+	go.etcd.io/etcd/pkg/v3 v3.5.2
+	go.etcd.io/etcd/raft/v3 v3.5.2
+	go.etcd.io/etcd/server/v3 v3.5.2
 	go.uber.org/zap v1.17.0
 )

go.mod
View File

@@ -20,16 +20,16 @@ require (
 	github.com/dustin/go-humanize v1.0.0
 	github.com/spf13/cobra v1.1.3
 	go.etcd.io/bbolt v1.3.6
-	go.etcd.io/etcd/api/v3 v3.5.1
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
-	go.etcd.io/etcd/client/v2 v2.305.1
-	go.etcd.io/etcd/client/v3 v3.5.1
-	go.etcd.io/etcd/etcdctl/v3 v3.5.1
-	go.etcd.io/etcd/etcdutl/v3 v3.5.1
-	go.etcd.io/etcd/pkg/v3 v3.5.1
-	go.etcd.io/etcd/raft/v3 v3.5.1
-	go.etcd.io/etcd/server/v3 v3.5.1
-	go.etcd.io/etcd/tests/v3 v3.5.1
+	go.etcd.io/etcd/api/v3 v3.5.2
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
+	go.etcd.io/etcd/client/v2 v2.305.2
+	go.etcd.io/etcd/client/v3 v3.5.2
+	go.etcd.io/etcd/etcdctl/v3 v3.5.2
+	go.etcd.io/etcd/etcdutl/v3 v3.5.2
+	go.etcd.io/etcd/pkg/v3 v3.5.2
+	go.etcd.io/etcd/raft/v3 v3.5.2
+	go.etcd.io/etcd/server/v3 v3.5.2
+	go.etcd.io/etcd/tests/v3 v3.5.2
 	go.uber.org/zap v1.17.0
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
 	google.golang.org/grpc v1.38.0

View File

@@ -9,7 +9,7 @@ require (
 	github.com/spf13/cobra v1.1.3
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.7.0
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
 	go.uber.org/zap v1.17.0
 	google.golang.org/grpc v1.38.0
 )

View File

@@ -8,7 +8,7 @@ require (
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.5.2
 	github.com/pkg/errors v0.9.1 // indirect
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
 )

 // Bad imports are sometimes causing attempts to pull that code.

View File

@@ -147,10 +147,12 @@ type ServerConfig struct {
 	ForceNewCluster bool

-	// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
 	EnableLeaseCheckpoint bool
 	// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
 	LeaseCheckpointInterval time.Duration
+	// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	LeaseCheckpointPersist bool

 	EnableGRPCGateway bool

View File

@@ -314,10 +314,15 @@ type Config struct {
 	// Deprecated in v3.5.
 	// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
 	ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
-	// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
 	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
+	// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	// Requires experimental-enable-lease-checkpoint to be enabled.
+	// Deprecated in v3.6.
+	// TODO: Delete in v3.7
+	ExperimentalEnableLeaseCheckpointPersist bool          `json:"experimental-enable-lease-checkpoint-persist"`
 	ExperimentalCompactionBatchLimit         int           `json:"experimental-compaction-batch-limit"`
 	ExperimentalWatchProgressNotifyInterval  time.Duration `json:"experimental-watch-progress-notify-interval"`
 	// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
 	// takes more time than this value.
 	ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
@@ -678,6 +683,14 @@ func (cfg *Config) Validate() error {
 		return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
 	}

+	if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
+		cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
+	}
+
+	if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
+		return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
+	}
+
 	return nil
 }

View File

@@ -291,6 +291,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
 	}
 }

+func TestLeaseCheckpointValidate(t *testing.T) {
+	tcs := []struct {
+		name        string
+		configFunc  func() Config
+		expectError bool
+	}{
+		{
+			name: "Default config should pass",
+			configFunc: func() Config {
+				return *NewConfig()
+			},
+		},
+		{
+			name: "Enabling checkpoint leases should pass",
+			configFunc: func() Config {
+				cfg := *NewConfig()
+				cfg.ExperimentalEnableLeaseCheckpoint = true
+				return cfg
+			},
+		},
+		{
+			name: "Enabling checkpoint leases and persist should pass",
+			configFunc: func() Config {
+				cfg := *NewConfig()
+				cfg.ExperimentalEnableLeaseCheckpoint = true
+				cfg.ExperimentalEnableLeaseCheckpointPersist = true
+				return cfg
+			},
+		},
+		{
+			name: "Enabling checkpoint leases persist without checkpointing itself should fail",
+			configFunc: func() Config {
+				cfg := *NewConfig()
+				cfg.ExperimentalEnableLeaseCheckpointPersist = true
+				return cfg
+			},
+			expectError: true,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			cfg := tc.configFunc()
+			err := cfg.Validate()
+			if (err != nil) != tc.expectError {
+				t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError)
+			}
+		})
+	}
+}
+
 func TestLogRotation(t *testing.T) {
 	tests := []struct {
 		name      string
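
As a hedged sketch of how the new validation behaves from the embedded-server API (the field and method names are taken from the hunks above; everything else is assumed):

package main

import (
	"fmt"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()

	// Allowed: checkpointing with persistence (mirrors the "should pass" cases).
	cfg.ExperimentalEnableLeaseCheckpoint = true
	cfg.ExperimentalEnableLeaseCheckpointPersist = true
	fmt.Println(cfg.Validate()) // expect <nil>

	// Rejected by the new rule: persist without checkpointing itself.
	cfg.ExperimentalEnableLeaseCheckpoint = false
	fmt.Println(cfg.Validate()) // expect a non-nil error
}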

View File

@@ -216,6 +216,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
 		UnsafeNoFsync:                        cfg.UnsafeNoFsync,
 		EnableLeaseCheckpoint:                cfg.ExperimentalEnableLeaseCheckpoint,
+		LeaseCheckpointPersist:               cfg.ExperimentalEnableLeaseCheckpointPersist,
 		CompactionBatchLimit:                 cfg.ExperimentalCompactionBatchLimit,
 		WatchProgressNotifyInterval:          cfg.ExperimentalWatchProgressNotifyInterval,
 		DowngradeCheckTime:                   cfg.ExperimentalDowngradeCheckTime,

View File

@@ -280,7 +280,9 @@ func newConfig() *config {
 	fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
 	fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
-	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
+	// TODO: delete in v3.7
+	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
 	fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 	fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
 	fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")

View File

@@ -138,8 +138,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
 	for _, v := range as {
 		alarmName := v.Alarm.String()
 		if _, found := excludedAlarms[alarmName]; found {
-			lg.Debug("/health excluded alarm", zap.String("alarm", alarmName))
-			delete(excludedAlarms, alarmName)
+			lg.Debug("/health excluded alarm", zap.String("alarm", v.String()))
 			continue
 		}
@@ -157,10 +156,6 @@
 		}
 	}

-	if len(excludedAlarms) > 0 {
-		lg.Warn("fail exclude alarms from health check", zap.String("exclude alarms", fmt.Sprintf("%+v", excludedAlarms)))
-	}
-
 	if uint64(srv.Leader()) == raft.None {
 		h.Health = "false"
 		h.Reason = "RAFT NO LEADER"

View File

@@ -78,6 +78,12 @@ func TestHealthHandler(t *testing.T) {
 			http.StatusOK,
 			"true",
 		},
+		{
+			[]*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}},
+			"/health?exclude=NOSPACE",
+			http.StatusOK,
+			"true",
+		},
 		{
 			[]*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
 			"/health?exclude=NOSPACE",

View File

@@ -84,6 +84,7 @@ var toGRPCErrorMap = map[error]error{
 	auth.ErrAuthNotEnabled:   rpctypes.ErrGRPCAuthNotEnabled,
 	auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
 	auth.ErrInvalidAuthMgmt:  rpctypes.ErrGRPCInvalidAuthMgmt,
+	auth.ErrAuthOldRevision:  rpctypes.ErrGRPCAuthOldRevision,

 	// In sync with status.FromContextError
 	context.Canceled: rpctypes.ErrGRPCCanceled,

View File

@@ -514,6 +514,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
 			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
 		}
+		// A snapshot db may have already been recovered, and the old db should have
+		// already been closed in this case, so we should set the backend again.
+		ci.SetBackend(be)
 		s1, s2 := be.Size(), be.SizeInUse()
 		cfg.Logger.Info(
 			"recovered v3 backend from snapshot",
@@ -592,9 +595,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
-	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
+	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
 		MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
 		CheckpointInterval:         cfg.LeaseCheckpointInterval,
+		CheckpointPersist:          cfg.LeaseCheckpointPersist,
 		ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
 	})

View File

@@ -25,12 +25,12 @@ require (
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
 	go.etcd.io/bbolt v1.3.6
-	go.etcd.io/etcd/api/v3 v3.5.1
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
-	go.etcd.io/etcd/client/v2 v2.305.1
-	go.etcd.io/etcd/client/v3 v3.5.1
-	go.etcd.io/etcd/pkg/v3 v3.5.1
-	go.etcd.io/etcd/raft/v3 v3.5.1
+	go.etcd.io/etcd/api/v3 v3.5.2
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
+	go.etcd.io/etcd/client/v2 v2.305.2
+	go.etcd.io/etcd/client/v3 v3.5.2
+	go.etcd.io/etcd/pkg/v3 v3.5.2
+	go.etcd.io/etcd/raft/v3 v3.5.2
 	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
 	go.opentelemetry.io/otel v0.20.0
 	go.opentelemetry.io/otel/exporters/otlp v0.20.0

View File

@@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
 	be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
 	defer betesting.Close(t, be)

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {
@@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
 	be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
 	defer betesting.Close(t, be)

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {
@@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
 	be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
 	defer betesting.Close(t, be)

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {

View File

@@ -24,6 +24,7 @@ import (
 	"sync"
 	"time"

+	"github.com/coreos/go-semver/semver"
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/server/v3/lease/leasepb"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
@@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
 // MaxLeaseTTL is the maximum lease TTL value
 const MaxLeaseTTL = 9000000000

+var v3_6 = semver.Version{Major: 3, Minor: 6}
+
 var (
 	forever = time.Time{}
@@ -180,19 +183,29 @@ type lessor struct {
 	checkpointInterval time.Duration
 	// the interval to check if the expired lease is revoked
 	expiredLeaseRetryInterval time.Duration
+	// whether lessor should always persist remaining TTL (always enabled in v3.6).
+	checkpointPersist bool
+	// cluster is used to adapt lessor logic based on cluster version
+	cluster cluster
+}
+
+type cluster interface {
+	// Version is the cluster-wide minimum major.minor version.
+	Version() *semver.Version
 }

 type LessorConfig struct {
 	MinLeaseTTL                int64
 	CheckpointInterval         time.Duration
 	ExpiredLeasesRetryInterval time.Duration
+	CheckpointPersist          bool
 }

-func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
-	return newLessor(lg, b, cfg)
+func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
+	return newLessor(lg, b, cluster, cfg)
 }

-func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
+func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
 	checkpointInterval := cfg.CheckpointInterval
 	expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
 	if checkpointInterval == 0 {
@@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
 		minLeaseTTL:               cfg.MinLeaseTTL,
 		checkpointInterval:        checkpointInterval,
 		expiredLeaseRetryInterval: expiredLeaseRetryInterval,
+		checkpointPersist:         cfg.CheckpointPersist,
 		// expiredC is a small buffered chan to avoid unnecessary blocking.
 		expiredC: make(chan []*Lease, 16),
 		stopC:    make(chan struct{}),
 		doneC:    make(chan struct{}),
 		lg:       lg,
+		cluster:  cluster,
 	}
 	l.initAndRecover()
@@ -351,6 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
 	if l, ok := le.leaseMap[id]; ok {
 		// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
 		l.remainingTTL = remainingTTL
+		if le.shouldPersistCheckpoints() {
+			l.persistTo(le.b)
+		}
 		if le.isPrimary() {
 			// schedule the next checkpoint as needed
 			le.scheduleCheckpointIfNeeded(l)
@@ -359,6 +377,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
 	return nil
 }

+func (le *lessor) shouldPersistCheckpoints() bool {
+	cv := le.cluster.Version()
+	return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
+}
+
+func greaterOrEqual(first, second semver.Version) bool {
+	return !first.LessThan(second)
+}
+
 // Renew renews an existing lease. If the given lease does not exist or
 // has expired, an error will be returned.
 func (le *lessor) Renew(id LeaseID) (int64, error) {
@@ -446,6 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
 		l.refresh(extend)
 		item := &LeaseWithTime{id: l.ID, time: l.expiry}
 		le.leaseExpiredNotifier.RegisterOrUpdate(item)
+		le.scheduleCheckpointIfNeeded(l)
 	}

 	if len(le.leaseMap) < leaseRevokeRate {
@@ -789,9 +817,10 @@ func (le *lessor) initAndRecover() {
 			ttl: lpb.TTL,
 			// itemSet will be filled in when recover key-value pairs
 			// set expiry to forever, refresh when promoted
-			itemSet: make(map[LeaseItem]struct{}),
-			expiry:  forever,
-			revokec: make(chan struct{}),
+			itemSet:      make(map[LeaseItem]struct{}),
+			expiry:       forever,
+			revokec:      make(chan struct{}),
+			remainingTTL: lpb.RemainingTTL,
 		}
 	}
 	le.leaseExpiredNotifier.Init()

View File

@@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
 	be, _ := betesting.NewDefaultTmpBackend(t)
 	// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
 	// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
-	le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
+	le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
 	le.SetRangeDeleter(func() TxnDelete {
 		ftd := &FakeTxnDelete{be.BatchTx()}
 		ftd.Lock()

View File

@@ -26,7 +26,9 @@ import (
 	"testing"
 	"time"

+	"github.com/coreos/go-semver/semver"
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
+	"go.etcd.io/etcd/api/v3/version"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
 	"go.etcd.io/etcd/server/v3/mvcc/buckets"
 	"go.uber.org/zap"
@@ -46,7 +48,7 @@ func TestLessorGrant(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.Promote(0)
@@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	var fd *fakeDeleter
 	le.SetRangeDeleter(func() TxnDelete {
@@ -210,7 +212,7 @@ func TestLessorRenew(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.Promote(0)
@@ -243,7 +245,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
 		for _, cp := range cp.GetCheckpoints() {
 			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@@ -292,7 +294,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 	dir, be := NewTestBackend(t)
 	defer os.RemoveAll(dir)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	ttl := int64(10)
 	for i := 1; i <= leaseRevokeRate*10; i++ {
 		if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@@ -311,7 +313,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 	bcfg.Path = filepath.Join(dir, "be")
 	be = backend.New(bcfg)
 	defer be.Close()
-	le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()

 	// extend after recovery should extend expiration on lease pile-up
@@ -341,7 +343,7 @@ func TestLessorDetach(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -382,7 +384,7 @@ func TestLessorRecover(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	l1, err1 := le.Grant(1, 10)
 	l2, err2 := le.Grant(2, 20)
@@ -391,7 +393,7 @@ func TestLessorRecover(t *testing.T) {
 	}

 	// Create a new lessor with the same backend
-	nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer nle.Stop()
 	nl1 := nle.Lookup(l1.ID)
 	if nl1 == nil || nl1.ttl != l1.ttl {
@@ -412,7 +414,7 @@ func TestLessorExpire(t *testing.T) {
 	testMinTTL := int64(1)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
 	defer le.Stop()

 	le.Promote(1 * time.Second)
@@ -465,7 +467,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
 	testMinTTL := int64(1)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
 	defer le.Stop()

 	le.Promote(1 * time.Second)
@@ -514,7 +516,7 @@ func TestLessorMaxTTL(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()

 	_, err := le.Grant(1, MaxLeaseTTL+1)
@@ -530,7 +532,8 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})
 	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@@ -543,13 +546,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 			t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
 		}
 	})
-	defer le.Stop()
-	le.Promote(0)

 	_, err := le.Grant(1, 2)
 	if err != nil {
 		t.Fatal(err)
 	}
+	le.Promote(0)

 	// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
 	select {
@@ -565,7 +566,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	l, err := le.Grant(1, 10)
 	if err != nil {
@@ -579,6 +580,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
 	}
 }

+func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
+	const ttl int64 = 10
+	const checkpointTTL int64 = 5
+
+	tcs := []struct {
+		name               string
+		cluster            cluster
+		checkpointPersist  bool
+		expectRemainingTTL int64
+	}{
+		{
+			name:               "Etcd v3.6 and newer persist remainingTTL on checkpoint",
+			cluster:            clusterV3_6(),
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
+			cluster:            clusterLatest(),
+			checkpointPersist:  true,
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
+			cluster:            clusterNil(),
+			checkpointPersist:  true,
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd v3.5 and older reset remainingTTL on checkpoint",
+			cluster:            clusterLatest(),
+			expectRemainingTTL: ttl,
+		},
+		{
+			name:               "Etcd with version unknown fallbacks to v3.5 behavior",
+			cluster:            clusterNil(),
+			expectRemainingTTL: ttl,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			lg := zap.NewNop()
+			dir, be := NewTestBackend(t)
+			defer os.RemoveAll(dir)
+			defer be.Close()
+
+			cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
+			cfg.CheckpointPersist = tc.checkpointPersist
+			le := newLessor(lg, be, tc.cluster, cfg)
+			l, err := le.Grant(2, ttl)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if l.RemainingTTL() != ttl {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
+			}
+			le.Checkpoint(2, checkpointTTL)
+			if l.RemainingTTL() != checkpointTTL {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
+			}
+			le.Stop()
+			le2 := newLessor(lg, be, clusterV3_6(), cfg)
+			l = le2.Lookup(2)
+			if l.RemainingTTL() != tc.expectRemainingTTL {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
+			}
+		})
+	}
+}
+
 type fakeDeleter struct {
 	deleted []string
 	tx      backend.BatchTx
@@ -606,3 +676,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
 	bcfg.Path = filepath.Join(tmpPath, "be")
 	return tmpPath, backend.New(bcfg)
 }
+
+func clusterV3_6() cluster {
+	return fakeCluster{semver.New("3.6.0")}
+}
+
+func clusterLatest() cluster {
+	return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
+}
+
+func clusterNil() cluster {
+	return fakeCluster{}
+}
+
+type fakeCluster struct {
+	version *semver.Version
+}
+
+func (c fakeCluster) Version() *semver.Version {
+	return c.version
+}

View File

@@ -432,6 +432,8 @@ func (b *backend) Defrag() error {
 func (b *backend) defrag() error {
 	now := time.Now()
+	isDefragActive.Set(1)
+	defer isDefragActive.Set(0)

 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then

View File

@@ -83,6 +83,13 @@ var (
 		// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
 		Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
 	})
+
+	isDefragActive = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "disk",
+		Name:      "defrag_inflight",
+		Help:      "Whether or not defrag is active on the member. 1 means active, 0 means not.",
+	})
 )

 func init() {
@@ -92,4 +99,5 @@ func init() {
 	prometheus.MustRegister(writeSec)
 	prometheus.MustRegister(defragSec)
 	prometheus.MustRegister(snapshotTransferSec)
+	prometheus.MustRegister(isDefragActive)
 }
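
Given the Namespace/Subsystem/Name above, the gauge is exported as etcd_disk_defrag_inflight. A hedged sketch of polling it over the standard /metrics endpoint; the member address is a placeholder:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Scrape the Prometheus endpoint and print the defrag gauge:
	// 1 while a defrag pass is running, 0 otherwise.
	resp, err := http.Get("http://127.0.0.1:2379/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		if strings.HasPrefix(sc.Text(), "etcd_disk_defrag_inflight") {
			fmt.Println(sc.Text())
		}
	}
}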

View File

@@ -355,8 +355,11 @@ func (s *watchableStore) syncWatchers() int {
 	tx := s.store.b.ReadTx()
 	tx.RLock()
 	revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
-	tx.RUnlock()
 	evs := kvsToEvents(s.store.lg, wg, revs, vs)
+	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
+	// We can only unlock after Unmarshal, which will do deep copy.
+	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
+	tx.RUnlock()

 	var victims watcherBatch
 	wb := newWatcherBatch(wg, evs)
View File

@@ -28,14 +28,14 @@ require (
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.7.0
 	go.etcd.io/bbolt v1.3.6
-	go.etcd.io/etcd/api/v3 v3.5.1
-	go.etcd.io/etcd/client/pkg/v3 v3.5.1
-	go.etcd.io/etcd/client/v2 v2.305.1
-	go.etcd.io/etcd/client/v3 v3.5.1
-	go.etcd.io/etcd/etcdutl/v3 v3.5.1
-	go.etcd.io/etcd/pkg/v3 v3.5.1
-	go.etcd.io/etcd/raft/v3 v3.5.1
-	go.etcd.io/etcd/server/v3 v3.5.1
+	go.etcd.io/etcd/api/v3 v3.5.2
+	go.etcd.io/etcd/client/pkg/v3 v3.5.2
+	go.etcd.io/etcd/client/v2 v2.305.2
+	go.etcd.io/etcd/client/v3 v3.5.2
+	go.etcd.io/etcd/etcdutl/v3 v3.5.2
+	go.etcd.io/etcd/pkg/v3 v3.5.2
+	go.etcd.io/etcd/raft/v3 v3.5.2
+	go.etcd.io/etcd/server/v3 v3.5.2
 	go.uber.org/zap v1.17.0
 	golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

View File

@@ -167,6 +167,7 @@ type ClusterConfig struct {
 	EnableLeaseCheckpoint   bool
 	LeaseCheckpointInterval time.Duration
+	LeaseCheckpointPersist  bool

 	WatchProgressNotifyInterval time.Duration
 }
@@ -328,6 +329,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
 		useBridge:                   c.cfg.UseBridge,
 		useTCP:                      c.cfg.UseTCP,
 		enableLeaseCheckpoint:       c.cfg.EnableLeaseCheckpoint,
+		leaseCheckpointPersist:      c.cfg.LeaseCheckpointPersist,
 		leaseCheckpointInterval:     c.cfg.LeaseCheckpointInterval,
 		WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
 	})
@@ -631,6 +633,7 @@ type memberConfig struct {
 	useTCP                  bool
 	enableLeaseCheckpoint   bool
 	leaseCheckpointInterval time.Duration
+	leaseCheckpointPersist  bool

 	WatchProgressNotifyInterval time.Duration
 }
@@ -729,6 +732,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
 	m.useTCP = mcfg.useTCP
 	m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
 	m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
+	m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist

 	m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

View File

@@ -229,56 +229,121 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
 // across leader elections.
 func TestV3LeaseCheckpoint(t *testing.T) {
-	BeforeTest(t)
-
-	var ttl int64 = 300
-	leaseInterval := 2 * time.Second
-	clus := NewClusterV3(t, &ClusterConfig{
-		Size:                    3,
-		EnableLeaseCheckpoint:   true,
-		LeaseCheckpointInterval: leaseInterval,
-		UseBridge:               true,
-	})
-	defer clus.Terminate(t)
-
-	// create lease
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	c := toGRPC(clus.RandClient())
-	lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// wait for a checkpoint to occur
-	time.Sleep(leaseInterval + 1*time.Second)
-
-	// Force a leader election
-	leaderId := clus.WaitLeader(t)
-	leader := clus.Members[leaderId]
-	leader.Stop(t)
-	time.Sleep(time.Duration(3*electionTicks) * tickDuration)
-	leader.Restart(t)
-	newLeaderId := clus.WaitLeader(t)
-	c2 := toGRPC(clus.Client(newLeaderId))
-
-	time.Sleep(250 * time.Millisecond)
-
-	// Check the TTL of the new leader
-	var ttlresp *pb.LeaseTimeToLiveResponse
-	for i := 0; i < 10; i++ {
-		if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
-			if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
-				time.Sleep(time.Millisecond * 250)
-			} else {
-				t.Fatal(err)
-			}
-		}
-	}
-
-	expectedTTL := ttl - int64(leaseInterval.Seconds())
-	if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
-		t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
+	tcs := []struct {
+		name                  string
+		checkpointingEnabled  bool
+		ttl                   time.Duration
+		checkpointingInterval time.Duration
+		checkpointingPersist  bool
+		leaderChanges         int
+		clusterSize           int
+		expectTTLIsGT         time.Duration
+		expectTTLIsLT         time.Duration
+	}{
+		{
+			name:          "Checkpointing disabled, lease TTL is reset",
+			ttl:           300 * time.Second,
+			leaderChanges: 1,
+			clusterSize:   3,
+			expectTTLIsGT: 298 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after leader change",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			clusterSize:           3,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			checkpointingPersist:  true,
+			leaderChanges:         1,
+			clusterSize:           1,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is reset after restart",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			clusterSize:           1,
+			expectTTLIsGT:         298 * time.Second,
+		},
+		{
+			// Checking if checkpointing continues after the first leader change.
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         2,
+			clusterSize:           3,
+			expectTTLIsLT:         280 * time.Second,
+		},
 	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			BeforeTest(t)
+			config := &ClusterConfig{
+				Size:                    tc.clusterSize,
+				EnableLeaseCheckpoint:   tc.checkpointingEnabled,
+				LeaseCheckpointInterval: tc.checkpointingInterval,
+				LeaseCheckpointPersist:  tc.checkpointingPersist,
+			}
+			clus := NewClusterV3(t, config)
+			defer clus.Terminate(t)
+
+			// create lease
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			c := toGRPC(clus.RandClient())
+			lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			for i := 0; i < tc.leaderChanges; i++ {
+				// wait for a checkpoint to occur
+				time.Sleep(tc.checkpointingInterval + 1*time.Second)
+
+				// Force a leader election
+				leaderId := clus.WaitLeader(t)
+				leader := clus.Members[leaderId]
+				leader.Stop(t)
+				time.Sleep(time.Duration(3*electionTicks) * tickDuration)
+				leader.Restart(t)
+			}
+
+			newLeaderId := clus.WaitLeader(t)
+			c2 := toGRPC(clus.Client(newLeaderId))
+
+			time.Sleep(250 * time.Millisecond)
+
+			// Check the TTL of the new leader
+			var ttlresp *pb.LeaseTimeToLiveResponse
+			for i := 0; i < 10; i++ {
+				if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
+					if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+						time.Sleep(time.Millisecond * 250)
+					} else {
+						t.Fatal(err)
+					}
+				}
+			}
+
+			if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
+				t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
+			}
+			if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
+				t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
+			}
+		})
+	}
 }