backend: Hooks interface & implementation.
This commit is contained in:
@ -104,6 +104,8 @@ type backend struct {
|
|||||||
stopc chan struct{}
|
stopc chan struct{}
|
||||||
donec chan struct{}
|
donec chan struct{}
|
||||||
|
|
||||||
|
hooks Hooks
|
||||||
|
|
||||||
lg *zap.Logger
|
lg *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,6 +126,9 @@ type BackendConfig struct {
|
|||||||
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
|
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
|
||||||
// Mlock prevents backend database file to be swapped
|
// Mlock prevents backend database file to be swapped
|
||||||
Mlock bool
|
Mlock bool
|
||||||
|
|
||||||
|
// Hooks are getting executed during lifecycle of Backend's transactions.
|
||||||
|
Hooks Hooks
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultBackendConfig() BackendConfig {
|
func DefaultBackendConfig() BackendConfig {
|
||||||
@ -189,6 +194,9 @@ func newBackend(bcfg BackendConfig) *backend {
|
|||||||
lg: bcfg.Logger,
|
lg: bcfg.Logger,
|
||||||
}
|
}
|
||||||
b.batchTx = newBatchTxBuffered(b)
|
b.batchTx = newBatchTxBuffered(b)
|
||||||
|
// We set it after newBatchTxBuffered to skip the 'empty' commit.
|
||||||
|
b.hooks = bcfg.Hooks
|
||||||
|
|
||||||
go b.run()
|
go b.run()
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
@ -109,6 +109,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
|
|||||||
t.backend.lg.Fatal(
|
t.backend.lg.Fatal(
|
||||||
"failed to find a bucket",
|
"failed to find a bucket",
|
||||||
zap.String("bucket-name", string(bucketName)),
|
zap.String("bucket-name", string(bucketName)),
|
||||||
|
zap.Stack("stack"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if seq {
|
if seq {
|
||||||
@ -133,6 +134,7 @@ func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
|
|||||||
t.backend.lg.Fatal(
|
t.backend.lg.Fatal(
|
||||||
"failed to find a bucket",
|
"failed to find a bucket",
|
||||||
zap.String("bucket-name", string(bucketName)),
|
zap.String("bucket-name", string(bucketName)),
|
||||||
|
zap.Stack("stack"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
return unsafeRange(bucket.Cursor(), key, endKey, limit)
|
return unsafeRange(bucket.Cursor(), key, endKey, limit)
|
||||||
@ -167,6 +169,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
|
|||||||
t.backend.lg.Fatal(
|
t.backend.lg.Fatal(
|
||||||
"failed to find a bucket",
|
"failed to find a bucket",
|
||||||
zap.String("bucket-name", string(bucketName)),
|
zap.String("bucket-name", string(bucketName)),
|
||||||
|
zap.Stack("stack"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
err := bucket.Delete(key)
|
err := bucket.Delete(key)
|
||||||
@ -283,6 +286,10 @@ func (t *batchTxBuffered) CommitAndStop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) commit(stop bool) {
|
func (t *batchTxBuffered) commit(stop bool) {
|
||||||
|
if t.backend.hooks != nil {
|
||||||
|
t.backend.hooks.OnPreCommitUnsafe(t)
|
||||||
|
}
|
||||||
|
|
||||||
// all read txs must be closed to acquire boltdb commit rwlock
|
// all read txs must be closed to acquire boltdb commit rwlock
|
||||||
t.backend.readTx.Lock()
|
t.backend.readTx.Lock()
|
||||||
t.unsafeCommit(stop)
|
t.unsafeCommit(stop)
|
||||||
|
36
server/mvcc/backend/hooks.go
Normal file
36
server/mvcc/backend/hooks.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package backend
|
||||||
|
|
||||||
|
type HookFunc func(tx BatchTx)
|
||||||
|
|
||||||
|
// Hooks allow to add additional logic executed during transaction lifetime.
|
||||||
|
type Hooks interface {
|
||||||
|
// OnPreCommitUnsafe is executed before Commit of transactions.
|
||||||
|
// The given transaction is already locked.
|
||||||
|
OnPreCommitUnsafe(tx BatchTx)
|
||||||
|
}
|
||||||
|
|
||||||
|
type hooks struct {
|
||||||
|
onPreCommitUnsafe HookFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h hooks) OnPreCommitUnsafe(tx BatchTx) {
|
||||||
|
h.onPreCommitUnsafe(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHooks(onPreCommitUnsafe HookFunc) Hooks {
|
||||||
|
return hooks{onPreCommitUnsafe: onPreCommitUnsafe}
|
||||||
|
}
|
Reference in New Issue
Block a user