storage/backend: add commitAndStop

After the upgrade of boltdb, db.Close waits for all txn to finish.
CommitAndStop commits the current txn and stop creating new ones.
This commit is contained in:
Xiang Li
2015-08-25 10:57:23 -07:00
parent 8738a88fae
commit e8f40b0412
2 changed files with 18 additions and 6 deletions

View File

@ -80,7 +80,7 @@ func (b *backend) run() {
select { select {
case <-time.After(b.batchInterval): case <-time.After(b.batchInterval):
case <-b.stopc: case <-b.stopc:
b.batchTx.Commit() b.batchTx.CommitAndStop()
return return
} }
b.batchTx.Commit() b.batchTx.Commit()

View File

@ -16,6 +16,7 @@ type BatchTx interface {
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeDelete(bucketName []byte, key []byte) UnsafeDelete(bucketName []byte, key []byte)
Commit() Commit()
CommitAndStop()
} }
type batchTx struct { type batchTx struct {
@ -43,7 +44,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
} }
t.pending++ t.pending++
if t.pending > t.backend.batchLimit { if t.pending > t.backend.batchLimit {
t.commit() t.commit(false)
t.pending = 0 t.pending = 0
} }
} }
@ -84,19 +85,26 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
} }
t.pending++ t.pending++
if t.pending > t.backend.batchLimit { if t.pending > t.backend.batchLimit {
t.commit() t.commit(false)
t.pending = 0 t.pending = 0
} }
} }
// commitAndBegin commits a previous tx and begins a new writable one. // Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() { func (t *batchTx) Commit() {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
t.commit() t.commit(false)
} }
func (t *batchTx) commit() { // CommitAndStop commits the previous tx and do not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
}
func (t *batchTx) commit(stop bool) {
var err error var err error
// commit the last tx // commit the last tx
if t.tx != nil { if t.tx != nil {
@ -106,6 +114,10 @@ func (t *batchTx) commit() {
} }
} }
if stop {
return
}
// begin a new tx // begin a new tx
t.tx, err = t.backend.db.Begin(true) t.tx, err = t.backend.db.Begin(true)
if err != nil { if err != nil {