server: set multiple concurrentReadTx instances share one txReadBuffer.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Wilson Wang
2021-05-24 15:15:23 -07:00
committed by Gyuho Lee
parent c0d1450b45
commit 7a7d6f94a7
2 changed files with 89 additions and 4 deletions

View File

@ -19,6 +19,8 @@ import (
"sort"
)
const bucketBufferInitialSize = 512
// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[string]*bucketBuffer
@ -69,10 +71,16 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb.merge(wb)
}
txw.reset()
// increase the buffer version
txr.bufVersion++
}
// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }
type txReadBuffer struct {
txBuffer
// bufVersion is used to check if the buffer is modified recently
bufVersion uint64
}
func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
@ -94,6 +102,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
},
bufVersion: 0,
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
@ -114,7 +123,7 @@ type bucketBuffer struct {
}
func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
}
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {