Compare commits
18 Commits
tests/v3.5
...
pkg/v3.5.2
Author | SHA1 | Date | |
---|---|---|---|
99018a77be | |||
a624446907 | |||
74f33d6665 | |||
7291ed3c4a | |||
55c16df997 | |||
73080a7166 | |||
e84c61104c | |||
d00e89db2e | |||
eddfb4232f | |||
21634a98c6 | |||
8c81598455 | |||
eac7f98699 | |||
dec6f72d68 | |||
79bbc8fdb7 | |||
77d760bf1b | |||
7d44a7cd6e | |||
e8732fb5f3 | |||
446f7d6b6e |
@ -9,6 +9,7 @@ require (
|
|||||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
||||||
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
|
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
|
||||||
google.golang.org/grpc v1.38.0
|
google.golang.org/grpc v1.38.0
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bad imports are sometimes causing attempts to pull that code.
|
// Bad imports are sometimes causing attempts to pull that code.
|
||||||
|
@ -143,7 +143,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
|
|||||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=
|
|
||||||
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
|
@ -65,6 +65,7 @@ var (
|
|||||||
ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
|
ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
|
||||||
ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
|
ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
|
||||||
ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
|
ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
|
||||||
|
ErrGRPCAuthOldRevision = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err()
|
||||||
|
|
||||||
ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
|
ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
|
||||||
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
|
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
|
||||||
@ -131,6 +132,7 @@ var (
|
|||||||
ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
|
ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
|
||||||
ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
|
ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
|
||||||
ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
|
ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
|
||||||
|
ErrorDesc(ErrGRPCAuthOldRevision): ErrGRPCAuthOldRevision,
|
||||||
|
|
||||||
ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
|
ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
|
||||||
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
|
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
|
||||||
@ -195,6 +197,7 @@ var (
|
|||||||
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
|
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
|
||||||
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
|
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
|
||||||
ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
|
ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
|
||||||
|
ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision)
|
||||||
ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
|
ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
|
||||||
|
|
||||||
ErrNoLeader = Error(ErrGRPCNoLeader)
|
ErrNoLeader = Error(ErrGRPCNoLeader)
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "3.0.0"
|
MinClusterVersion = "3.0.0"
|
||||||
Version = "3.5.1"
|
Version = "3.5.2"
|
||||||
APIVersion = "unknown"
|
APIVersion = "unknown"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
|
@ -5,8 +5,8 @@ go 1.16
|
|||||||
require (
|
require (
|
||||||
github.com/json-iterator/go v1.1.11
|
github.com/json-iterator/go v1.1.11
|
||||||
github.com/modern-go/reflect2 v1.0.1
|
github.com/modern-go/reflect2 v1.0.1
|
||||||
go.etcd.io/etcd/api/v3 v3.5.1
|
go.etcd.io/etcd/api/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
)
|
)
|
||||||
|
|
||||||
replace (
|
replace (
|
||||||
|
@ -154,8 +154,9 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
|
|||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
|
||||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
@ -6,8 +6,8 @@ require (
|
|||||||
github.com/dustin/go-humanize v1.0.0
|
github.com/dustin/go-humanize v1.0.0
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/prometheus/client_golang v1.11.0
|
github.com/prometheus/client_golang v1.11.0
|
||||||
go.etcd.io/etcd/api/v3 v3.5.1
|
go.etcd.io/etcd/api/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
go.uber.org/zap v1.17.0
|
go.uber.org/zap v1.17.0
|
||||||
google.golang.org/grpc v1.38.0
|
google.golang.org/grpc v1.38.0
|
||||||
sigs.k8s.io/yaml v1.2.0
|
sigs.k8s.io/yaml v1.2.0
|
||||||
|
@ -258,8 +258,9 @@ gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|||||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
|
||||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
@ -73,7 +73,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien
|
|||||||
// its the callCtx deadline or cancellation, in which case try again.
|
// its the callCtx deadline or cancellation, in which case try again.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
|
if c.shouldRefreshToken(lastErr, callOpts) {
|
||||||
// clear auth token before refreshing it.
|
// clear auth token before refreshing it.
|
||||||
// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
|
// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
|
||||||
// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
|
// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
|
||||||
@ -148,6 +148,19 @@ func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamCli
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shouldRefreshToken checks whether there's a need to refresh the token based on the error and callOptions,
|
||||||
|
// and returns a boolean value.
|
||||||
|
func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
|
||||||
|
if rpctypes.Error(err) == rpctypes.ErrUserEmpty {
|
||||||
|
// refresh the token when username, password is present but the server returns ErrUserEmpty
|
||||||
|
// which is possible when the client token is cleared somehow
|
||||||
|
return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return callOpts.retryAuth &&
|
||||||
|
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
|
||||||
|
}
|
||||||
|
|
||||||
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
|
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
|
||||||
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
|
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
|
||||||
// a new ClientStream according to the retry policy.
|
// a new ClientStream according to the retry policy.
|
||||||
@ -245,7 +258,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
|
|||||||
// its the callCtx deadline or cancellation, in which case try again.
|
// its the callCtx deadline or cancellation, in which case try again.
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
|
if s.client.shouldRefreshToken(err, s.callOpts) {
|
||||||
// clear auth token to avoid failure when call getToken
|
// clear auth token to avoid failure when call getToken
|
||||||
s.client.authTokenBundle.UpdateAuthToken("")
|
s.client.authTokenBundle.UpdateAuthToken("")
|
||||||
|
|
||||||
|
124
client/v3/retry_interceptor_test.go
Normal file
124
client/v3/retry_interceptor_test.go
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
package clientv3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||||
|
"go.etcd.io/etcd/client/v3/credentials"
|
||||||
|
grpccredentials "google.golang.org/grpc/credentials"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type dummyAuthTokenBundle struct{}
|
||||||
|
|
||||||
|
func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d dummyAuthTokenBundle) UpdateAuthToken(token string) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClientShouldRefreshToken(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
authTokenBundle credentials.Bundle
|
||||||
|
}
|
||||||
|
type args struct {
|
||||||
|
err error
|
||||||
|
callOpts *options
|
||||||
|
}
|
||||||
|
|
||||||
|
optsWithTrue := &options{
|
||||||
|
retryAuth: true,
|
||||||
|
}
|
||||||
|
optsWithFalse := &options{
|
||||||
|
retryAuth: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
args args
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "ErrUserEmpty and non nil authTokenBundle",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: &dummyAuthTokenBundle{},
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ErrUserEmpty and nil authTokenBundle",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: nil,
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ErrGRPCInvalidAuthToken and retryAuth",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: nil,
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ErrGRPCInvalidAuthToken and !retryAuth",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: nil,
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ErrGRPCAuthOldRevision and retryAuth",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: nil,
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ErrGRPCAuthOldRevision and !retryAuth",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: nil,
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Other error and retryAuth",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: nil,
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Other error and !retryAuth",
|
||||||
|
fields: fields{
|
||||||
|
authTokenBundle: nil,
|
||||||
|
},
|
||||||
|
args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
c := &Client{
|
||||||
|
authTokenBundle: tt.fields.authTokenBundle,
|
||||||
|
}
|
||||||
|
if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want {
|
||||||
|
t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -9,12 +9,12 @@ require (
|
|||||||
github.com/spf13/cobra v1.1.3
|
github.com/spf13/cobra v1.1.3
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/urfave/cli v1.22.4
|
github.com/urfave/cli v1.22.4
|
||||||
go.etcd.io/etcd/api/v3 v3.5.1
|
go.etcd.io/etcd/api/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/v2 v2.305.1
|
go.etcd.io/etcd/client/v2 v2.305.2
|
||||||
go.etcd.io/etcd/client/v3 v3.5.1
|
go.etcd.io/etcd/client/v3 v3.5.2
|
||||||
go.etcd.io/etcd/etcdutl/v3 v3.5.1
|
go.etcd.io/etcd/etcdutl/v3 v3.5.2
|
||||||
go.etcd.io/etcd/pkg/v3 v3.5.1
|
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||||
go.uber.org/zap v1.17.0
|
go.uber.org/zap v1.17.0
|
||||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||||
google.golang.org/grpc v1.38.0
|
google.golang.org/grpc v1.38.0
|
||||||
|
@ -25,11 +25,11 @@ require (
|
|||||||
github.com/olekukonko/tablewriter v0.0.5
|
github.com/olekukonko/tablewriter v0.0.5
|
||||||
github.com/spf13/cobra v1.1.3
|
github.com/spf13/cobra v1.1.3
|
||||||
go.etcd.io/bbolt v1.3.6
|
go.etcd.io/bbolt v1.3.6
|
||||||
go.etcd.io/etcd/api/v3 v3.5.1
|
go.etcd.io/etcd/api/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/v3 v3.5.1
|
go.etcd.io/etcd/client/v3 v3.5.2
|
||||||
go.etcd.io/etcd/pkg/v3 v3.5.1
|
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/raft/v3 v3.5.1
|
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||||
go.etcd.io/etcd/server/v3 v3.5.1
|
go.etcd.io/etcd/server/v3 v3.5.2
|
||||||
go.uber.org/zap v1.17.0
|
go.uber.org/zap v1.17.0
|
||||||
)
|
)
|
||||||
|
20
go.mod
20
go.mod
@ -20,16 +20,16 @@ require (
|
|||||||
github.com/dustin/go-humanize v1.0.0
|
github.com/dustin/go-humanize v1.0.0
|
||||||
github.com/spf13/cobra v1.1.3
|
github.com/spf13/cobra v1.1.3
|
||||||
go.etcd.io/bbolt v1.3.6
|
go.etcd.io/bbolt v1.3.6
|
||||||
go.etcd.io/etcd/api/v3 v3.5.1
|
go.etcd.io/etcd/api/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/v2 v2.305.1
|
go.etcd.io/etcd/client/v2 v2.305.2
|
||||||
go.etcd.io/etcd/client/v3 v3.5.1
|
go.etcd.io/etcd/client/v3 v3.5.2
|
||||||
go.etcd.io/etcd/etcdctl/v3 v3.5.1
|
go.etcd.io/etcd/etcdctl/v3 v3.5.2
|
||||||
go.etcd.io/etcd/etcdutl/v3 v3.5.1
|
go.etcd.io/etcd/etcdutl/v3 v3.5.2
|
||||||
go.etcd.io/etcd/pkg/v3 v3.5.1
|
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/raft/v3 v3.5.1
|
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||||
go.etcd.io/etcd/server/v3 v3.5.1
|
go.etcd.io/etcd/server/v3 v3.5.2
|
||||||
go.etcd.io/etcd/tests/v3 v3.5.1
|
go.etcd.io/etcd/tests/v3 v3.5.2
|
||||||
go.uber.org/zap v1.17.0
|
go.uber.org/zap v1.17.0
|
||||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||||
google.golang.org/grpc v1.38.0
|
google.golang.org/grpc v1.38.0
|
||||||
|
@ -9,7 +9,7 @@ require (
|
|||||||
github.com/spf13/cobra v1.1.3
|
github.com/spf13/cobra v1.1.3
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
go.uber.org/zap v1.17.0
|
go.uber.org/zap v1.17.0
|
||||||
google.golang.org/grpc v1.38.0
|
google.golang.org/grpc v1.38.0
|
||||||
)
|
)
|
||||||
|
@ -8,7 +8,7 @@ require (
|
|||||||
github.com/gogo/protobuf v1.3.2
|
github.com/gogo/protobuf v1.3.2
|
||||||
github.com/golang/protobuf v1.5.2
|
github.com/golang/protobuf v1.5.2
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bad imports are sometimes causing attempts to pull that code.
|
// Bad imports are sometimes causing attempts to pull that code.
|
||||||
|
@ -147,10 +147,12 @@ type ServerConfig struct {
|
|||||||
|
|
||||||
ForceNewCluster bool
|
ForceNewCluster bool
|
||||||
|
|
||||||
// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
|
// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
|
||||||
EnableLeaseCheckpoint bool
|
EnableLeaseCheckpoint bool
|
||||||
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
|
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
|
||||||
LeaseCheckpointInterval time.Duration
|
LeaseCheckpointInterval time.Duration
|
||||||
|
// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
|
||||||
|
LeaseCheckpointPersist bool
|
||||||
|
|
||||||
EnableGRPCGateway bool
|
EnableGRPCGateway bool
|
||||||
|
|
||||||
|
@ -314,10 +314,15 @@ type Config struct {
|
|||||||
// Deprecated in v3.5.
|
// Deprecated in v3.5.
|
||||||
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
|
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
|
||||||
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
|
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
|
||||||
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
|
// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
|
||||||
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
||||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
|
||||||
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
// Requires experimental-enable-lease-checkpoint to be enabled.
|
||||||
|
// Deprecated in v3.6.
|
||||||
|
// TODO: Delete in v3.7
|
||||||
|
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
|
||||||
|
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||||
|
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
||||||
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
|
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
|
||||||
// takes more time than this value.
|
// takes more time than this value.
|
||||||
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
|
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
|
||||||
@ -678,6 +683,14 @@ func (cfg *Config) Validate() error {
|
|||||||
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
|
return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
|
||||||
|
cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
|
||||||
|
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,6 +291,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLeaseCheckpointValidate(t *testing.T) {
|
||||||
|
tcs := []struct {
|
||||||
|
name string
|
||||||
|
configFunc func() Config
|
||||||
|
expectError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Default config should pass",
|
||||||
|
configFunc: func() Config {
|
||||||
|
return *NewConfig()
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Enabling checkpoint leases should pass",
|
||||||
|
configFunc: func() Config {
|
||||||
|
cfg := *NewConfig()
|
||||||
|
cfg.ExperimentalEnableLeaseCheckpoint = true
|
||||||
|
return cfg
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Enabling checkpoint leases and persist should pass",
|
||||||
|
configFunc: func() Config {
|
||||||
|
cfg := *NewConfig()
|
||||||
|
cfg.ExperimentalEnableLeaseCheckpoint = true
|
||||||
|
cfg.ExperimentalEnableLeaseCheckpointPersist = true
|
||||||
|
return cfg
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Enabling checkpoint leases persist without checkpointing itself should fail",
|
||||||
|
configFunc: func() Config {
|
||||||
|
cfg := *NewConfig()
|
||||||
|
cfg.ExperimentalEnableLeaseCheckpointPersist = true
|
||||||
|
return cfg
|
||||||
|
},
|
||||||
|
expectError: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
cfg := tc.configFunc()
|
||||||
|
err := cfg.Validate()
|
||||||
|
if (err != nil) != tc.expectError {
|
||||||
|
t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestLogRotation(t *testing.T) {
|
func TestLogRotation(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -216,6 +216,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
|||||||
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
|
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
|
||||||
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
||||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||||
|
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
|
||||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||||
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
||||||
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
||||||
|
@ -280,7 +280,9 @@ func newConfig() *config {
|
|||||||
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
|
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
|
||||||
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
|
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
|
||||||
|
|
||||||
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
|
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
|
||||||
|
// TODO: delete in v3.7
|
||||||
|
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
|
||||||
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
||||||
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
||||||
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
||||||
|
@ -138,8 +138,7 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
|
|||||||
for _, v := range as {
|
for _, v := range as {
|
||||||
alarmName := v.Alarm.String()
|
alarmName := v.Alarm.String()
|
||||||
if _, found := excludedAlarms[alarmName]; found {
|
if _, found := excludedAlarms[alarmName]; found {
|
||||||
lg.Debug("/health excluded alarm", zap.String("alarm", alarmName))
|
lg.Debug("/health excluded alarm", zap.String("alarm", v.String()))
|
||||||
delete(excludedAlarms, alarmName)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,10 +156,6 @@ func checkHealth(lg *zap.Logger, srv etcdserver.ServerV2, excludedAlarms AlarmSe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(excludedAlarms) > 0 {
|
|
||||||
lg.Warn("fail exclude alarms from health check", zap.String("exclude alarms", fmt.Sprintf("%+v", excludedAlarms)))
|
|
||||||
}
|
|
||||||
|
|
||||||
if uint64(srv.Leader()) == raft.None {
|
if uint64(srv.Leader()) == raft.None {
|
||||||
h.Health = "false"
|
h.Health = "false"
|
||||||
h.Reason = "RAFT NO LEADER"
|
h.Reason = "RAFT NO LEADER"
|
||||||
|
@ -78,6 +78,12 @@ func TestHealthHandler(t *testing.T) {
|
|||||||
http.StatusOK,
|
http.StatusOK,
|
||||||
"true",
|
"true",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
[]*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}},
|
||||||
|
"/health?exclude=NOSPACE",
|
||||||
|
http.StatusOK,
|
||||||
|
"true",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
[]*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
|
[]*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}},
|
||||||
"/health?exclude=NOSPACE",
|
"/health?exclude=NOSPACE",
|
||||||
|
@ -84,6 +84,7 @@ var toGRPCErrorMap = map[error]error{
|
|||||||
auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled,
|
auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled,
|
||||||
auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
|
auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
|
||||||
auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt,
|
auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt,
|
||||||
|
auth.ErrAuthOldRevision: rpctypes.ErrGRPCAuthOldRevision,
|
||||||
|
|
||||||
// In sync with status.FromContextError
|
// In sync with status.FromContextError
|
||||||
context.Canceled: rpctypes.ErrGRPCCanceled,
|
context.Canceled: rpctypes.ErrGRPCCanceled,
|
||||||
|
@ -514,6 +514,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
|
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
|
||||||
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
|
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
// A snapshot db may have already been recovered, and the old db should have
|
||||||
|
// already been closed in this case, so we should set the backend again.
|
||||||
|
ci.SetBackend(be)
|
||||||
s1, s2 := be.Size(), be.SizeInUse()
|
s1, s2 := be.Size(), be.SizeInUse()
|
||||||
cfg.Logger.Info(
|
cfg.Logger.Info(
|
||||||
"recovered v3 backend from snapshot",
|
"recovered v3 backend from snapshot",
|
||||||
@ -592,9 +595,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
|
|
||||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||||
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
|
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
|
||||||
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
|
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
|
||||||
CheckpointInterval: cfg.LeaseCheckpointInterval,
|
CheckpointInterval: cfg.LeaseCheckpointInterval,
|
||||||
|
CheckpointPersist: cfg.LeaseCheckpointPersist,
|
||||||
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
|
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -25,12 +25,12 @@ require (
|
|||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
|
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
|
||||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
|
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
|
||||||
go.etcd.io/bbolt v1.3.6
|
go.etcd.io/bbolt v1.3.6
|
||||||
go.etcd.io/etcd/api/v3 v3.5.1
|
go.etcd.io/etcd/api/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/v2 v2.305.1
|
go.etcd.io/etcd/client/v2 v2.305.2
|
||||||
go.etcd.io/etcd/client/v3 v3.5.1
|
go.etcd.io/etcd/client/v3 v3.5.2
|
||||||
go.etcd.io/etcd/pkg/v3 v3.5.1
|
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/raft/v3 v3.5.1
|
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
|
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
|
||||||
go.opentelemetry.io/otel v0.20.0
|
go.opentelemetry.io/otel v0.20.0
|
||||||
go.opentelemetry.io/otel/exporters/otlp v0.20.0
|
go.opentelemetry.io/otel/exporters/otlp v0.20.0
|
||||||
|
@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
|
|||||||
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
||||||
defer betesting.Close(t, be)
|
defer betesting.Close(t, be)
|
||||||
|
|
||||||
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
||||||
le.Promote(time.Second)
|
le.Promote(time.Second)
|
||||||
l, err := le.Grant(1, int64(5))
|
l, err := le.Grant(1, int64(5))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
|
|||||||
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
||||||
defer betesting.Close(t, be)
|
defer betesting.Close(t, be)
|
||||||
|
|
||||||
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
||||||
le.Promote(time.Second)
|
le.Promote(time.Second)
|
||||||
l, err := le.Grant(1, int64(5))
|
l, err := le.Grant(1, int64(5))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
|
|||||||
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
||||||
defer betesting.Close(t, be)
|
defer betesting.Close(t, be)
|
||||||
|
|
||||||
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
|
||||||
le.Promote(time.Second)
|
le.Promote(time.Second)
|
||||||
l, err := le.Grant(1, int64(5))
|
l, err := le.Grant(1, int64(5))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/go-semver/semver"
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
|
|||||||
// MaxLeaseTTL is the maximum lease TTL value
|
// MaxLeaseTTL is the maximum lease TTL value
|
||||||
const MaxLeaseTTL = 9000000000
|
const MaxLeaseTTL = 9000000000
|
||||||
|
|
||||||
|
var v3_6 = semver.Version{Major: 3, Minor: 6}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
forever = time.Time{}
|
forever = time.Time{}
|
||||||
|
|
||||||
@ -180,19 +183,29 @@ type lessor struct {
|
|||||||
checkpointInterval time.Duration
|
checkpointInterval time.Duration
|
||||||
// the interval to check if the expired lease is revoked
|
// the interval to check if the expired lease is revoked
|
||||||
expiredLeaseRetryInterval time.Duration
|
expiredLeaseRetryInterval time.Duration
|
||||||
|
// whether lessor should always persist remaining TTL (always enabled in v3.6).
|
||||||
|
checkpointPersist bool
|
||||||
|
// cluster is used to adapt lessor logic based on cluster version
|
||||||
|
cluster cluster
|
||||||
|
}
|
||||||
|
|
||||||
|
type cluster interface {
|
||||||
|
// Version is the cluster-wide minimum major.minor version.
|
||||||
|
Version() *semver.Version
|
||||||
}
|
}
|
||||||
|
|
||||||
type LessorConfig struct {
|
type LessorConfig struct {
|
||||||
MinLeaseTTL int64
|
MinLeaseTTL int64
|
||||||
CheckpointInterval time.Duration
|
CheckpointInterval time.Duration
|
||||||
ExpiredLeasesRetryInterval time.Duration
|
ExpiredLeasesRetryInterval time.Duration
|
||||||
|
CheckpointPersist bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
|
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
|
||||||
return newLessor(lg, b, cfg)
|
return newLessor(lg, b, cluster, cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
|
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
|
||||||
checkpointInterval := cfg.CheckpointInterval
|
checkpointInterval := cfg.CheckpointInterval
|
||||||
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
|
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
|
||||||
if checkpointInterval == 0 {
|
if checkpointInterval == 0 {
|
||||||
@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
|
|||||||
minLeaseTTL: cfg.MinLeaseTTL,
|
minLeaseTTL: cfg.MinLeaseTTL,
|
||||||
checkpointInterval: checkpointInterval,
|
checkpointInterval: checkpointInterval,
|
||||||
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
|
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
|
||||||
|
checkpointPersist: cfg.CheckpointPersist,
|
||||||
// expiredC is a small buffered chan to avoid unnecessary blocking.
|
// expiredC is a small buffered chan to avoid unnecessary blocking.
|
||||||
expiredC: make(chan []*Lease, 16),
|
expiredC: make(chan []*Lease, 16),
|
||||||
stopC: make(chan struct{}),
|
stopC: make(chan struct{}),
|
||||||
doneC: make(chan struct{}),
|
doneC: make(chan struct{}),
|
||||||
lg: lg,
|
lg: lg,
|
||||||
|
cluster: cluster,
|
||||||
}
|
}
|
||||||
l.initAndRecover()
|
l.initAndRecover()
|
||||||
|
|
||||||
@ -351,6 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
|
|||||||
if l, ok := le.leaseMap[id]; ok {
|
if l, ok := le.leaseMap[id]; ok {
|
||||||
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
|
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
|
||||||
l.remainingTTL = remainingTTL
|
l.remainingTTL = remainingTTL
|
||||||
|
if le.shouldPersistCheckpoints() {
|
||||||
|
l.persistTo(le.b)
|
||||||
|
}
|
||||||
if le.isPrimary() {
|
if le.isPrimary() {
|
||||||
// schedule the next checkpoint as needed
|
// schedule the next checkpoint as needed
|
||||||
le.scheduleCheckpointIfNeeded(l)
|
le.scheduleCheckpointIfNeeded(l)
|
||||||
@ -359,6 +377,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (le *lessor) shouldPersistCheckpoints() bool {
|
||||||
|
cv := le.cluster.Version()
|
||||||
|
return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
|
||||||
|
}
|
||||||
|
|
||||||
|
func greaterOrEqual(first, second semver.Version) bool {
|
||||||
|
return !first.LessThan(second)
|
||||||
|
}
|
||||||
|
|
||||||
// Renew renews an existing lease. If the given lease does not exist or
|
// Renew renews an existing lease. If the given lease does not exist or
|
||||||
// has expired, an error will be returned.
|
// has expired, an error will be returned.
|
||||||
func (le *lessor) Renew(id LeaseID) (int64, error) {
|
func (le *lessor) Renew(id LeaseID) (int64, error) {
|
||||||
@ -446,6 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
|
|||||||
l.refresh(extend)
|
l.refresh(extend)
|
||||||
item := &LeaseWithTime{id: l.ID, time: l.expiry}
|
item := &LeaseWithTime{id: l.ID, time: l.expiry}
|
||||||
le.leaseExpiredNotifier.RegisterOrUpdate(item)
|
le.leaseExpiredNotifier.RegisterOrUpdate(item)
|
||||||
|
le.scheduleCheckpointIfNeeded(l)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(le.leaseMap) < leaseRevokeRate {
|
if len(le.leaseMap) < leaseRevokeRate {
|
||||||
@ -789,9 +817,10 @@ func (le *lessor) initAndRecover() {
|
|||||||
ttl: lpb.TTL,
|
ttl: lpb.TTL,
|
||||||
// itemSet will be filled in when recover key-value pairs
|
// itemSet will be filled in when recover key-value pairs
|
||||||
// set expiry to forever, refresh when promoted
|
// set expiry to forever, refresh when promoted
|
||||||
itemSet: make(map[LeaseItem]struct{}),
|
itemSet: make(map[LeaseItem]struct{}),
|
||||||
expiry: forever,
|
expiry: forever,
|
||||||
revokec: make(chan struct{}),
|
revokec: make(chan struct{}),
|
||||||
|
remainingTTL: lpb.RemainingTTL,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
le.leaseExpiredNotifier.Init()
|
le.leaseExpiredNotifier.Init()
|
||||||
|
@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
|
|||||||
be, _ := betesting.NewDefaultTmpBackend(t)
|
be, _ := betesting.NewDefaultTmpBackend(t)
|
||||||
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
|
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
|
||||||
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
|
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
|
||||||
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
|
le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
|
||||||
le.SetRangeDeleter(func() TxnDelete {
|
le.SetRangeDeleter(func() TxnDelete {
|
||||||
ftd := &FakeTxnDelete{be.BatchTx()}
|
ftd := &FakeTxnDelete{be.BatchTx()}
|
||||||
ftd.Lock()
|
ftd.Lock()
|
||||||
|
@ -26,7 +26,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/go-semver/semver"
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
|
"go.etcd.io/etcd/api/v3/version"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -46,7 +48,7 @@ func TestLessorGrant(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.Promote(0)
|
le.Promote(0)
|
||||||
|
|
||||||
@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
||||||
|
|
||||||
@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
var fd *fakeDeleter
|
var fd *fakeDeleter
|
||||||
le.SetRangeDeleter(func() TxnDelete {
|
le.SetRangeDeleter(func() TxnDelete {
|
||||||
@ -210,7 +212,7 @@ func TestLessorRenew(t *testing.T) {
|
|||||||
defer be.Close()
|
defer be.Close()
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.Promote(0)
|
le.Promote(0)
|
||||||
|
|
||||||
@ -243,7 +245,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
|
|||||||
defer be.Close()
|
defer be.Close()
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
|
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
|
||||||
for _, cp := range cp.GetCheckpoints() {
|
for _, cp := range cp.GetCheckpoints() {
|
||||||
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
|
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
|
||||||
@ -292,7 +294,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
|
|||||||
dir, be := NewTestBackend(t)
|
dir, be := NewTestBackend(t)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
ttl := int64(10)
|
ttl := int64(10)
|
||||||
for i := 1; i <= leaseRevokeRate*10; i++ {
|
for i := 1; i <= leaseRevokeRate*10; i++ {
|
||||||
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
|
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
|
||||||
@ -311,7 +313,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
|
|||||||
bcfg.Path = filepath.Join(dir, "be")
|
bcfg.Path = filepath.Join(dir, "be")
|
||||||
be = backend.New(bcfg)
|
be = backend.New(bcfg)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
// extend after recovery should extend expiration on lease pile-up
|
// extend after recovery should extend expiration on lease pile-up
|
||||||
@ -341,7 +343,7 @@ func TestLessorDetach(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
|
||||||
|
|
||||||
@ -382,7 +384,7 @@ func TestLessorRecover(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
l1, err1 := le.Grant(1, 10)
|
l1, err1 := le.Grant(1, 10)
|
||||||
l2, err2 := le.Grant(2, 20)
|
l2, err2 := le.Grant(2, 20)
|
||||||
@ -391,7 +393,7 @@ func TestLessorRecover(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a new lessor with the same backend
|
// Create a new lessor with the same backend
|
||||||
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer nle.Stop()
|
defer nle.Stop()
|
||||||
nl1 := nle.Lookup(l1.ID)
|
nl1 := nle.Lookup(l1.ID)
|
||||||
if nl1 == nil || nl1.ttl != l1.ttl {
|
if nl1 == nil || nl1.ttl != l1.ttl {
|
||||||
@ -412,7 +414,7 @@ func TestLessorExpire(t *testing.T) {
|
|||||||
|
|
||||||
testMinTTL := int64(1)
|
testMinTTL := int64(1)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
le.Promote(1 * time.Second)
|
le.Promote(1 * time.Second)
|
||||||
@ -465,7 +467,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
|
|||||||
|
|
||||||
testMinTTL := int64(1)
|
testMinTTL := int64(1)
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
le.Promote(1 * time.Second)
|
le.Promote(1 * time.Second)
|
||||||
@ -514,7 +516,7 @@ func TestLessorMaxTTL(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
|
|
||||||
_, err := le.Grant(1, MaxLeaseTTL+1)
|
_, err := le.Grant(1, MaxLeaseTTL+1)
|
||||||
@ -530,7 +532,8 @@ func TestLessorCheckpointScheduling(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
|
||||||
|
defer le.Stop()
|
||||||
le.minLeaseTTL = 1
|
le.minLeaseTTL = 1
|
||||||
checkpointedC := make(chan struct{})
|
checkpointedC := make(chan struct{})
|
||||||
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
|
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
|
||||||
@ -543,13 +546,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
|
|||||||
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
|
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
defer le.Stop()
|
|
||||||
le.Promote(0)
|
|
||||||
|
|
||||||
_, err := le.Grant(1, 2)
|
_, err := le.Grant(1, 2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
le.Promote(0)
|
||||||
|
|
||||||
// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
|
// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
|
||||||
select {
|
select {
|
||||||
@ -565,7 +566,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
defer le.Stop()
|
defer le.Stop()
|
||||||
l, err := le.Grant(1, 10)
|
l, err := le.Grant(1, 10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -579,6 +580,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
|
||||||
|
const ttl int64 = 10
|
||||||
|
const checkpointTTL int64 = 5
|
||||||
|
|
||||||
|
tcs := []struct {
|
||||||
|
name string
|
||||||
|
cluster cluster
|
||||||
|
checkpointPersist bool
|
||||||
|
expectRemainingTTL int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Etcd v3.6 and newer persist remainingTTL on checkpoint",
|
||||||
|
cluster: clusterV3_6(),
|
||||||
|
expectRemainingTTL: checkpointTTL,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
|
||||||
|
cluster: clusterLatest(),
|
||||||
|
checkpointPersist: true,
|
||||||
|
expectRemainingTTL: checkpointTTL,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
|
||||||
|
cluster: clusterNil(),
|
||||||
|
checkpointPersist: true,
|
||||||
|
expectRemainingTTL: checkpointTTL,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Etcd v3.5 and older reset remainingTTL on checkpoint",
|
||||||
|
cluster: clusterLatest(),
|
||||||
|
expectRemainingTTL: ttl,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Etcd with version unknown fallbacks to v3.5 behavior",
|
||||||
|
cluster: clusterNil(),
|
||||||
|
expectRemainingTTL: ttl,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
lg := zap.NewNop()
|
||||||
|
dir, be := NewTestBackend(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer be.Close()
|
||||||
|
|
||||||
|
cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
|
||||||
|
cfg.CheckpointPersist = tc.checkpointPersist
|
||||||
|
le := newLessor(lg, be, tc.cluster, cfg)
|
||||||
|
l, err := le.Grant(2, ttl)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if l.RemainingTTL() != ttl {
|
||||||
|
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
|
||||||
|
}
|
||||||
|
le.Checkpoint(2, checkpointTTL)
|
||||||
|
if l.RemainingTTL() != checkpointTTL {
|
||||||
|
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
|
||||||
|
}
|
||||||
|
le.Stop()
|
||||||
|
le2 := newLessor(lg, be, clusterV3_6(), cfg)
|
||||||
|
l = le2.Lookup(2)
|
||||||
|
if l.RemainingTTL() != tc.expectRemainingTTL {
|
||||||
|
t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type fakeDeleter struct {
|
type fakeDeleter struct {
|
||||||
deleted []string
|
deleted []string
|
||||||
tx backend.BatchTx
|
tx backend.BatchTx
|
||||||
@ -606,3 +676,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
|
|||||||
bcfg.Path = filepath.Join(tmpPath, "be")
|
bcfg.Path = filepath.Join(tmpPath, "be")
|
||||||
return tmpPath, backend.New(bcfg)
|
return tmpPath, backend.New(bcfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func clusterV3_6() cluster {
|
||||||
|
return fakeCluster{semver.New("3.6.0")}
|
||||||
|
}
|
||||||
|
|
||||||
|
func clusterLatest() cluster {
|
||||||
|
return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
|
||||||
|
}
|
||||||
|
|
||||||
|
func clusterNil() cluster {
|
||||||
|
return fakeCluster{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeCluster struct {
|
||||||
|
version *semver.Version
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c fakeCluster) Version() *semver.Version {
|
||||||
|
return c.version
|
||||||
|
}
|
||||||
|
@ -432,6 +432,8 @@ func (b *backend) Defrag() error {
|
|||||||
|
|
||||||
func (b *backend) defrag() error {
|
func (b *backend) defrag() error {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
isDefragActive.Set(1)
|
||||||
|
defer isDefragActive.Set(0)
|
||||||
|
|
||||||
// TODO: make this non-blocking?
|
// TODO: make this non-blocking?
|
||||||
// lock batchTx to ensure nobody is using previous tx, and then
|
// lock batchTx to ensure nobody is using previous tx, and then
|
||||||
|
@ -83,6 +83,13 @@ var (
|
|||||||
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
|
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
|
||||||
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
|
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
isDefragActive = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "disk",
|
||||||
|
Name: "defrag_inflight",
|
||||||
|
Help: "Whether or not defrag is active on the member. 1 means active, 0 means not.",
|
||||||
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -92,4 +99,5 @@ func init() {
|
|||||||
prometheus.MustRegister(writeSec)
|
prometheus.MustRegister(writeSec)
|
||||||
prometheus.MustRegister(defragSec)
|
prometheus.MustRegister(defragSec)
|
||||||
prometheus.MustRegister(snapshotTransferSec)
|
prometheus.MustRegister(snapshotTransferSec)
|
||||||
|
prometheus.MustRegister(isDefragActive)
|
||||||
}
|
}
|
||||||
|
@ -355,8 +355,11 @@ func (s *watchableStore) syncWatchers() int {
|
|||||||
tx := s.store.b.ReadTx()
|
tx := s.store.b.ReadTx()
|
||||||
tx.RLock()
|
tx.RLock()
|
||||||
revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
|
revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
|
||||||
tx.RUnlock()
|
|
||||||
evs := kvsToEvents(s.store.lg, wg, revs, vs)
|
evs := kvsToEvents(s.store.lg, wg, revs, vs)
|
||||||
|
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
|
||||||
|
// We can only unlock after Unmarshal, which will do deep copy.
|
||||||
|
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
|
||||||
|
tx.RUnlock()
|
||||||
|
|
||||||
var victims watcherBatch
|
var victims watcherBatch
|
||||||
wb := newWatcherBatch(wg, evs)
|
wb := newWatcherBatch(wg, evs)
|
||||||
|
16
tests/go.mod
16
tests/go.mod
@ -28,14 +28,14 @@ require (
|
|||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
go.etcd.io/bbolt v1.3.6
|
go.etcd.io/bbolt v1.3.6
|
||||||
go.etcd.io/etcd/api/v3 v3.5.1
|
go.etcd.io/etcd/api/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.1
|
go.etcd.io/etcd/client/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/client/v2 v2.305.1
|
go.etcd.io/etcd/client/v2 v2.305.2
|
||||||
go.etcd.io/etcd/client/v3 v3.5.1
|
go.etcd.io/etcd/client/v3 v3.5.2
|
||||||
go.etcd.io/etcd/etcdutl/v3 v3.5.1
|
go.etcd.io/etcd/etcdutl/v3 v3.5.2
|
||||||
go.etcd.io/etcd/pkg/v3 v3.5.1
|
go.etcd.io/etcd/pkg/v3 v3.5.2
|
||||||
go.etcd.io/etcd/raft/v3 v3.5.1
|
go.etcd.io/etcd/raft/v3 v3.5.2
|
||||||
go.etcd.io/etcd/server/v3 v3.5.1
|
go.etcd.io/etcd/server/v3 v3.5.2
|
||||||
go.uber.org/zap v1.17.0
|
go.uber.org/zap v1.17.0
|
||||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
||||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||||
|
@ -167,6 +167,7 @@ type ClusterConfig struct {
|
|||||||
|
|
||||||
EnableLeaseCheckpoint bool
|
EnableLeaseCheckpoint bool
|
||||||
LeaseCheckpointInterval time.Duration
|
LeaseCheckpointInterval time.Duration
|
||||||
|
LeaseCheckpointPersist bool
|
||||||
|
|
||||||
WatchProgressNotifyInterval time.Duration
|
WatchProgressNotifyInterval time.Duration
|
||||||
}
|
}
|
||||||
@ -328,6 +329,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
|
|||||||
useBridge: c.cfg.UseBridge,
|
useBridge: c.cfg.UseBridge,
|
||||||
useTCP: c.cfg.UseTCP,
|
useTCP: c.cfg.UseTCP,
|
||||||
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
|
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
|
||||||
|
leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
|
||||||
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
|
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
|
||||||
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
|
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
|
||||||
})
|
})
|
||||||
@ -631,6 +633,7 @@ type memberConfig struct {
|
|||||||
useTCP bool
|
useTCP bool
|
||||||
enableLeaseCheckpoint bool
|
enableLeaseCheckpoint bool
|
||||||
leaseCheckpointInterval time.Duration
|
leaseCheckpointInterval time.Duration
|
||||||
|
leaseCheckpointPersist bool
|
||||||
WatchProgressNotifyInterval time.Duration
|
WatchProgressNotifyInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -729,6 +732,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
|
|||||||
m.useTCP = mcfg.useTCP
|
m.useTCP = mcfg.useTCP
|
||||||
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
|
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
|
||||||
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
|
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
|
||||||
|
m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist
|
||||||
|
|
||||||
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
|
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
|
||||||
|
|
||||||
|
@ -229,56 +229,121 @@ func TestV3LeaseKeepAlive(t *testing.T) {
|
|||||||
// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
|
// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
|
||||||
// across leader elections.
|
// across leader elections.
|
||||||
func TestV3LeaseCheckpoint(t *testing.T) {
|
func TestV3LeaseCheckpoint(t *testing.T) {
|
||||||
BeforeTest(t)
|
tcs := []struct {
|
||||||
|
name string
|
||||||
var ttl int64 = 300
|
checkpointingEnabled bool
|
||||||
leaseInterval := 2 * time.Second
|
ttl time.Duration
|
||||||
clus := NewClusterV3(t, &ClusterConfig{
|
checkpointingInterval time.Duration
|
||||||
Size: 3,
|
checkpointingPersist bool
|
||||||
EnableLeaseCheckpoint: true,
|
leaderChanges int
|
||||||
LeaseCheckpointInterval: leaseInterval,
|
clusterSize int
|
||||||
UseBridge: true,
|
expectTTLIsGT time.Duration
|
||||||
})
|
expectTTLIsLT time.Duration
|
||||||
defer clus.Terminate(t)
|
}{
|
||||||
|
{
|
||||||
// create lease
|
name: "Checkpointing disabled, lease TTL is reset",
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ttl: 300 * time.Second,
|
||||||
defer cancel()
|
leaderChanges: 1,
|
||||||
c := toGRPC(clus.RandClient())
|
clusterSize: 3,
|
||||||
lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
|
expectTTLIsGT: 298 * time.Second,
|
||||||
if err != nil {
|
},
|
||||||
t.Fatal(err)
|
{
|
||||||
|
name: "Checkpointing enabled 10s, lease TTL is preserved after leader change",
|
||||||
|
ttl: 300 * time.Second,
|
||||||
|
checkpointingEnabled: true,
|
||||||
|
checkpointingInterval: 10 * time.Second,
|
||||||
|
leaderChanges: 1,
|
||||||
|
clusterSize: 3,
|
||||||
|
expectTTLIsLT: 290 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart",
|
||||||
|
ttl: 300 * time.Second,
|
||||||
|
checkpointingEnabled: true,
|
||||||
|
checkpointingInterval: 10 * time.Second,
|
||||||
|
checkpointingPersist: true,
|
||||||
|
leaderChanges: 1,
|
||||||
|
clusterSize: 1,
|
||||||
|
expectTTLIsLT: 290 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Checkpointing enabled 10s, lease TTL is reset after restart",
|
||||||
|
ttl: 300 * time.Second,
|
||||||
|
checkpointingEnabled: true,
|
||||||
|
checkpointingInterval: 10 * time.Second,
|
||||||
|
leaderChanges: 1,
|
||||||
|
clusterSize: 1,
|
||||||
|
expectTTLIsGT: 298 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Checking if checkpointing continues after the first leader change.
|
||||||
|
name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
|
||||||
|
ttl: 300 * time.Second,
|
||||||
|
checkpointingEnabled: true,
|
||||||
|
checkpointingInterval: 10 * time.Second,
|
||||||
|
leaderChanges: 2,
|
||||||
|
clusterSize: 3,
|
||||||
|
expectTTLIsLT: 280 * time.Second,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
BeforeTest(t)
|
||||||
|
config := &ClusterConfig{
|
||||||
|
Size: tc.clusterSize,
|
||||||
|
EnableLeaseCheckpoint: tc.checkpointingEnabled,
|
||||||
|
LeaseCheckpointInterval: tc.checkpointingInterval,
|
||||||
|
LeaseCheckpointPersist: tc.checkpointingPersist,
|
||||||
|
}
|
||||||
|
clus := NewClusterV3(t, config)
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
// wait for a checkpoint to occur
|
// create lease
|
||||||
time.Sleep(leaseInterval + 1*time.Second)
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
// Force a leader election
|
c := toGRPC(clus.RandClient())
|
||||||
leaderId := clus.WaitLeader(t)
|
lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
|
||||||
leader := clus.Members[leaderId]
|
if err != nil {
|
||||||
leader.Stop(t)
|
|
||||||
time.Sleep(time.Duration(3*electionTicks) * tickDuration)
|
|
||||||
leader.Restart(t)
|
|
||||||
newLeaderId := clus.WaitLeader(t)
|
|
||||||
c2 := toGRPC(clus.Client(newLeaderId))
|
|
||||||
|
|
||||||
time.Sleep(250 * time.Millisecond)
|
|
||||||
|
|
||||||
// Check the TTL of the new leader
|
|
||||||
var ttlresp *pb.LeaseTimeToLiveResponse
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
|
|
||||||
if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
|
|
||||||
time.Sleep(time.Millisecond * 250)
|
|
||||||
} else {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedTTL := ttl - int64(leaseInterval.Seconds())
|
for i := 0; i < tc.leaderChanges; i++ {
|
||||||
if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
|
// wait for a checkpoint to occur
|
||||||
t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
|
time.Sleep(tc.checkpointingInterval + 1*time.Second)
|
||||||
|
|
||||||
|
// Force a leader election
|
||||||
|
leaderId := clus.WaitLeader(t)
|
||||||
|
leader := clus.Members[leaderId]
|
||||||
|
leader.Stop(t)
|
||||||
|
time.Sleep(time.Duration(3*electionTicks) * tickDuration)
|
||||||
|
leader.Restart(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
newLeaderId := clus.WaitLeader(t)
|
||||||
|
c2 := toGRPC(clus.Client(newLeaderId))
|
||||||
|
|
||||||
|
time.Sleep(250 * time.Millisecond)
|
||||||
|
|
||||||
|
// Check the TTL of the new leader
|
||||||
|
var ttlresp *pb.LeaseTimeToLiveResponse
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
|
||||||
|
if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
|
||||||
|
time.Sleep(time.Millisecond * 250)
|
||||||
|
} else {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
|
||||||
|
t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
|
||||||
|
t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user