server: add experimental flag for using shared buffer in transacton write
This commit is contained in:
@ -177,6 +177,10 @@ type ServerConfig struct {
|
|||||||
// Currently all etcd memory gets mlocked, but in future the flag can
|
// Currently all etcd memory gets mlocked, but in future the flag can
|
||||||
// be refined to mlock in-use area of bbolt only.
|
// be refined to mlock in-use area of bbolt only.
|
||||||
ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"`
|
ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"`
|
||||||
|
|
||||||
|
// ExperimentalTxnModeWriteWithSharedBuffer enable write transaction to use
|
||||||
|
// a shared buffer in its readonly check operations.
|
||||||
|
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// VerifyBootstrap sanity-checks the initial config for bootstrap case
|
// VerifyBootstrap sanity-checks the initial config for bootstrap case
|
||||||
|
@ -355,6 +355,9 @@ type Config struct {
|
|||||||
// Currently all etcd memory gets mlocked, but in future the flag can
|
// Currently all etcd memory gets mlocked, but in future the flag can
|
||||||
// be refined to mlock in-use area of bbolt only.
|
// be refined to mlock in-use area of bbolt only.
|
||||||
ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"`
|
ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"`
|
||||||
|
|
||||||
|
// ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations.
|
||||||
|
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// configYAML holds the config suitable for yaml parsing
|
// configYAML holds the config suitable for yaml parsing
|
||||||
@ -444,8 +447,9 @@ func NewConfig() *Config {
|
|||||||
LogLevel: logutil.DefaultLogLevel,
|
LogLevel: logutil.DefaultLogLevel,
|
||||||
EnableGRPCGateway: true,
|
EnableGRPCGateway: true,
|
||||||
|
|
||||||
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
|
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
|
||||||
ExperimentalMemoryMlock: false,
|
ExperimentalMemoryMlock: false,
|
||||||
|
ExperimentalTxnModeWriteWithSharedBuffer: true,
|
||||||
}
|
}
|
||||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||||
return cfg
|
return cfg
|
||||||
|
@ -163,56 +163,57 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
|||||||
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
|
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
|
||||||
|
|
||||||
srvcfg := config.ServerConfig{
|
srvcfg := config.ServerConfig{
|
||||||
Name: cfg.Name,
|
Name: cfg.Name,
|
||||||
ClientURLs: cfg.ACUrls,
|
ClientURLs: cfg.ACUrls,
|
||||||
PeerURLs: cfg.APUrls,
|
PeerURLs: cfg.APUrls,
|
||||||
DataDir: cfg.Dir,
|
DataDir: cfg.Dir,
|
||||||
DedicatedWALDir: cfg.WalDir,
|
DedicatedWALDir: cfg.WalDir,
|
||||||
SnapshotCount: cfg.SnapshotCount,
|
SnapshotCount: cfg.SnapshotCount,
|
||||||
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
|
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
|
||||||
MaxSnapFiles: cfg.MaxSnapFiles,
|
MaxSnapFiles: cfg.MaxSnapFiles,
|
||||||
MaxWALFiles: cfg.MaxWalFiles,
|
MaxWALFiles: cfg.MaxWalFiles,
|
||||||
InitialPeerURLsMap: urlsmap,
|
InitialPeerURLsMap: urlsmap,
|
||||||
InitialClusterToken: token,
|
InitialClusterToken: token,
|
||||||
DiscoveryURL: cfg.Durl,
|
DiscoveryURL: cfg.Durl,
|
||||||
DiscoveryProxy: cfg.Dproxy,
|
DiscoveryProxy: cfg.Dproxy,
|
||||||
NewCluster: cfg.IsNewCluster(),
|
NewCluster: cfg.IsNewCluster(),
|
||||||
PeerTLSInfo: cfg.PeerTLSInfo,
|
PeerTLSInfo: cfg.PeerTLSInfo,
|
||||||
TickMs: cfg.TickMs,
|
TickMs: cfg.TickMs,
|
||||||
ElectionTicks: cfg.ElectionTicks(),
|
ElectionTicks: cfg.ElectionTicks(),
|
||||||
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
|
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
|
||||||
AutoCompactionRetention: autoCompactionRetention,
|
AutoCompactionRetention: autoCompactionRetention,
|
||||||
AutoCompactionMode: cfg.AutoCompactionMode,
|
AutoCompactionMode: cfg.AutoCompactionMode,
|
||||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||||
BackendBatchLimit: cfg.BackendBatchLimit,
|
BackendBatchLimit: cfg.BackendBatchLimit,
|
||||||
BackendFreelistType: backendFreelistType,
|
BackendFreelistType: backendFreelistType,
|
||||||
BackendBatchInterval: cfg.BackendBatchInterval,
|
BackendBatchInterval: cfg.BackendBatchInterval,
|
||||||
MaxTxnOps: cfg.MaxTxnOps,
|
MaxTxnOps: cfg.MaxTxnOps,
|
||||||
MaxRequestBytes: cfg.MaxRequestBytes,
|
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||||
SocketOpts: cfg.SocketOpts,
|
SocketOpts: cfg.SocketOpts,
|
||||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||||
AuthToken: cfg.AuthToken,
|
AuthToken: cfg.AuthToken,
|
||||||
BcryptCost: cfg.BcryptCost,
|
BcryptCost: cfg.BcryptCost,
|
||||||
TokenTTL: cfg.AuthTokenTTL,
|
TokenTTL: cfg.AuthTokenTTL,
|
||||||
CORS: cfg.CORS,
|
CORS: cfg.CORS,
|
||||||
HostWhitelist: cfg.HostWhitelist,
|
HostWhitelist: cfg.HostWhitelist,
|
||||||
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
|
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
|
||||||
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
|
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
|
||||||
PreVote: cfg.PreVote,
|
PreVote: cfg.PreVote,
|
||||||
Logger: cfg.logger,
|
Logger: cfg.logger,
|
||||||
LoggerConfig: cfg.loggerConfig,
|
LoggerConfig: cfg.loggerConfig,
|
||||||
LoggerCore: cfg.loggerCore,
|
LoggerCore: cfg.loggerCore,
|
||||||
LoggerWriteSyncer: cfg.loggerWriteSyncer,
|
LoggerWriteSyncer: cfg.loggerWriteSyncer,
|
||||||
ForceNewCluster: cfg.ForceNewCluster,
|
ForceNewCluster: cfg.ForceNewCluster,
|
||||||
EnableGRPCGateway: cfg.EnableGRPCGateway,
|
EnableGRPCGateway: cfg.EnableGRPCGateway,
|
||||||
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
||||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||||
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
||||||
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
|
||||||
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
|
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
|
||||||
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
|
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
|
||||||
|
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
|
||||||
}
|
}
|
||||||
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
|
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
|
||||||
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
|
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
|
||||||
|
@ -267,6 +267,7 @@ func newConfig() *config {
|
|||||||
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
|
||||||
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
|
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
|
||||||
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
|
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
|
||||||
|
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
|
||||||
|
|
||||||
// unsafe
|
// unsafe
|
||||||
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
|
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
|
||||||
|
@ -220,6 +220,8 @@ Experimental feature:
|
|||||||
Duration of periodical watch progress notification.
|
Duration of periodical watch progress notification.
|
||||||
--experimental-warning-apply-duration '100ms'
|
--experimental-warning-apply-duration '100ms'
|
||||||
Warning is generated if requests take more than this duration.
|
Warning is generated if requests take more than this duration.
|
||||||
|
--experimental-txn-mode-write-with-shared-buffer 'true'
|
||||||
|
Enable the write transaction to use a shared buffer in its readonly check operations.
|
||||||
|
|
||||||
Unsafe feature:
|
Unsafe feature:
|
||||||
--force-new-cluster 'false'
|
--force-new-cluster 'false'
|
||||||
|
@ -438,7 +438,7 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
|
|||||||
// When the transaction contains write operations, we use ReadTx instead of
|
// When the transaction contains write operations, we use ReadTx instead of
|
||||||
// ConcurrentReadTx to avoid extra overhead of copying buffer.
|
// ConcurrentReadTx to avoid extra overhead of copying buffer.
|
||||||
var txn mvcc.TxnWrite
|
var txn mvcc.TxnWrite
|
||||||
if isWrite {
|
if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer {
|
||||||
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
|
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
|
||||||
} else {
|
} else {
|
||||||
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
|
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
|
||||||
|
Reference in New Issue
Block a user