add key dedupe when a write buffer writeback to an empty read buffer bucket.
Signed-off-by: Siyuan Zhang <sizhang@google.com>
This commit is contained in:
@ -97,6 +97,9 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
|
|||||||
rb, ok := txr.buckets[k]
|
rb, ok := txr.buckets[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
delete(txw.buckets, k)
|
delete(txw.buckets, k)
|
||||||
|
if seq, ok := txw.bucket2seq[k]; ok && !seq {
|
||||||
|
wb.dedupe()
|
||||||
|
}
|
||||||
txr.buckets[k] = wb
|
txr.buckets[k] = wb
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -218,10 +221,15 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
|
|||||||
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
|
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
bb.dedupe()
|
||||||
|
}
|
||||||
|
|
||||||
|
// dedupe removes duplicates, using only newest update
|
||||||
|
func (bb *bucketBuffer) dedupe() {
|
||||||
|
if bb.used <= 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
sort.Stable(bb)
|
sort.Stable(bb)
|
||||||
|
|
||||||
// remove duplicates, using only newest update
|
|
||||||
widx := 0
|
widx := 0
|
||||||
for ridx := 1; ridx < bb.used; ridx++ {
|
for ridx := 1; ridx < bb.used; ridx++ {
|
||||||
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
|
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
|
||||||
|
@ -90,3 +90,53 @@ func Test_bucketBuffer_CopyUsed(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDedupe(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
keys, vals, expectedKeys, expectedVals []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty",
|
||||||
|
keys: []string{},
|
||||||
|
vals: []string{},
|
||||||
|
expectedKeys: []string{},
|
||||||
|
expectedVals: []string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single kv",
|
||||||
|
keys: []string{"key1"},
|
||||||
|
vals: []string{"val1"},
|
||||||
|
expectedKeys: []string{"key1"},
|
||||||
|
expectedVals: []string{"val1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "duplicate key",
|
||||||
|
keys: []string{"key1", "key1"},
|
||||||
|
vals: []string{"val1", "val2"},
|
||||||
|
expectedKeys: []string{"key1"},
|
||||||
|
expectedVals: []string{"val2"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unordered keys",
|
||||||
|
keys: []string{"key3", "key1", "key4", "key2", "key1", "key4"},
|
||||||
|
vals: []string{"val1", "val5", "val3", "val4", "val2", "val6"},
|
||||||
|
expectedKeys: []string{"key1", "key2", "key3", "key4"},
|
||||||
|
expectedVals: []string{"val2", "val4", "val1", "val6"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
bb := &bucketBuffer{buf: make([]kv, 10), used: 0}
|
||||||
|
for i := 0; i < len(tt.keys); i++ {
|
||||||
|
bb.add([]byte(tt.keys[i]), []byte(tt.vals[i]))
|
||||||
|
}
|
||||||
|
bb.dedupe()
|
||||||
|
assert.Equal(t, bb.used, len(tt.expectedKeys))
|
||||||
|
for i := 0; i < bb.used; i++ {
|
||||||
|
assert.Equal(t, bb.buf[i].key, []byte(tt.expectedKeys[i]))
|
||||||
|
assert.Equal(t, bb.buf[i].val, []byte(tt.expectedVals[i]))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user