fix: enable gofumpt instead of gofmt linter in server
Signed-off-by: Matthieu MOREL <matthieu.morel35@gmail.com>
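For reviewers unfamiliar with gofumpt: it is a stricter superset of gofmt, and every hunk below is one of a small set of mechanical rewrites — 0o-prefixed octal literals, := instead of var inside functions, single-spec var groups collapsed onto one line, trailing commas with the closing paren or brace on its own line in multiline signatures and literals, and blank-line normalization around functions and simple error checks. A minimal sketch of these rules on a hypothetical file (names are illustrative, not taken from this diff):

package main

import "os"

// A single-spec group like `var ( defaultMode = ... )` collapses onto one
// line, and octal literals gain the 0o prefix (0644 -> 0o644).
var defaultMode = os.FileMode(0o644)

// A signature split across lines gets a trailing comma, with ")" on its own line.
func write(
    path string,
    data []byte,
) error {
    err := os.WriteFile(path, data, defaultMode)
    // gofumpt drops the blank line that used to separate the assignment
    // above from this simple error check.
    if err != nil {
        return err
    }
    return nil
}

func main() {
    // Inside a function, `var out = ...` becomes a short declaration.
    out := os.TempDir() + "/gofumpt-example"
    _ = write(out, []byte("hi"))
}

Running gofumpt (here presumably via the repository's lint tooling, per the commit title) produces exactly the rewrites seen in the hunks that follow.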
@@ -61,7 +61,6 @@ func (t *tokenJWT) info(ctx context.Context, token string, rev uint64) (*AuthInf
 			return t.key, nil
 		}
 	})
-
 	if err != nil {
 		t.lg.Warn(
 			"failed to parse a JWT token",
@@ -137,7 +136,7 @@ func newTokenProviderJWT(lg *zap.Logger, optMap map[string]string) (*tokenJWT, e
 		return nil, ErrInvalidAuthOpts
 	}
 
-	var keys = make([]string, 0, len(optMap))
+	keys := make([]string, 0, len(optMap))
 	for k := range optMap {
 		if !knownOptions[k] {
 			keys = append(keys, k)
@@ -220,7 +220,7 @@ func TestJWTTokenWithMissingFields(t *testing.T) {
 }
 
 func TestJWTBad(t *testing.T) {
-	var badCases = map[string]map[string]string{
+	badCases := map[string]map[string]string{
 		"no options": {},
 		"invalid method": {
 			"sign-method": "invalid",
@@ -27,9 +27,11 @@ func (t *tokenNop) genTokenPrefix() (string, error) { return "", nil }
 func (t *tokenNop) info(ctx context.Context, token string, rev uint64) (*AuthInfo, bool) {
 	return nil, false
 }
+
 func (t *tokenNop) assign(ctx context.Context, username string, revision uint64) (string, error) {
 	return "", ErrAuthFailed
 }
+
 func newTokenProviderNop() (*tokenNop, error) {
 	return &tokenNop{}, nil
 }
@@ -40,10 +40,8 @@ var knownOptions = map[string]bool{
 	optTTL: true,
 }
 
-var (
-	// DefaultTTL will be used when a 'ttl' is not specified
-	DefaultTTL = 5 * time.Minute
-)
+// DefaultTTL will be used when a 'ttl' is not specified
+var DefaultTTL = 5 * time.Minute
 
 type jwtOptions struct {
 	SignMethod jwt.SigningMethod
@@ -74,7 +74,8 @@ func checkKeyInterval(
 	lg *zap.Logger,
 	cachedPerms *unifiedRangePermissions,
 	key, rangeEnd []byte,
-	permtyp authpb.Permission_Type) bool {
+	permtyp authpb.Permission_Type,
+) bool {
 	if isOpenEnded(rangeEnd) {
 		rangeEnd = nil
 		// nil rangeEnd will be converetd to []byte{}, the largest element of BytesAffineComparable,
@@ -47,22 +47,26 @@ func TestRangePermission(t *testing.T) {
 		},
 		{
 			[]adt.Interval{adt.NewBytesAffineInterval([]byte("a"), []byte("d")), adt.NewBytesAffineInterval([]byte("a"), []byte("b")), adt.NewBytesAffineInterval([]byte("c"), []byte("f"))},
-			[]byte("a"), []byte{},
+			[]byte("a"),
+			[]byte{},
 			false,
 		},
 		{
 			[]adt.Interval{adt.NewBytesAffineInterval([]byte("a"), []byte{})},
-			[]byte("a"), []byte{},
+			[]byte("a"),
+			[]byte{},
 			true,
 		},
 		{
 			[]adt.Interval{adt.NewBytesAffineInterval([]byte{0x00}, []byte{})},
-			[]byte("a"), []byte{},
+			[]byte("a"),
+			[]byte{},
 			true,
 		},
 		{
 			[]adt.Interval{adt.NewBytesAffineInterval([]byte{0x00}, []byte{})},
-			[]byte{0x00}, []byte{},
+			[]byte{0x00},
+			[]byte{},
 			true,
 		},
 	}
@@ -1120,7 +1120,8 @@ func NewTokenProvider(
 	lg *zap.Logger,
 	tokenOpts string,
 	indexWaiter func(uint64) <-chan struct{},
-	TokenTTL time.Duration) (TokenProvider, error) {
+	TokenTTL time.Duration,
+) (TokenProvider, error) {
 	tokenType, typeSpecificOpts, err := decomposeOpts(lg, tokenOpts)
 	if err != nil {
 		return nil, ErrInvalidAuthOpts
@@ -524,7 +524,6 @@ func TestRoleGrantPermission(t *testing.T) {
 		Name: "role-test-1",
 		Perm: perm,
 	})
-
 	if err != nil {
 		t.Error(err)
 	}
@@ -704,7 +703,6 @@ func TestRootRoleGrantPermission(t *testing.T) {
 		Name: "root",
 		Perm: perm,
 	})
-
 	if err != nil {
 		t.Error(err)
 	}
@@ -742,7 +740,6 @@ func TestRoleRevokePermission(t *testing.T) {
 		Name: "role-test-1",
 		Perm: perm,
 	})
-
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -97,7 +97,8 @@ func (sctx *serveCtx) serve(
 	errHandler func(error),
 	grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error),
 	splitHTTP bool,
-	gopts ...grpc.ServerOption) (err error) {
+	gopts ...grpc.ServerOption,
+) (err error) {
 	logger := defaultLog.New(io.Discard, "etcdhttp", 0)
 	<-s.ReadyNotify()
 
@@ -38,13 +38,11 @@ var (
 	gatewayCA string
 )
 
-var (
-	rootCmd = &cobra.Command{
-		Use:        "etcd",
-		Short:      "etcd server",
-		SuggestFor: []string{"etcd"},
-	}
-)
+var rootCmd = &cobra.Command{
+	Use:        "etcd",
+	Short:      "etcd server",
+	SuggestFor: []string{"etcd"},
+}
 
 func init() {
 	rootCmd.AddCommand(newGatewayCommand())
@@ -346,7 +346,6 @@ func TestLinearizableReadCheck(t *testing.T) {
 
 func checkHTTPResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
 	res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})
-
 	if err != nil {
 		t.Fatalf("fail serve http request %s %v", url, err)
 	}
@@ -64,12 +64,15 @@ type fakeServer struct {
 func (s *fakeServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
 	return nil, fmt.Errorf("AddMember not implemented in fakeServer")
 }
+
 func (s *fakeServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
 	return nil, fmt.Errorf("RemoveMember not implemented in fakeServer")
 }
+
 func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) {
 	return nil, fmt.Errorf("UpdateMember not implemented in fakeServer")
 }
+
 func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
 	return nil, fmt.Errorf("PromoteMember not implemented in fakeServer")
 }
@@ -44,8 +44,10 @@ func TestAddRemoveMember(t *testing.T) {
 	c2 := newTestCluster(t, nil)
 	c2.SetBackend(be)
 	c2.Recover(func(*zap.Logger, *semver.Version) {})
-	assert.Equal(t, []*Member{{ID: types.ID(19),
-		Attributes: Attributes{Name: "node19"}}}, c2.Members())
+	assert.Equal(t, []*Member{{
+		ID:         types.ID(19),
+		Attributes: Attributes{Name: "node19"},
+	}}, c2.Members())
 	assert.True(t, c2.IsIDRemoved(17))
 	assert.True(t, c2.IsIDRemoved(18))
 	assert.False(t, c2.IsIDRemoved(19))
@@ -78,14 +80,17 @@ func (b *backendMock) MustSaveClusterVersionToBackend(version *semver.Version) {
 func (b *backendMock) MustReadMembersFromBackend() (x map[types.ID]*Member, y map[types.ID]bool) {
 	return b.members, b.removed
 }
+
 func (b *backendMock) MustSaveMemberToBackend(m *Member) {
 	b.members[m.ID] = m
 }
+
 func (b *backendMock) TrimMembershipFromBackend() error {
 	b.members = make(map[types.ID]*Member)
 	b.removed = make(map[types.ID]bool)
 	return nil
 }
+
 func (b *backendMock) MustDeleteMemberFromBackend(id types.ID) {
 	delete(b.members, id)
 	b.removed[id] = true
@@ -272,6 +272,7 @@ type respRoundTripper struct {
 func newRespRoundTripper(code int, err error) *respRoundTripper {
 	return &respRoundTripper{code: code, err: err}
 }
+
 func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
@@ -32,10 +32,8 @@ import (
 	"go.etcd.io/raft/v3"
 )
 
-var (
-	// timeout for reading snapshot response body
-	snapResponseReadTimeout = 5 * time.Second
-)
+// timeout for reading snapshot response body
+var snapResponseReadTimeout = 5 * time.Second
 
 type snapshotSender struct {
 	from, to types.ID
@@ -90,12 +90,10 @@ func (t streamType) String() string {
 	}
 }
 
-var (
-	// linkHeartbeatMessage is a special message used as heartbeat message in
-	// link layer. It never conflicts with messages from raft because raft
-	// doesn't send out messages without From and To fields.
-	linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
-)
+// linkHeartbeatMessage is a special message used as heartbeat message in
+// link layer. It never conflicts with messages from raft because raft
+// doesn't send out messages without From and To fields.
+var linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
 
 func isLinkHeartbeatMessage(m *raftpb.Message) bool {
 	return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
@@ -234,6 +234,7 @@ func (wrc *waitReadCloser) Read(p []byte) (int, error) {
 	<-wrc.closec
 	return 0, io.EOF
 }
+
 func (wrc *waitReadCloser) Close() error {
 	close(wrc.closec)
 	return nil
@@ -169,7 +169,8 @@ func minClusterVersion(h http.Header) *semver.Version {
 func checkVersionCompatibility(name string, server, minCluster *semver.Version) (
 	localServer *semver.Version,
 	localMinCluster *semver.Version,
-	err error) {
+	err error,
+) {
 	localServer = semver.Must(semver.NewVersion(version.Version))
 	localMinCluster = semver.Must(semver.NewVersion(version.MinClusterVersion))
 	if compareMajorMinorVersion(server, localMinCluster) == -1 {
@@ -88,7 +88,7 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
 	spath := filepath.Join(s.dir, fname)
 
 	fsyncStart := time.Now()
-	err = pioutil.WriteAndSyncFile(spath, d, 0666)
+	err = pioutil.WriteAndSyncFile(spath, d, 0o666)
 	snapFsyncSec.Observe(time.Since(fsyncStart).Seconds())
 
 	if err != nil {
@@ -43,7 +43,7 @@ var testSnap = &raftpb.Snapshot{
 
 func TestSaveAndLoad(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -65,7 +65,7 @@ func TestSaveAndLoad(t *testing.T) {
 
 func TestBadCRC(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -88,14 +88,14 @@ func TestBadCRC(t *testing.T) {
 
 func TestFailback(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer os.RemoveAll(dir)
 
 	large := fmt.Sprintf("%016x-%016x-%016x.snap", 0xFFFF, 0xFFFF, 0xFFFF)
-	err = os.WriteFile(filepath.Join(dir, large), []byte("bad data"), 0666)
+	err = os.WriteFile(filepath.Join(dir, large), []byte("bad data"), 0o666)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -122,7 +122,7 @@ func TestFailback(t *testing.T) {
 
 func TestSnapNames(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -151,7 +151,7 @@ func TestSnapNames(t *testing.T) {
 
 func TestLoadNewestSnap(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -215,7 +215,7 @@ func TestLoadNewestSnap(t *testing.T) {
 
 func TestNoSnapshot(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -229,7 +229,7 @@ func TestNoSnapshot(t *testing.T) {
 
 func TestEmptySnapshot(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -250,7 +250,7 @@ func TestEmptySnapshot(t *testing.T) {
 // ErrNoSnapshot if all the snapshots are broken.
 func TestAllSnapshotBroken(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -270,7 +270,7 @@ func TestAllSnapshotBroken(t *testing.T) {
 
 func TestReleaseSnapDBs(t *testing.T) {
 	dir := filepath.Join(os.TempDir(), "snapshot")
-	err := os.Mkdir(dir, 0700)
+	err := os.Mkdir(dir, 0o700)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -279,7 +279,7 @@ func TestReleaseSnapDBs(t *testing.T) {
 	snapIndices := []uint64{100, 200, 300, 400}
 	for _, index := range snapIndices {
 		filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
-		if err := os.WriteFile(filename, []byte("snap file\n"), 0644); err != nil {
+		if err := os.WriteFile(filename, []byte("snap file\n"), 0o644); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -257,7 +257,8 @@ func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64)
 }
 
 func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
-	value string, expireOpts TTLOptionSet) (*Event, error) {
+	value string, expireOpts TTLOptionSet,
+) (*Event, error) {
 	var err *v2error.Error
 
 	s.worldLock.Lock()
@@ -563,7 +564,8 @@ func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet
 }
 
 func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
-	expireTime time.Time, action string) (*Event, *v2error.Error) {
+	expireTime time.Time, action string,
+) (*Event, *v2error.Error) {
 	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
 
 	if unique { // append unique item under the node path
@@ -587,7 +589,6 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
 
 	// walk through the nodePath, create dirs and get the last directory node
 	d, err := s.walk(dirName, s.checkDir)
-
 	if err != nil {
 		s.Stats.Inc(SetFail)
 		reportWriteFailure(action)
@@ -664,7 +665,6 @@ func (s *store) internalGet(nodePath string) (*node, *v2error.Error) {
 	}
 
 	f, err := s.walk(nodePath, walkFunc)
-
 	if err != nil {
 		return nil, err
 	}
@@ -771,7 +771,6 @@ func (s *store) Recovery(state []byte) error {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	err := json.Unmarshal(state, s)
-
 	if err != nil {
 		return err
 	}
@@ -59,7 +59,6 @@ func newWatchHub(capacity int) *watcherHub {
 func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *v2error.Error) {
 	reportWatchRequest()
 	event, err := wh.EventHistory.scan(key, recursive, index)
-
 	if err != nil {
 		err.Index = storeIndex
 		return nil, err
@@ -436,6 +436,7 @@ func (cls *clusterInfo) Len() int { return len(cls.members) }
 func (cls *clusterInfo) Less(i, j int) bool {
 	return cls.members[i].createRev < cls.members[j].createRev
 }
+
 func (cls *clusterInfo) Swap(i, j int) {
 	cls.members[i], cls.members[j] = cls.members[j], cls.members[i]
 }
@@ -181,7 +181,8 @@ func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, warnLatency time.
 }
 
 func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
-	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
+	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string,
+) {
 	lg.Debug("request stats",
 		zap.Time("start time", startTime),
 		zap.Duration("time spent", duration),
@@ -196,7 +197,8 @@ func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.D
 }
 
 func logExpensiveRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
-	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
+	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string,
+) {
 	lg.Warn("request stats",
 		zap.Time("start time", startTime),
 		zap.Duration("time spent", duration),
@@ -43,7 +43,6 @@ func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
 
 func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
 	resp, err := ls.le.LeaseGrant(ctx, cr)
-
 	if err != nil {
 		return nil, togRPCError(err)
 	}
@@ -545,7 +545,8 @@ func IsCreateEvent(e mvccpb.Event) bool {
 func sendFragments(
 	wr *pb.WatchResponse,
 	maxRequestBytes int,
-	sendFunc func(*pb.WatchResponse) error) error {
+	sendFunc func(*pb.WatchResponse) error,
+) error {
 	// no need to fragment if total request size is smaller
 	// than max request limit or response contains only one event
 	if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
@@ -132,7 +132,8 @@ func newApplierV3Backend(
 	raftStatus RaftStatusGetter,
 	snapshotServer SnapshotServer,
 	consistentIndex cindex.ConsistentIndexer,
-	txnModeWriteWithSharedBuffer bool) applierV3 {
+	txnModeWriteWithSharedBuffer bool,
+) applierV3 {
 	return &applierV3backend{
 		lg: lg,
 		kv: kv,
@@ -143,7 +144,8 @@ func newApplierV3Backend(
 		raftStatus:                   raftStatus,
 		snapshotServer:               snapshotServer,
 		consistentIndex:              consistentIndex,
-		txnModeWriteWithSharedBuffer: txnModeWriteWithSharedBuffer}
+		txnModeWriteWithSharedBuffer: txnModeWriteWithSharedBuffer,
+	}
 }
 
 func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result {
@@ -53,15 +53,19 @@ type fakeRaftStatusGetter struct{}
 func (*fakeRaftStatusGetter) MemberID() types.ID {
 	return 0
 }
+
 func (*fakeRaftStatusGetter) Leader() types.ID {
 	return 0
 }
+
 func (*fakeRaftStatusGetter) CommittedIndex() uint64 {
 	return 0
 }
+
 func (*fakeRaftStatusGetter) AppliedIndex() uint64 {
 	return 0
 }
+
 func (*fakeRaftStatusGetter) Term() uint64 {
 	return 0
 }
@@ -222,8 +226,7 @@ func TestAuthApplierV3_Apply(t *testing.T) {
 // TestAuthApplierV3_AdminPermission ensures the admin permission is checked for certain
 // operations
 func TestAuthApplierV3_AdminPermission(t *testing.T) {
-	tcs :=
-		[]struct {
+	tcs := []struct {
 		name                  string
 		request               *pb.InternalRaftRequest
 		adminPermissionNeeded bool
@@ -16,15 +16,13 @@ package apply
 
 import "github.com/prometheus/client_golang/prometheus"
 
-var (
-	alarms = prometheus.NewGaugeVec(prometheus.GaugeOpts{
-		Namespace: "etcd_debugging",
-		Subsystem: "server",
-		Name:      "alarms",
-		Help:      "Alarms for every member in cluster. 1 for 'server_id' label with current ID. 2 for 'alarm_type' label with type of this alarm",
-	},
-		[]string{"server_id", "alarm_type"})
-)
+var alarms = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	Namespace: "etcd_debugging",
+	Subsystem: "server",
+	Name:      "alarms",
+	Help:      "Alarms for every member in cluster. 1 for 'server_id' label with current ID. 2 for 'alarm_type' label with type of this alarm",
+},
+	[]string{"server_id", "alarm_type"})
 
 func init() {
 	prometheus.MustRegister(alarms)
@@ -61,7 +61,8 @@ func NewUberApplier(
 	consistentIndex cindex.ConsistentIndexer,
 	warningApplyDuration time.Duration,
 	txnModeWriteWithSharedBuffer bool,
-	quotaBackendBytesCfg int64) UberApplier {
+	quotaBackendBytesCfg int64,
+) UberApplier {
 	applyV3base := newApplierV3(lg, be, kv, alarmStore, authStore, lessor, cluster, raftStatus, snapshotServer, consistentIndex, txnModeWriteWithSharedBuffer, quotaBackendBytesCfg)
 
 	ua := &uberApplier{
@@ -87,7 +88,8 @@ func newApplierV3(
 	snapshotServer SnapshotServer,
 	consistentIndex cindex.ConsistentIndexer,
 	txnModeWriteWithSharedBuffer bool,
-	quotaBackendBytesCfg int64) applierV3 {
+	quotaBackendBytesCfg int64,
+) applierV3 {
 	applierBackend := newApplierV3Backend(lg, kv, alarmStore, authStore, lessor, cluster, raftStatus, snapshotServer, consistentIndex, txnModeWriteWithSharedBuffer)
 	return newAuthApplierV3(
 		authStore,
@@ -165,7 +165,8 @@ func TestUberApplier_Alarm_Quota(t *testing.T) {
 					},
 				},
 			},
-			}}},
+			},
+		}},
 			expectError: errors.ErrNoSpace,
 		},
 		{
@@ -179,7 +180,8 @@ func TestUberApplier_Alarm_Quota(t *testing.T) {
 					},
 				},
 			},
-			}}},
+			},
+		}},
 			expectError: nil,
 		},
 		{
@@ -210,7 +212,8 @@ func TestUberApplier_Alarm_Quota(t *testing.T) {
 					},
 				},
 			},
-			}}},
+			},
+		}},
 			expectError: nil,
 		},
 		{
@@ -225,12 +225,12 @@ func createDataDir(t *testing.T) (string, error) {
 	dataDir := t.TempDir()
 
 	// create ${dataDir}/member/snap
-	if err = os.MkdirAll(datadir.ToSnapDir(dataDir), 0700); err != nil {
+	if err = os.MkdirAll(datadir.ToSnapDir(dataDir), 0o700); err != nil {
 		return "", err
 	}
 
 	// create ${dataDir}/member/wal
-	err = os.MkdirAll(datadir.ToWALDir(dataDir), 0700)
+	err = os.MkdirAll(datadir.ToWALDir(dataDir), 0o700)
 	if err != nil {
 		return "", err
 	}
@@ -28,7 +28,6 @@ type Backend interface {
 
 // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
 type ConsistentIndexer interface {
-
 	// ConsistentIndex returns the consistent index of current executing entry.
 	ConsistentIndex() uint64
 
@@ -150,9 +149,11 @@ type fakeConsistentIndex struct {
 func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
 	return atomic.LoadUint64(&f.index)
 }
+
 func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
 	return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
 }
+
 func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
 	return atomic.LoadUint64(&f.index)
 }
@@ -161,6 +162,7 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
 	atomic.StoreUint64(&f.index, index)
 	atomic.StoreUint64(&f.term, term)
 }
+
 func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
 	atomic.StoreUint64(&f.index, index)
 	atomic.StoreUint64(&f.term, term)
@@ -507,9 +507,9 @@ func (f *fakeHasher) TriggerCorruptAlarm(memberID types.ID) {
 }
 
 func TestHashKVHandler(t *testing.T) {
-	var remoteClusterID = 111195
-	var localClusterID = 111196
-	var revision = 1
+	remoteClusterID := 111195
+	localClusterID := 111196
+	revision := 1
 
 	etcdSrv := &EtcdServer{}
 	etcdSrv.cluster = newTestCluster(t)
@@ -50,18 +50,36 @@ func TestGetIDs(t *testing.T) {
 		widSet []uint64
 	}{
 		{nil, []raftpb.Entry{}, []uint64{}},
-		{&raftpb.ConfState{Voters: []uint64{1}},
-			[]raftpb.Entry{}, []uint64{1}},
-		{&raftpb.ConfState{Voters: []uint64{1}},
-			[]raftpb.Entry{addEntry}, []uint64{1, 2}},
-		{&raftpb.ConfState{Voters: []uint64{1}},
-			[]raftpb.Entry{addEntry, removeEntry}, []uint64{1}},
-		{&raftpb.ConfState{Voters: []uint64{1}},
-			[]raftpb.Entry{addEntry, normalEntry}, []uint64{1, 2}},
-		{&raftpb.ConfState{Voters: []uint64{1}},
-			[]raftpb.Entry{addEntry, normalEntry, updateEntry}, []uint64{1, 2}},
-		{&raftpb.ConfState{Voters: []uint64{1}},
-			[]raftpb.Entry{addEntry, removeEntry, normalEntry}, []uint64{1}},
+		{
+			&raftpb.ConfState{Voters: []uint64{1}},
+			[]raftpb.Entry{},
+			[]uint64{1},
+		},
+		{
+			&raftpb.ConfState{Voters: []uint64{1}},
+			[]raftpb.Entry{addEntry},
+			[]uint64{1, 2},
+		},
+		{
+			&raftpb.ConfState{Voters: []uint64{1}},
+			[]raftpb.Entry{addEntry, removeEntry},
+			[]uint64{1},
+		},
+		{
+			&raftpb.ConfState{Voters: []uint64{1}},
+			[]raftpb.Entry{addEntry, normalEntry},
+			[]uint64{1, 2},
+		},
+		{
+			&raftpb.ConfState{Voters: []uint64{1}},
+			[]raftpb.Entry{addEntry, normalEntry, updateEntry},
+			[]uint64{1, 2},
+		},
+		{
+			&raftpb.ConfState{Voters: []uint64{1}},
+			[]raftpb.Entry{addEntry, removeEntry, normalEntry},
+			[]uint64{1},
+		},
 	}
 
 	for i, tt := range tests {
@@ -583,7 +583,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 			Data: pbutil.MustMarshal(
 				&raftpb.ConfChange{
 					Type:   raftpb.ConfChangeRemoveNode,
-					NodeID: uint64(i)}),
+					NodeID: uint64(i),
+				}),
 		}
 		ents = append(ents, ent)
 	}
@@ -674,7 +675,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	testdir := t.TempDir()
 
 	snapdir := filepath.Join(testdir, "member", "snap")
-	if err := os.MkdirAll(snapdir, 0755); err != nil {
+	if err := os.MkdirAll(snapdir, 0o755); err != nil {
 		t.Fatalf("couldn't make snap dir (%v)", err)
 	}
 
@@ -765,7 +766,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	cl.SetBackend(schema.NewMembershipBackend(lg, be))
 
 	testdir := t.TempDir()
-	if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
+	if err := os.MkdirAll(testdir+"/member/snap", 0o755); err != nil {
 		t.Fatalf("Couldn't make snap dir (%v)", err)
 	}
 
@@ -1105,7 +1106,8 @@ func TestPublishV3(t *testing.T) {
 		t.Fatalf("unmarshal request error: %v", err)
 	}
 	assert.Equal(t, &membershippb.ClusterMemberAttrSetRequest{Member_ID: 0x1, MemberAttributes: &membershippb.Attributes{
-		Name: "node1", ClientUrls: []string{"http://a", "http://b"}}}, r.ClusterMemberAttrSet)
+		Name: "node1", ClientUrls: []string{"http://a", "http://b"},
+	}}, r.ClusterMemberAttrSet)
 }
 
 // TestPublishV3Stopped tests that publish will be stopped if server is stopped.
@@ -1307,14 +1309,17 @@ func (n *nodeRecorder) Campaign(ctx context.Context) error {
 	n.Record(testutil.Action{Name: "Campaign"})
 	return nil
 }
+
 func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
 	n.Record(testutil.Action{Name: "Propose", Params: []any{data}})
 	return nil
 }
+
 func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
 	n.Record(testutil.Action{Name: "ProposeConfChange"})
 	return nil
 }
+
 func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
 	n.Record(testutil.Action{Name: "Step"})
 	return nil
@@ -1354,8 +1359,10 @@ type readyNode struct {
 func newReadyNode() *readyNode {
 	return &readyNode{
 		nodeRecorder{testutil.NewRecorderStream()},
-		make(chan raft.Ready, 1)}
+		make(chan raft.Ready, 1),
+	}
 }
+
 func newNopReadyNode() *readyNode {
 	return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
 }
@@ -1401,9 +1408,11 @@ func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context,
 	n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}}
 	return nil
 }
+
 func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
 	return n.readyc
 }
+
 func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
 	n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)})
 	return &raftpb.ConfState{}
@@ -40,6 +40,7 @@ func NewRecorder() *StoreRecorder {
 	sr := &storeRecorder{Recorder: &testutil.RecorderBuffered{}}
 	return &StoreRecorder{Store: sr, Recorder: sr.Recorder}
 }
+
 func NewRecorderStream() *StoreRecorder {
 	sr := &storeRecorder{Recorder: testutil.NewRecorderStream()}
 	return &StoreRecorder{Store: sr, Recorder: sr.Recorder}
@@ -54,6 +55,7 @@ func (s *storeRecorder) Get(path string, recursive, sorted bool) (*v2store.Event
 	})
 	return &v2store.Event{}, nil
 }
+
 func (s *storeRecorder) Set(path string, dir bool, val string, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
 	s.Record(testutil.Action{
 		Name: "Set",
@@ -61,6 +63,7 @@ func (s *storeRecorder) Set(path string, dir bool, val string, expireOpts v2stor
 	})
 	return &v2store.Event{}, nil
 }
+
 func (s *storeRecorder) Update(path, val string, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
 	s.Record(testutil.Action{
 		Name: "Update",
@@ -68,6 +71,7 @@ func (s *storeRecorder) Update(path, val string, expireOpts v2store.TTLOptionSet
 	})
 	return &v2store.Event{}, nil
 }
+
 func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
 	s.Record(testutil.Action{
 		Name: "Create",
@@ -75,6 +79,7 @@ func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp
 	})
 	return &v2store.Event{}, nil
 }
+
 func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
 	s.Record(testutil.Action{
 		Name: "CompareAndSwap",
@@ -82,6 +87,7 @@ func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val
 	})
 	return &v2store.Event{}, nil
 }
+
 func (s *storeRecorder) Delete(path string, dir, recursive bool) (*v2store.Event, error) {
 	s.Record(testutil.Action{
 		Name: "Delete",
@@ -89,6 +95,7 @@ func (s *storeRecorder) Delete(path string, dir, recursive bool) (*v2store.Event
 	})
 	return &v2store.Event{}, nil
 }
+
 func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (*v2store.Event, error) {
 	s.Record(testutil.Action{
 		Name: "CompareAndDelete",
@@ -96,14 +103,17 @@ func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (
 	})
 	return &v2store.Event{}, nil
 }
+
 func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (v2store.Watcher, error) {
 	s.Record(testutil.Action{Name: "Watch"})
 	return v2store.NewNopWatcher(), nil
 }
+
 func (s *storeRecorder) Save() ([]byte, error) {
 	s.Record(testutil.Action{Name: "Save"})
 	return nil, nil
 }
+
 func (s *storeRecorder) Recovery(b []byte) error {
 	s.Record(testutil.Action{Name: "Recovery"})
 	return nil
@@ -155,6 +165,7 @@ func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*v2store.Ev
 	s.storeRecorder.Get(path, recursive, sorted)
 	return nil, s.err
 }
+
 func (s *errStoreRecorder) Watch(path string, recursive, sorted bool, index uint64) (v2store.Watcher, error) {
 	s.storeRecorder.Watch(path, recursive, sorted, index)
 	return nil, s.err
@@ -38,6 +38,7 @@ func (w *waitRecorder) Register(id uint64) <-chan any {
 	w.Record(testutil.Action{Name: "Register"})
 	return nil
 }
+
 func (w *waitRecorder) Trigger(id uint64, x any) {
 	w.Record(testutil.Action{Name: "Trigger"})
 }
@@ -60,6 +60,7 @@ type es2ecServerStream struct{ chanServerStream }
 func (s *es2ecClientStream) Send(rr *v3electionpb.LeaderRequest) error {
 	return s.SendMsg(rr)
 }
+
 func (s *es2ecClientStream) Recv() (*v3electionpb.LeaderResponse, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -71,6 +72,7 @@ func (s *es2ecClientStream) Recv() (*v3electionpb.LeaderResponse, error) {
 func (s *es2ecServerStream) Send(rr *v3electionpb.LeaderResponse) error {
 	return s.SendMsg(rr)
 }
+
 func (s *es2ecServerStream) Recv() (*v3electionpb.LeaderRequest, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -62,6 +62,7 @@ type ls2lcServerStream struct{ chanServerStream }
 func (s *ls2lcClientStream) Send(rr *pb.LeaseKeepAliveRequest) error {
 	return s.SendMsg(rr)
 }
+
 func (s *ls2lcClientStream) Recv() (*pb.LeaseKeepAliveResponse, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -73,6 +74,7 @@ func (s *ls2lcClientStream) Recv() (*pb.LeaseKeepAliveResponse, error) {
 func (s *ls2lcServerStream) Send(rr *pb.LeaseKeepAliveResponse) error {
 	return s.SendMsg(rr)
 }
+
 func (s *ls2lcServerStream) Recv() (*pb.LeaseKeepAliveRequest, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -72,6 +72,7 @@ type ss2scServerStream struct{ chanServerStream }
 func (s *ss2scClientStream) Send(rr *pb.SnapshotRequest) error {
 	return s.SendMsg(rr)
 }
+
 func (s *ss2scClientStream) Recv() (*pb.SnapshotResponse, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -83,6 +84,7 @@ func (s *ss2scClientStream) Recv() (*pb.SnapshotResponse, error) {
 func (s *ss2scServerStream) Send(rr *pb.SnapshotResponse) error {
 	return s.SendMsg(rr)
 }
+
 func (s *ss2scServerStream) Recv() (*pb.SnapshotRequest, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -47,6 +47,7 @@ type ws2wcServerStream struct{ chanServerStream }
 func (s *ws2wcClientStream) Send(wr *pb.WatchRequest) error {
 	return s.SendMsg(wr)
 }
+
 func (s *ws2wcClientStream) Recv() (*pb.WatchResponse, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -58,6 +59,7 @@ func (s *ws2wcClientStream) Recv() (*pb.WatchResponse, error) {
 func (s *ws2wcServerStream) Send(wr *pb.WatchResponse) error {
 	return s.SendMsg(wr)
 }
+
 func (s *ws2wcServerStream) Recv() (*pb.WatchRequest, error) {
 	var v any
 	if err := s.RecvMsg(&v); err != nil {
@@ -126,6 +126,7 @@ func (wb *watchBroadcast) add(w *watcher) bool {
 
 	return true
 }
+
 func (wb *watchBroadcast) delete(w *watcher) {
 	wb.mu.Lock()
 	defer wb.mu.Unlock()
@@ -194,7 +194,7 @@ func newBackend(bcfg BackendConfig) *backend {
 	bopts.Mlock = bcfg.Mlock
 	bopts.Logger = newBoltLoggerZap(bcfg)
 
-	db, err := bolt.Open(bcfg.Path, 0600, bopts)
+	db, err := bolt.Open(bcfg.Path, 0o600, bopts)
 	if err != nil {
 		bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
 	}
@@ -407,7 +407,6 @@ func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, e
 		}
 		return nil
 	})
-
 	if err != nil {
 		return 0, err
 	}
@@ -497,7 +496,7 @@ func (b *backend) defrag() error {
 	// Don't load tmp db into memory regardless of opening options
 	options.Mlock = false
 	tdbp := temp.Name()
-	tmpdb, err := bolt.Open(tdbp, 0600, &options)
+	tmpdb, err := bolt.Open(tdbp, 0o600, &options)
 	if err != nil {
 		temp.Close()
 		if rmErr := os.Remove(temp.Name()); rmErr != nil && b.lg != nil {
@@ -567,7 +566,7 @@ func (b *backend) defrag() error {
 		b.lg.Fatal("failed to rename tmp database", zap.Error(err))
 	}
 
-	b.db, err = bolt.Open(dbp, 0600, b.bopts)
+	b.db, err = bolt.Open(dbp, 0o600, b.bopts)
 	if err != nil {
 		b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
 	}
@@ -214,7 +214,7 @@ func TestHasherStoreFull(t *testing.T) {
 	defer store.Close()
 
 	var minRevision int64 = 100
-	var maxRevision = minRevision + hashStorageMaxSize
+	maxRevision := minRevision + hashStorageMaxSize
 	for i := 0; i < hashStorageMaxSize; i++ {
 		s.Store(KeyValueHash{Revision: int64(i) + minRevision})
 	}
@@ -22,9 +22,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var (
-	ErrRevisionNotFound = errors.New("mvcc: revision not found")
-)
+var ErrRevisionNotFound = errors.New("mvcc: revision not found")
 
 // keyIndex stores the revisions of a key in the backend.
 // Each keyIndex has at least one key generation.
@@ -766,7 +766,8 @@ func TestWatchableKVWatch(t *testing.T) {
 	wid, _ := w.Watch(0, []byte("foo"), []byte("fop"), 0)
 
 	wev := []mvccpb.Event{
-		{Type: mvccpb.PUT,
+		{
+			Type: mvccpb.PUT,
 			Kv: &mvccpb.KeyValue{
 				Key:   []byte("foo"),
 				Value: []byte("bar"),
@@ -38,9 +38,11 @@ var (
 	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
 )
 
-var restoreChunkKeys = 10000 // non-const for testing
-var defaultCompactionBatchLimit = 1000
-var defaultCompactionSleepInterval = 10 * time.Millisecond
+var (
+	restoreChunkKeys               = 10000 // non-const for testing
+	defaultCompactionBatchLimit    = 1000
+	defaultCompactionSleepInterval = 10 * time.Millisecond
+)
 
 type StoreConfig struct {
 	CompactionBatchLimit int
@@ -914,7 +914,8 @@ func newTestBucketKeyBytes(rev BucketKey) []byte {
 func newFakeStore(lg *zap.Logger) *store {
 	b := &fakeBackend{&fakeBatchTx{
 		Recorder:   &testutil.RecorderBuffered{},
-		rangeRespc: make(chan rangeResp, 5)}}
+		rangeRespc: make(chan rangeResp, 5),
+	}}
 	s := &store{
 		cfg: StoreConfig{
 			CompactionBatchLimit: 10000,
@@ -965,17 +966,21 @@ func (b *fakeBatchTx) UnsafeDeleteBucket(bucket backend.Bucket) {}
 func (b *fakeBatchTx) UnsafePut(bucket backend.Bucket, key []byte, value []byte) {
 	b.Recorder.Record(testutil.Action{Name: "put", Params: []any{bucket, key, value}})
 }
+
 func (b *fakeBatchTx) UnsafeSeqPut(bucket backend.Bucket, key []byte, value []byte) {
 	b.Recorder.Record(testutil.Action{Name: "seqput", Params: []any{bucket, key, value}})
 }
+
 func (b *fakeBatchTx) UnsafeRange(bucket backend.Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
 	b.Recorder.Record(testutil.Action{Name: "range", Params: []any{bucket, key, endKey, limit}})
 	r := <-b.rangeRespc
 	return r.keys, r.vals
 }
+
 func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
 	b.Recorder.Record(testutil.Action{Name: "delete", Params: []any{bucket, key}})
 }
+
 func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
 	return nil
 }
@@ -1041,27 +1046,33 @@ func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created Revision, ver int
 	r := <-i.indexGetRespc
 	return r.rev, r.created, r.ver, r.err
 }
+
 func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []Revision) {
 	i.Recorder.Record(testutil.Action{Name: "range", Params: []any{key, end, atRev}})
 	r := <-i.indexRangeRespc
 	return r.keys, r.revs
 }
+
 func (i *fakeIndex) Put(key []byte, rev Revision) {
 	i.Recorder.Record(testutil.Action{Name: "put", Params: []any{key, rev}})
 }
+
 func (i *fakeIndex) Tombstone(key []byte, rev Revision) error {
 	i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []any{key, rev}})
 	return nil
 }
+
 func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []Revision {
 	i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []any{key, end, rev}})
 	r := <-i.indexRangeEventsRespc
 	return r.revs
 }
+
 func (i *fakeIndex) Compact(rev int64) map[Revision]struct{} {
 	i.Recorder.Record(testutil.Action{Name: "compact", Params: []any{rev}})
 	return <-i.indexCompactRespc
 }
+
 func (i *fakeIndex) Keep(rev int64) map[Revision]struct{} {
 	i.Recorder.Record(testutil.Action{Name: "keep", Params: []any{rev}})
 	return <-i.indexCompactRespc
@@ -22,12 +22,10 @@ import (
 	"go.etcd.io/etcd/pkg/v3/adt"
 )
 
-var (
-	// watchBatchMaxRevs is the maximum distinct revisions that
-	// may be sent to an unsynced watcher at a time. Declared as
-	// var instead of const for testing purposes.
-	watchBatchMaxRevs = 1000
-)
+// watchBatchMaxRevs is the maximum distinct revisions that
+// may be sent to an unsynced watcher at a time. Declared as
+// var instead of const for testing purposes.
+var watchBatchMaxRevs = 1000
 
 type eventBatch struct {
 	// evs is a batch of revision-ordered events
@@ -69,10 +69,9 @@ type ActionList []action
 // unsafeExecute executes actions one by one. If one of actions returns error,
 // it will revert them.
 func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) error {
-	var revertActions = make(ActionList, 0, len(as))
+	revertActions := make(ActionList, 0, len(as))
 	for _, a := range as {
 		revert, err := a.unsafeDo(tx)
-
 		if err != nil {
 			revertActions.unsafeExecuteInReversedOrder(lg, tx)
 			return err
@@ -78,8 +78,10 @@ type authBatchTx struct {
 	lg *zap.Logger
 }
 
-var _ auth.AuthReadTx = (*authReadTx)(nil)
-var _ auth.AuthBatchTx = (*authBatchTx)(nil)
+var (
+	_ auth.AuthReadTx  = (*authReadTx)(nil)
+	_ auth.AuthBatchTx = (*authBatchTx)(nil)
+)
 
 func (atx *authBatchTx) UnsafeSaveAuthEnabled(enabled bool) {
 	if enabled {
@@ -108,7 +108,7 @@ func UnsafeDetectSchemaVersion(lg *zap.Logger, tx backend.UnsafeReader) (v semve
 
 func schemaChangesForVersion(v semver.Version, isUpgrade bool) ([]schemaChange, error) {
 	// changes should be taken from higher version
-	var higherV = v
+	higherV := v
 	if isUpgrade {
 		higherV = semver.Version{Major: v.Major, Minor: v.Minor + 1}
 	}
@@ -114,7 +114,7 @@ func TestVersionSnapshot(t *testing.T) {
 	tx.Unlock()
 	be.ForceCommit()
 	be.Close()
-	db, err := bbolt.Open(tmpPath, 0400, &bbolt.Options{ReadOnly: true})
+	db, err := bbolt.Open(tmpPath, 0o400, &bbolt.Options{ReadOnly: true})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -39,12 +39,10 @@ import (
 	"go.etcd.io/raft/v3/raftpb"
 )
 
-var (
-	confState = raftpb.ConfState{
-		Voters:    []uint64{0x00ffca74},
-		AutoLeave: false,
-	}
-)
+var confState = raftpb.ConfState{
+	Voters:    []uint64{0x00ffca74},
+	AutoLeave: false,
+}
 
 func TestNew(t *testing.T) {
 	p := t.TempDir()
@@ -19,9 +19,7 @@ import (
 	"fmt"
 )
 
-var (
-	ErrCRCMismatch = errors.New("walpb: crc mismatch")
-)
+var ErrCRCMismatch = errors.New("walpb: crc mismatch")
 
 func (rec *Record) Validate(crc uint32) error {
 	if rec.Crc == crc {