Move etcdserver/errors.go to sepatate package to avoid cyclic dependencies.

This commit is contained in:
Piotr Tabor
2022-05-13 13:34:23 +02:00
parent e2ae9b1d13
commit fc6a6c3c27
17 changed files with 125 additions and 121 deletions

View File

@ -25,9 +25,8 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/osutil" "go.etcd.io/etcd/pkg/v3/osutil"
"go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -127,7 +126,7 @@ func startEtcdOrProxyV2(args []string) {
} }
if err != nil { if err != nil {
if derr, ok := err.(*etcdserver.DiscoveryError); ok { if derr, ok := err.(*etcderrors.DiscoveryError); ok {
switch derr.Err { switch derr.Err {
case v2discovery.ErrDuplicateID: case v2discovery.ErrDuplicateID:
lg.Warn( lg.Warn(

View File

@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.uber.org/zap" "go.uber.org/zap"
@ -142,7 +143,7 @@ func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ
http.Error(w, err.Error(), http.StatusNotFound) http.Error(w, err.Error(), http.StatusNotFound)
case membership.ErrMemberNotLearner: case membership.ErrMemberNotLearner:
http.Error(w, err.Error(), http.StatusPreconditionFailed) http.Error(w, err.Error(), http.StatusPreconditionFailed)
case etcdserver.ErrLearnerNotReady: case etcderrors.ErrLearnerNotReady:
http.Error(w, err.Error(), http.StatusPreconditionFailed) http.Error(w, err.Error(), http.StatusPreconditionFailed)
default: default:
writeError(h.lg, w, r, err) writeError(h.lg, w, r, err)

View File

@ -17,7 +17,7 @@ package etcdhttp
import ( import (
"net/http" "net/http"
"go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
httptypes "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types" httptypes "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error" "go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.uber.org/zap" "go.uber.org/zap"
@ -57,8 +57,8 @@ func writeError(lg *zap.Logger, w http.ResponseWriter, r *http.Request, err erro
default: default:
switch err { switch err {
case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost, etcdserver.ErrNotEnoughStartedMembers, case etcderrors.ErrTimeoutDueToLeaderFail, etcderrors.ErrTimeoutDueToConnectionLost, etcderrors.ErrNotEnoughStartedMembers,
etcdserver.ErrUnhealthy: etcderrors.ErrUnhealthy:
if lg != nil { if lg != nil {
lg.Warn( lg.Warn(
"v2 response error", "v2 response error",

View File

@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/mvcc"
@ -241,7 +242,7 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
resp.StorageVersion = storageVersion.String() resp.StorageVersion = storageVersion.String()
} }
if resp.Leader == raft.None { if resp.Leader == raft.None {
resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error()) resp.Errors = append(resp.Errors, etcderrors.ErrNoLeader.Error())
} }
for _, a := range ms.a.Alarms() { for _, a := range ms.a.Alarms() {
resp.Errors = append(resp.Errors, a.String()) resp.Errors = append(resp.Errors, a.String())

View File

@ -21,8 +21,8 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/mvcc"
@ -38,30 +38,30 @@ var toGRPCErrorMap = map[error]error{
membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist, membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist,
membership.ErrMemberNotLearner: rpctypes.ErrGRPCMemberNotLearner, membership.ErrMemberNotLearner: rpctypes.ErrGRPCMemberNotLearner,
membership.ErrTooManyLearners: rpctypes.ErrGRPCTooManyLearners, membership.ErrTooManyLearners: rpctypes.ErrGRPCTooManyLearners,
etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted, etcderrors.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
etcdserver.ErrLearnerNotReady: rpctypes.ErrGRPCLearnerNotReady, etcderrors.ErrLearnerNotReady: rpctypes.ErrGRPCLearnerNotReady,
mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted, mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted,
mvcc.ErrFutureRev: rpctypes.ErrGRPCFutureRev, mvcc.ErrFutureRev: rpctypes.ErrGRPCFutureRev,
etcdserver.ErrRequestTooLarge: rpctypes.ErrGRPCRequestTooLarge, etcderrors.ErrRequestTooLarge: rpctypes.ErrGRPCRequestTooLarge,
etcdserver.ErrNoSpace: rpctypes.ErrGRPCNoSpace, etcderrors.ErrNoSpace: rpctypes.ErrGRPCNoSpace,
etcdserver.ErrTooManyRequests: rpctypes.ErrTooManyRequests, etcderrors.ErrTooManyRequests: rpctypes.ErrTooManyRequests,
etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader, etcderrors.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader, etcderrors.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged, etcderrors.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
etcdserver.ErrStopped: rpctypes.ErrGRPCStopped, etcderrors.ErrStopped: rpctypes.ErrGRPCStopped,
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout, etcderrors.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail, etcderrors.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost, etcderrors.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex, etcderrors.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy, etcderrors.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound, etcderrors.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt, etcderrors.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee, etcderrors.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable, etcderrors.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat, etcderrors.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion, version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess, version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade, version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,

View File

@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
"go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
mvcc_txn "go.etcd.io/etcd/server/v3/etcdserver/txn" mvcc_txn "go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease"
@ -242,18 +243,18 @@ type applierV3Capped struct {
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
func (a *applierV3Capped) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { func (a *applierV3Capped) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, ErrNoSpace return nil, nil, etcderrors.ErrNoSpace
} }
func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
if a.q.Cost(r) > 0 { if a.q.Cost(r) > 0 {
return nil, nil, ErrNoSpace return nil, nil, etcderrors.ErrNoSpace
} }
return a.applierV3.Txn(ctx, r) return a.applierV3.Txn(ctx, r)
} }
func (a *applierV3Capped) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { func (a *applierV3Capped) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
return nil, ErrNoSpace return nil, etcderrors.ErrNoSpace
} }
func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) { func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
@ -437,7 +438,7 @@ func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRe
ok := a.q.Available(p) ok := a.q.Available(p)
resp, trace, err := a.applierV3.Put(ctx, txn, p) resp, trace, err := a.applierV3.Put(ctx, txn, p)
if err == nil && !ok { if err == nil && !ok {
err = ErrNoSpace err = etcderrors.ErrNoSpace
} }
return resp, trace, err return resp, trace, err
} }
@ -446,7 +447,7 @@ func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnRes
ok := a.q.Available(rt) ok := a.q.Available(rt)
resp, trace, err := a.applierV3.Txn(ctx, rt) resp, trace, err := a.applierV3.Txn(ctx, rt)
if err == nil && !ok { if err == nil && !ok {
err = ErrNoSpace err = etcderrors.ErrNoSpace
} }
return resp, trace, err return resp, trace, err
} }
@ -455,7 +456,7 @@ func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantRes
ok := a.q.Available(lc) ok := a.q.Available(lc)
resp, err := a.applierV3.LeaseGrant(lc) resp, err := a.applierV3.LeaseGrant(lc)
if err == nil && !ok { if err == nil && !ok {
err = ErrNoSpace err = etcderrors.ErrNoSpace
} }
return resp, err return resp, err
} }

View File

@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -146,7 +147,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.Shoul
return s.applyV2.Sync(r) return s.applyV2.Sync(r)
default: default:
// This should never be reached, but just in case: // This should never be reached, but just in case:
return Response{Err: ErrUnknownMethod} return Response{Err: etcderrors.ErrUnknownMethod}
} }
} }

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.uber.org/zap" "go.uber.org/zap"
"go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/etcdserverpb"
@ -337,7 +338,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*
str, err = v3discovery.JoinCluster(cfg.Logger, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String()) str, err = v3discovery.JoinCluster(cfg.Logger, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
} }
if err != nil { if err != nil {
return nil, &DiscoveryError{Op: "join", Err: err} return nil, &etcderrors.DiscoveryError{Op: "join", Err: err}
} }
var urlsmap types.URLsMap var urlsmap types.URLsMap
urlsmap, err = types.NewURLsMap(str) urlsmap, err = types.NewURLsMap(str)

View File

@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"go.uber.org/zap" "go.uber.org/zap"
@ -304,12 +305,12 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
} }
if resp.StatusCode == http.StatusRequestTimeout { if resp.StatusCode == http.StatusRequestTimeout {
return nil, ErrTimeout return nil, etcderrors.ErrTimeout
} }
if resp.StatusCode == http.StatusPreconditionFailed { if resp.StatusCode == http.StatusPreconditionFailed {
// both ErrMemberNotLearner and ErrLearnerNotReady have same http status code // both ErrMemberNotLearner and ErrLearnerNotReady have same http status code
if strings.Contains(string(b), ErrLearnerNotReady.Error()) { if strings.Contains(string(b), etcderrors.ErrLearnerNotReady.Error()) {
return nil, ErrLearnerNotReady return nil, etcderrors.ErrLearnerNotReady
} }
if strings.Contains(string(b), membership.ErrMemberNotLearner.Error()) { if strings.Contains(string(b), membership.ErrMemberNotLearner.Error()) {
return nil, membership.ErrMemberNotLearner return nil, membership.ErrMemberNotLearner
@ -408,7 +409,7 @@ func convertToClusterVersion(v string) (*semver.Version, error) {
// allow input version format Major.Minor // allow input version format Major.Minor
ver, err = semver.NewVersion(v + ".0") ver, err = semver.NewVersion(v + ".0")
if err != nil { if err != nil {
return nil, ErrWrongDowngradeVersionFormat return nil, etcderrors.ErrWrongDowngradeVersionFormat
} }
} }
// cluster version only keeps major.minor, remove patch version // cluster version only keeps major.minor, remove patch version

View File

@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/mvcc"
"go.uber.org/zap" "go.uber.org/zap"
@ -310,31 +311,31 @@ type applierV3Corrupt struct {
func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
func (a *applierV3Corrupt) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { func (a *applierV3Corrupt) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, ErrCorrupt return nil, nil, etcderrors.ErrCorrupt
} }
func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
return nil, ErrCorrupt return nil, etcderrors.ErrCorrupt
} }
func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
return nil, ErrCorrupt return nil, etcderrors.ErrCorrupt
} }
func (a *applierV3Corrupt) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { func (a *applierV3Corrupt) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return nil, nil, ErrCorrupt return nil, nil, etcderrors.ErrCorrupt
} }
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
return nil, nil, nil, ErrCorrupt return nil, nil, nil, etcderrors.ErrCorrupt
} }
func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
return nil, ErrCorrupt return nil, etcderrors.ErrCorrupt
} }
func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
return nil, ErrCorrupt return nil, etcderrors.ErrCorrupt
} }
const PeerHashKVPath = "/members/hashkv" const PeerHashKVPath = "/members/hashkv"

View File

@ -1,4 +1,4 @@
// Copyright 2015 The etcd Authors // Copyright 2022 The etcd Authors
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package etcdserver package etcderrors
import ( import (
"errors" "errors"
"fmt" "fmt"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
) )
var ( var (
@ -43,7 +41,7 @@ var (
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee") ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade") ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade")
ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format") ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
ErrKeyNotFound = txn.ErrKeyNotFound ErrKeyNotFound = errors.New("etcdserver: key not found")
) )
type DiscoveryError struct { type DiscoveryError struct {

View File

@ -34,6 +34,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/pkg/v3/notify" "go.etcd.io/etcd/pkg/v3/notify"
"go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.uber.org/zap" "go.uber.org/zap"
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
@ -1152,7 +1153,7 @@ func (s *EtcdServer) isLeader() bool {
// MoveLeader transfers the leader to the given transferee. // MoveLeader transfers the leader to the given transferee.
func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error { func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner { if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner {
return ErrBadLeaderTransferee return etcderrors.ErrBadLeaderTransferee
} }
now := time.Now() now := time.Now()
@ -1170,7 +1171,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
for s.Lead() != transferee { for s.Lead() != transferee {
select { select {
case <-ctx.Done(): // time out case <-ctx.Done(): // time out
return ErrTimeoutLeaderTransfer return etcderrors.ErrTimeoutLeaderTransfer
case <-time.After(interval): case <-time.After(interval):
} }
} }
@ -1209,7 +1210,7 @@ func (s *EtcdServer) TransferLeadership() error {
transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs()) transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
if !ok { if !ok {
return ErrUnhealthy return etcderrors.ErrUnhealthy
} }
tm := s.Cfg.ReqTimeout() tm := s.Cfg.ReqTimeout()
@ -1328,9 +1329,9 @@ func (s *EtcdServer) mayAddMember(memb membership.Member) error {
"rejecting member add request; not enough healthy members", "rejecting member add request; not enough healthy members",
zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-id", s.MemberId().String()),
zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
zap.Error(ErrNotEnoughStartedMembers), zap.Error(etcderrors.ErrNotEnoughStartedMembers),
) )
return ErrNotEnoughStartedMembers return etcderrors.ErrNotEnoughStartedMembers
} }
if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberId(), s.cluster.VotingMembers()) { if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberId(), s.cluster.VotingMembers()) {
@ -1338,9 +1339,9 @@ func (s *EtcdServer) mayAddMember(memb membership.Member) error {
"rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum", "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-id", s.MemberId().String()),
zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
zap.Error(ErrUnhealthy), zap.Error(etcderrors.ErrUnhealthy),
) )
return ErrUnhealthy return etcderrors.ErrUnhealthy
} }
return nil return nil
@ -1373,7 +1374,7 @@ func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membershi
learnerPromoteSucceed.Inc() learnerPromoteSucceed.Inc()
return resp, nil return resp, nil
} }
if err != ErrNotLeader { if err != etcderrors.ErrNotLeader {
learnerPromoteFailed.WithLabelValues(err.Error()).Inc() learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
return resp, err return resp, err
} }
@ -1392,16 +1393,16 @@ func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membershi
return resp, nil return resp, nil
} }
// If member promotion failed, return early. Otherwise keep retry. // If member promotion failed, return early. Otherwise keep retry.
if err == ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner { if err == etcderrors.ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner {
return nil, err return nil, err
} }
} }
} }
if cctx.Err() == context.DeadlineExceeded { if cctx.Err() == context.DeadlineExceeded {
return nil, ErrTimeout return nil, etcderrors.ErrTimeout
} }
return nil, ErrCanceled return nil, etcderrors.ErrCanceled
} }
// promoteMember checks whether the to-be-promoted learner node is ready before sending the promote // promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
@ -1457,9 +1458,9 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error {
"rejecting member promote request; not enough healthy members", "rejecting member promote request; not enough healthy members",
zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-id", s.MemberId().String()),
zap.String("requested-member-remove-id", id.String()), zap.String("requested-member-remove-id", id.String()),
zap.Error(ErrNotEnoughStartedMembers), zap.Error(etcderrors.ErrNotEnoughStartedMembers),
) )
return ErrNotEnoughStartedMembers return etcderrors.ErrNotEnoughStartedMembers
} }
return nil return nil
@ -1473,7 +1474,7 @@ func (s *EtcdServer) isLearnerReady(id uint64) error {
// leader's raftStatus.Progress is not nil // leader's raftStatus.Progress is not nil
if rs.Progress == nil { if rs.Progress == nil {
return ErrNotLeader return etcderrors.ErrNotLeader
} }
var learnerMatch uint64 var learnerMatch uint64
@ -1492,7 +1493,7 @@ func (s *EtcdServer) isLearnerReady(id uint64) error {
leaderMatch := rs.Progress[leaderID].Match leaderMatch := rs.Progress[leaderID].Match
// the learner's Match not caught up with leader yet // the learner's Match not caught up with leader yet
if float64(learnerMatch) < float64(leaderMatch)*readyPercent { if float64(learnerMatch) < float64(leaderMatch)*readyPercent {
return ErrLearnerNotReady return etcderrors.ErrLearnerNotReady
} }
} }
@ -1516,9 +1517,9 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
"rejecting member remove request; not enough healthy members", "rejecting member remove request; not enough healthy members",
zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-id", s.MemberId().String()),
zap.String("requested-member-remove-id", id.String()), zap.String("requested-member-remove-id", id.String()),
zap.Error(ErrNotEnoughStartedMembers), zap.Error(etcderrors.ErrNotEnoughStartedMembers),
) )
return ErrNotEnoughStartedMembers return etcderrors.ErrNotEnoughStartedMembers
} }
// downed member is safe to remove since it's not part of the active quorum // downed member is safe to remove since it's not part of the active quorum
@ -1535,9 +1536,9 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-id", s.MemberId().String()),
zap.String("requested-member-remove", id.String()), zap.String("requested-member-remove", id.String()),
zap.Int("active-peers", active), zap.Int("active-peers", active),
zap.Error(ErrUnhealthy), zap.Error(etcderrors.ErrUnhealthy),
) )
return ErrUnhealthy return etcderrors.ErrUnhealthy
} }
return nil return nil
@ -1663,7 +1664,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
return nil, s.parseProposeCtxErr(ctx.Err(), start) return nil, s.parseProposeCtxErr(ctx.Err(), start)
case <-s.stopping: case <-s.stopping:
return nil, ErrStopped return nil, etcderrors.ErrStopped
} }
} }
@ -1911,7 +1912,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
return return
} }
if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { if ar.err != etcderrors.ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
s.w.Trigger(id, ar) s.w.Trigger(id, ar)
return return
} }
@ -2194,7 +2195,7 @@ func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
return return
case ErrStopped: case etcderrors.ErrStopped:
lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
return return
@ -2230,7 +2231,7 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) {
lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
return return
case ErrStopped: case etcderrors.ErrStopped:
lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
return return
@ -2263,7 +2264,7 @@ func (s *EtcdServer) monitorDowngrade() {
func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
switch err { switch err {
case context.Canceled: case context.Canceled:
return ErrCanceled return etcderrors.ErrCanceled
case context.DeadlineExceeded: case context.DeadlineExceeded:
s.leadTimeMu.RLock() s.leadTimeMu.RLock()
@ -2271,7 +2272,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
s.leadTimeMu.RUnlock() s.leadTimeMu.RUnlock()
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond) prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
if start.After(prevLeadLost) && start.Before(curLeadElected) { if start.After(prevLeadLost) && start.Before(curLeadElected) {
return ErrTimeoutDueToLeaderFail return etcderrors.ErrTimeoutDueToLeaderFail
} }
lead := types.ID(s.getLead()) lead := types.ID(s.getLead())
switch lead { switch lead {
@ -2279,14 +2280,14 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
// TODO: return error to specify it happens because the cluster does not have leader now // TODO: return error to specify it happens because the cluster does not have leader now
case s.MemberId(): case s.MemberId():
if !isConnectedToQuorumSince(s.r.transport, start, s.MemberId(), s.cluster.Members()) { if !isConnectedToQuorumSince(s.r.transport, start, s.MemberId(), s.cluster.Members()) {
return ErrTimeoutDueToConnectionLost return etcderrors.ErrTimeoutDueToConnectionLost
} }
default: default:
if !isConnectedSince(s.r.transport, start, lead) { if !isConnectedSince(s.r.transport, start, lead) {
return ErrTimeoutDueToConnectionLost return etcderrors.ErrTimeoutDueToConnectionLost
} }
} }
return ErrTimeout return etcderrors.ErrTimeout
default: default:
return err return err

View File

@ -47,6 +47,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/mock/mockstorage" "go.etcd.io/etcd/server/v3/mock/mockstorage"
"go.etcd.io/etcd/server/v3/mock/mockstore" "go.etcd.io/etcd/server/v3/mock/mockstore"
@ -95,7 +96,7 @@ func TestDoLocalAction(t *testing.T) {
}, },
{ {
pb.Request{Method: "BADMETHOD", ID: 1}, pb.Request{Method: "BADMETHOD", ID: 1},
Response{}, ErrUnknownMethod, []testutil.Action{}, Response{}, etcderrors.ErrUnknownMethod, []testutil.Action{},
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -461,7 +462,7 @@ func TestApplyRequest(t *testing.T) {
// Unknown method - error // Unknown method - error
{ {
pb.Request{Method: "BADMETHOD", ID: 1}, pb.Request{Method: "BADMETHOD", ID: 1},
Response{Err: ErrUnknownMethod}, Response{Err: etcderrors.ErrUnknownMethod},
[]testutil.Action{}, []testutil.Action{},
}, },
} }
@ -828,8 +829,8 @@ func TestDoProposalCancelled(t *testing.T) {
cancel() cancel()
_, err := srv.Do(ctx, pb.Request{Method: "PUT"}) _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
if err != ErrCanceled { if err != etcderrors.ErrCanceled {
t.Fatalf("err = %v, want %v", err, ErrCanceled) t.Fatalf("err = %v, want %v", err, etcderrors.ErrCanceled)
} }
w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}} w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
if !reflect.DeepEqual(wt.Action(), w) { if !reflect.DeepEqual(wt.Action(), w) {
@ -851,8 +852,8 @@ func TestDoProposalTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 0) ctx, cancel := context.WithTimeout(context.Background(), 0)
_, err := srv.Do(ctx, pb.Request{Method: "PUT"}) _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
cancel() cancel()
if err != ErrTimeout { if err != etcderrors.ErrTimeout {
t.Fatalf("err = %v, want %v", err, ErrTimeout) t.Fatalf("err = %v, want %v", err, etcderrors.ErrTimeout)
} }
} }
@ -870,8 +871,8 @@ func TestDoProposalStopped(t *testing.T) {
srv.stopping = make(chan struct{}) srv.stopping = make(chan struct{})
close(srv.stopping) close(srv.stopping)
_, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1}) _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
if err != ErrStopped { if err != etcderrors.ErrStopped {
t.Errorf("err = %v, want %v", err, ErrStopped) t.Errorf("err = %v, want %v", err, etcderrors.ErrStopped)
} }
} }
@ -1942,14 +1943,14 @@ func TestWaitAppliedIndex(t *testing.T) {
action: func(s *EtcdServer) { action: func(s *EtcdServer) {
s.stopping <- struct{}{} s.stopping <- struct{}{}
}, },
ExpectedError: ErrStopped, ExpectedError: etcderrors.ErrStopped,
}, },
{ {
name: "Timed out waiting for the applied index", name: "Timed out waiting for the applied index",
appliedIndex: 10, appliedIndex: 10,
committedIndex: 12, committedIndex: 12,
action: nil, action: nil,
ExpectedError: ErrTimeoutWaitAppliedIndex, ExpectedError: etcderrors.ErrTimeoutWaitAppliedIndex,
}, },
} }
for _, tc := range cases { for _, tc := range cases {

View File

@ -17,21 +17,17 @@ package txn
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"sort" "sort"
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/mvcc"
"go.uber.org/zap" "go.uber.org/zap"
) )
var (
ErrKeyNotFound = errors.New("etcdserver: key not found")
)
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{} resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{} resp.Header = &pb.ResponseHeader{}
@ -68,7 +64,7 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t
if p.IgnoreValue || p.IgnoreLease { if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 { if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair // ignore_{lease,value} flag expects previous key-value pair
return nil, nil, ErrKeyNotFound return nil, nil, etcderrors.ErrKeyNotFound
} }
} }
if p.IgnoreValue { if p.IgnoreValue {
@ -381,7 +377,7 @@ func checkRequestPut(rv mvcc.ReadView, lessor lease.Lessor, reqOp *pb.RequestOp)
return err return err
} }
if rr == nil || len(rr.KVs) == 0 { if rr == nil || len(rr.KVs) == 0 {
return ErrKeyNotFound return etcderrors.ErrKeyNotFound
} }
} }
if lease.LeaseID(req.Lease) != lease.NoLease { if lease.LeaseID(req.Lease) != lease.NoLease {

View File

@ -21,6 +21,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
) )
type RequestV2 pb.Request type RequestV2 pb.Request
@ -116,7 +117,7 @@ func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *Requ
return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start) return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
case <-a.s.stopping: case <-a.s.stopping:
} }
return Response{}, ErrStopped return Response{}, etcderrors.ErrStopped
} }
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
@ -157,7 +158,7 @@ func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Respons
case "HEAD": case "HEAD":
return v2api.Head(ctx, r) return v2api.Head(ctx, r)
} }
return Response{}, ErrUnknownMethod return Response{}, etcderrors.ErrUnknownMethod
} }
func (r *RequestV2) String() string { func (r *RequestV2) String() string {

View File

@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/server/v3/etcdserver/txn" "go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/lease/leasehttp"
@ -258,9 +259,9 @@ func (s *EtcdServer) waitAppliedIndex() error {
select { select {
case <-s.ApplyWait(): case <-s.ApplyWait():
case <-s.stopping: case <-s.stopping:
return ErrStopped return etcderrors.ErrStopped
case <-time.After(applyTimeout): case <-time.After(applyTimeout):
return ErrTimeoutWaitAppliedIndex return etcderrors.ErrTimeoutWaitAppliedIndex
} }
return nil return nil
@ -310,9 +311,9 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
} }
if cctx.Err() == context.DeadlineExceeded { if cctx.Err() == context.DeadlineExceeded {
return -1, ErrTimeout return -1, etcderrors.ErrTimeout
} }
return -1, ErrCanceled return -1, etcderrors.ErrCanceled
} }
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
@ -360,9 +361,9 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
} }
if cctx.Err() == context.DeadlineExceeded { if cctx.Err() == context.DeadlineExceeded {
return nil, ErrTimeout return nil, etcderrors.ErrTimeout
} }
return nil, ErrCanceled return nil, etcderrors.ErrCanceled
} }
func (s *EtcdServer) newHeader() *pb.ResponseHeader { func (s *EtcdServer) newHeader() *pb.ResponseHeader {
@ -393,13 +394,13 @@ func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error)
case <-time.After(dur): case <-time.After(dur):
leader = s.cluster.Member(s.Leader()) leader = s.cluster.Member(s.Leader())
case <-s.stopping: case <-s.stopping:
return nil, ErrStopped return nil, etcderrors.ErrStopped
case <-ctx.Done(): case <-ctx.Done():
return nil, ErrNoLeader return nil, etcderrors.ErrNoLeader
} }
} }
if len(leader.PeerURLs) == 0 { if len(leader.PeerURLs) == 0 {
return nil, ErrNoLeader return nil, etcderrors.ErrNoLeader
} }
return leader, nil return leader, nil
} }
@ -658,7 +659,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
ai := s.getAppliedIndex() ai := s.getAppliedIndex()
ci := s.getCommittedIndex() ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex { if ci > ai+maxGapBetweenApplyAndCommitIndex {
return nil, ErrTooManyRequests return nil, etcderrors.ErrTooManyRequests
} }
r.Header = &pb.RequestHeader{ r.Header = &pb.RequestHeader{
@ -683,7 +684,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
} }
if len(data) > int(s.Cfg.MaxRequestBytes) { if len(data) > int(s.Cfg.MaxRequestBytes) {
return nil, ErrRequestTooLarge return nil, etcderrors.ErrRequestTooLarge
} }
id := r.ID id := r.ID
@ -713,7 +714,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
s.w.Trigger(id, nil) // GC wait s.w.Trigger(id, nil) // GC wait
return nil, s.parseProposeCtxErr(cctx.Err(), start) return nil, s.parseProposeCtxErr(cctx.Err(), start)
case <-s.done: case <-s.done:
return nil, ErrStopped return nil, etcderrors.ErrStopped
} }
} }
@ -774,7 +775,7 @@ func (s *EtcdServer) linearizableReadLoop() {
} }
func isStopped(err error) bool { func isStopped(err error) bool {
return err == raft.ErrStopped || err == ErrStopped return err == raft.ErrStopped || err == etcderrors.ErrStopped
} }
func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) { func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
@ -815,7 +816,7 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
case <-leaderChangedNotifier: case <-leaderChangedNotifier:
readIndexFailed.Inc() readIndexFailed.Inc()
// return a retryable error. // return a retryable error.
return 0, ErrLeaderChanged return 0, etcderrors.ErrLeaderChanged
case <-firstCommitInTermNotifier: case <-firstCommitInTermNotifier:
firstCommitInTermNotifier = s.firstCommitInTerm.Receive() firstCommitInTermNotifier = s.firstCommitInTerm.Receive()
lg.Info("first commit in current term: resending ReadIndex request") lg.Info("first commit in current term: resending ReadIndex request")
@ -843,9 +844,9 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
zap.Duration("timeout", s.Cfg.ReqTimeout()), zap.Duration("timeout", s.Cfg.ReqTimeout()),
) )
slowReadIndex.Inc() slowReadIndex.Inc()
return 0, ErrTimeout return 0, etcderrors.ErrTimeout
case <-s.stopping: case <-s.stopping:
return 0, ErrStopped return 0, etcderrors.ErrStopped
} }
} }
} }
@ -896,7 +897,7 @@ func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-s.done: case <-s.done:
return ErrStopped return etcderrors.ErrStopped
} }
} }
@ -921,7 +922,7 @@ func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb
case pb.DowngradeRequest_CANCEL: case pb.DowngradeRequest_CANCEL:
return s.downgradeCancel(ctx) return s.downgradeCancel(ctx)
default: default:
return nil, ErrUnknownMethod return nil, etcderrors.ErrUnknownMethod
} }
} }
@ -935,7 +936,7 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
cv := s.ClusterVersion() cv := s.ClusterVersion()
if cv == nil { if cv == nil {
return nil, ErrClusterVersionUnavailable return nil, etcderrors.ErrClusterVersionUnavailable
} }
resp.Version = version.Cluster(cv.String()) resp.Version = version.Cluster(cv.String())
err = s.Version().DowngradeValidate(ctx, targetVersion) err = s.Version().DowngradeValidate(ctx, targetVersion)

View File

@ -26,7 +26,7 @@ import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
"go.etcd.io/etcd/tests/v3/functional/rpcpb" "go.etcd.io/etcd/tests/v3/functional/rpcpb"
"go.uber.org/zap" "go.uber.org/zap"
@ -153,13 +153,13 @@ func (s *keyStresser) isRetryableError(err error) bool {
// as well. We want to keep stressing until the cluster elects a // as well. We want to keep stressing until the cluster elects a
// new leader and start processing requests again. // new leader and start processing requests again.
return true return true
case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error(): case etcderrors.ErrTimeoutDueToLeaderFail.Error(), etcderrors.ErrTimeout.Error():
// This retries when request is triggered at the same time as // This retries when request is triggered at the same time as
// leader failure and follower nodes receive time out errors // leader failure and follower nodes receive time out errors
// from losing their leader. Followers should retry to connect // from losing their leader. Followers should retry to connect
// to the new leader. // to the new leader.
return true return true
case etcdserver.ErrStopped.Error(): case etcderrors.ErrStopped.Error():
// one of the etcd nodes stopped from failure injection // one of the etcd nodes stopped from failure injection
return true return true
case rpctypes.ErrNotCapable.Error(): case rpctypes.ErrNotCapable.Error():