*: support count in range query

This commit is contained in:
Xiang Li
2016-06-21 16:20:55 -07:00
parent c01c36bcfd
commit def21f11a9
8 changed files with 406 additions and 286 deletions

View File

@ -210,8 +210,7 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
resp.Header = &pb.ResponseHeader{}
var (
kvs []mvccpb.KeyValue
rev int64
rr *mvcc.RangeResult
err error
)
@ -229,13 +228,19 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
limit = limit + 1
}
ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
}
if txnID != noTxn {
kvs, rev, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision)
rr, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}
} else {
kvs, rev, err = a.s.KV().Range(r.Key, r.RangeEnd, limit, r.Revision)
rr, err = a.s.KV().Range(r.Key, r.RangeEnd, ro)
if err != nil {
return nil, err
}
@ -245,15 +250,15 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
var sorter sort.Interface
switch {
case r.SortTarget == pb.RangeRequest_KEY:
sorter = &kvSortByKey{&kvSort{kvs}}
sorter = &kvSortByKey{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VERSION:
sorter = &kvSortByVersion{&kvSort{kvs}}
sorter = &kvSortByVersion{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_CREATE:
sorter = &kvSortByCreate{&kvSort{kvs}}
sorter = &kvSortByCreate{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_MOD:
sorter = &kvSortByMod{&kvSort{kvs}}
sorter = &kvSortByMod{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VALUE:
sorter = &kvSortByValue{&kvSort{kvs}}
sorter = &kvSortByValue{&kvSort{rr.KVs}}
}
switch {
case r.SortOrder == pb.RangeRequest_ASCEND:
@ -263,17 +268,18 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
}
}
if r.Limit > 0 && len(kvs) > int(r.Limit) {
kvs = kvs[:r.Limit]
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
resp.Header.Revision = rev
for i := range kvs {
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
for i := range rr.KVs {
if r.KeysOnly {
kvs[i].Value = nil
rr.KVs[i].Value = nil
}
resp.Kvs = append(resp.Kvs, &kvs[i])
resp.Kvs = append(resp.Kvs, &rr.KVs[i])
}
return resp, nil
}
@ -337,7 +343,9 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
// It returns the revision at which the comparison happens. If the comparison
// succeeds, the it returns true. Otherwise it returns false.
func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
ckvs, rev, err := a.s.KV().Range(c.Key, nil, 1, 0)
rr, err := a.s.KV().Range(c.Key, nil, mvcc.RangeOptions{})
rev := rr.Rev
if err != nil {
if err == mvcc.ErrTxnIDMismatch {
panic("unexpected txn ID mismatch error")
@ -345,8 +353,8 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
return rev, false
}
var ckv mvccpb.KeyValue
if len(ckvs) != 0 {
ckv = ckvs[0]
if len(rr.KVs) != 0 {
ckv = rr.KVs[0]
} else {
// Use the zero value of ckv normally. However...
if c.Target == pb.Compare_VALUE {
@ -443,7 +451,8 @@ func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.Com
return nil, ch, err
}
// get the current revision. which key to get is not important.
_, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0)
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
resp.Header.Revision = rr.Rev
return resp, ch, err
}