namespace: a wrapper for clientv3 to namespace requests
This commit is contained in:
43
clientv3/namespace/doc.go
Normal file
43
clientv3/namespace/doc.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// Package namespace is a clientv3 wrapper that translates all keys to begin
|
||||||
|
// with a given prefix.
|
||||||
|
//
|
||||||
|
// First, create a client:
|
||||||
|
//
|
||||||
|
// cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
|
||||||
|
// if err != nil {
|
||||||
|
// // handle error!
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// Next, override the client interfaces:
|
||||||
|
//
|
||||||
|
// unprefixedKV := cli.KV
|
||||||
|
// cli.KV = namespace.NewKV(cli.KV, "my-prefix/")
|
||||||
|
// cli.Watcher = namespace.NewWatcher(cli.Watcher, "my-prefix/")
|
||||||
|
// cli.Leases = namespace.NewLease(cli.Lease, "my-prefix/")
|
||||||
|
//
|
||||||
|
// Now calls using 'cli' will namespace / prefix all keys with "my-prefix/":
|
||||||
|
//
|
||||||
|
// cli.Put(context.TODO(), "abc", "123")
|
||||||
|
// resp, _ := unprefixedKV.Get(context.TODO(), "my-prefix/abc")
|
||||||
|
// fmt.Printf("%s\n", resp.Kvs[0].Value)
|
||||||
|
// // Output: 123
|
||||||
|
// unprefixedKV.Put(context.TODO(), "my-prefix/abc", "456")
|
||||||
|
// resp, _ = cli.Get("abc")
|
||||||
|
// fmt.Printf("%s\n", resp.Kvs[0].Value)
|
||||||
|
// // Output: 456
|
||||||
|
//
|
||||||
|
package namespace
|
189
clientv3/namespace/kv.go
Normal file
189
clientv3/namespace/kv.go
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package namespace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type kvPrefix struct {
|
||||||
|
clientv3.KV
|
||||||
|
pfx string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewKV wraps a KV instance so that all requests
|
||||||
|
// are prefixed with a given string.
|
||||||
|
func NewKV(kv clientv3.KV, prefix string) clientv3.KV {
|
||||||
|
return &kvPrefix{kv, prefix}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
|
||||||
|
if len(key) == 0 {
|
||||||
|
return nil, rpctypes.ErrEmptyKey
|
||||||
|
}
|
||||||
|
op := kv.prefixOp(clientv3.OpPut(key, val, opts...))
|
||||||
|
r, err := kv.KV.Do(ctx, op)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
put := r.Put()
|
||||||
|
kv.unprefixPutResponse(put)
|
||||||
|
return put, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
|
||||||
|
if len(key) == 0 {
|
||||||
|
return nil, rpctypes.ErrEmptyKey
|
||||||
|
}
|
||||||
|
r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpGet(key, opts...)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
get := r.Get()
|
||||||
|
kv.unprefixGetResponse(get)
|
||||||
|
return get, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
|
||||||
|
if len(key) == 0 {
|
||||||
|
return nil, rpctypes.ErrEmptyKey
|
||||||
|
}
|
||||||
|
r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpDelete(key, opts...)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
del := r.Del()
|
||||||
|
kv.unprefixDeleteResponse(del)
|
||||||
|
return del, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
|
||||||
|
if len(op.KeyBytes()) == 0 {
|
||||||
|
return clientv3.OpResponse{}, rpctypes.ErrEmptyKey
|
||||||
|
}
|
||||||
|
r, err := kv.KV.Do(ctx, kv.prefixOp(op))
|
||||||
|
if err != nil {
|
||||||
|
return r, err
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case r.Get() != nil:
|
||||||
|
kv.unprefixGetResponse(r.Get())
|
||||||
|
case r.Put() != nil:
|
||||||
|
kv.unprefixPutResponse(r.Put())
|
||||||
|
case r.Del() != nil:
|
||||||
|
kv.unprefixDeleteResponse(r.Del())
|
||||||
|
}
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type txnPrefix struct {
|
||||||
|
clientv3.Txn
|
||||||
|
kv *kvPrefix
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) Txn(ctx context.Context) clientv3.Txn {
|
||||||
|
return &txnPrefix{kv.KV.Txn(ctx), kv}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
|
||||||
|
newCmps := make([]clientv3.Cmp, len(cs))
|
||||||
|
for i := range cs {
|
||||||
|
newCmps[i] = cs[i]
|
||||||
|
pfxKey, _ := txn.kv.prefixInterval(cs[i].KeyBytes(), nil)
|
||||||
|
newCmps[i].WithKeyBytes(pfxKey)
|
||||||
|
}
|
||||||
|
txn.Txn = txn.Txn.If(newCmps...)
|
||||||
|
return txn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn {
|
||||||
|
newOps := make([]clientv3.Op, len(ops))
|
||||||
|
for i := range ops {
|
||||||
|
newOps[i] = txn.kv.prefixOp(ops[i])
|
||||||
|
}
|
||||||
|
txn.Txn = txn.Txn.Then(newOps...)
|
||||||
|
return txn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn {
|
||||||
|
newOps := make([]clientv3.Op, len(ops))
|
||||||
|
for i := range ops {
|
||||||
|
newOps[i] = txn.kv.prefixOp(ops[i])
|
||||||
|
}
|
||||||
|
txn.Txn = txn.Txn.Else(newOps...)
|
||||||
|
return txn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (txn *txnPrefix) Commit() (*clientv3.TxnResponse, error) {
|
||||||
|
resp, err := txn.Txn.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
txn.kv.unprefixTxnResponse(resp)
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op {
|
||||||
|
begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes())
|
||||||
|
op.WithKeyBytes(begin)
|
||||||
|
op.WithRangeBytes(end)
|
||||||
|
return op
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) {
|
||||||
|
for i := range resp.Kvs {
|
||||||
|
resp.Kvs[i].Key = resp.Kvs[i].Key[len(kv.pfx):]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) unprefixPutResponse(resp *clientv3.PutResponse) {
|
||||||
|
if resp.PrevKv != nil {
|
||||||
|
resp.PrevKv.Key = resp.PrevKv.Key[len(kv.pfx):]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) unprefixDeleteResponse(resp *clientv3.DeleteResponse) {
|
||||||
|
for i := range resp.PrevKvs {
|
||||||
|
resp.PrevKvs[i].Key = resp.PrevKvs[i].Key[len(kv.pfx):]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) {
|
||||||
|
for _, r := range resp.Responses {
|
||||||
|
switch tv := r.Response.(type) {
|
||||||
|
case *pb.ResponseOp_ResponseRange:
|
||||||
|
if tv.ResponseRange != nil {
|
||||||
|
kv.unprefixGetResponse((*clientv3.GetResponse)(tv.ResponseRange))
|
||||||
|
}
|
||||||
|
case *pb.ResponseOp_ResponsePut:
|
||||||
|
if tv.ResponsePut != nil {
|
||||||
|
kv.unprefixPutResponse((*clientv3.PutResponse)(tv.ResponsePut))
|
||||||
|
}
|
||||||
|
case *pb.ResponseOp_ResponseDeleteRange:
|
||||||
|
if tv.ResponseDeleteRange != nil {
|
||||||
|
kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange))
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) {
|
||||||
|
return prefixInterval(p.pfx, key, end)
|
||||||
|
}
|
58
clientv3/namespace/lease.go
Normal file
58
clientv3/namespace/lease.go
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package namespace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type leasePrefix struct {
|
||||||
|
clientv3.Lease
|
||||||
|
pfx []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLease wraps a Lease interface to filter for only keys with a prefix
|
||||||
|
// and remove that prefix when fetching attached keys through TimeToLive.
|
||||||
|
func NewLease(l clientv3.Lease, prefix string) clientv3.Lease {
|
||||||
|
return &leasePrefix{l, []byte(prefix)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *leasePrefix) TimeToLive(ctx context.Context, id clientv3.LeaseID, opts ...clientv3.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error) {
|
||||||
|
resp, err := l.Lease.TimeToLive(ctx, id, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(resp.Keys) > 0 {
|
||||||
|
var outKeys [][]byte
|
||||||
|
for i := range resp.Keys {
|
||||||
|
if len(resp.Keys[i]) < len(l.pfx) {
|
||||||
|
// too short
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !bytes.Equal(resp.Keys[i][:len(l.pfx)], l.pfx) {
|
||||||
|
// doesn't match prefix
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// strip prefix
|
||||||
|
outKeys = append(outKeys, resp.Keys[i][len(l.pfx):])
|
||||||
|
}
|
||||||
|
resp.Keys = outKeys
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
42
clientv3/namespace/util.go
Normal file
42
clientv3/namespace/util.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package namespace
|
||||||
|
|
||||||
|
func prefixInterval(pfx string, key, end []byte) (pfxKey []byte, pfxEnd []byte) {
|
||||||
|
pfxKey = make([]byte, len(pfx)+len(key))
|
||||||
|
copy(pfxKey[copy(pfxKey, pfx):], key)
|
||||||
|
|
||||||
|
if len(end) == 1 && end[0] == 0 {
|
||||||
|
// the edge of the keyspace
|
||||||
|
pfxEnd = make([]byte, len(pfx))
|
||||||
|
copy(pfxEnd, pfx)
|
||||||
|
ok := false
|
||||||
|
for i := len(pfxEnd) - 1; i >= 0; i-- {
|
||||||
|
if pfxEnd[i]++; pfxEnd[i] != 0 {
|
||||||
|
ok = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
// 0xff..ff => 0x00
|
||||||
|
pfxEnd = []byte{0}
|
||||||
|
}
|
||||||
|
} else if len(end) >= 1 {
|
||||||
|
pfxEnd = make([]byte, len(pfx)+len(end))
|
||||||
|
copy(pfxEnd[copy(pfxEnd, pfx):], end)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pfxKey, pfxEnd
|
||||||
|
}
|
75
clientv3/namespace/util_test.go
Normal file
75
clientv3/namespace/util_test.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package namespace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPrefixInterval(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
pfx string
|
||||||
|
key []byte
|
||||||
|
end []byte
|
||||||
|
|
||||||
|
wKey []byte
|
||||||
|
wEnd []byte
|
||||||
|
}{
|
||||||
|
// single key
|
||||||
|
{
|
||||||
|
pfx: "pfx/",
|
||||||
|
key: []byte("a"),
|
||||||
|
|
||||||
|
wKey: []byte("pfx/a"),
|
||||||
|
},
|
||||||
|
// range
|
||||||
|
{
|
||||||
|
pfx: "pfx/",
|
||||||
|
key: []byte("abc"),
|
||||||
|
end: []byte("def"),
|
||||||
|
|
||||||
|
wKey: []byte("pfx/abc"),
|
||||||
|
wEnd: []byte("pfx/def"),
|
||||||
|
},
|
||||||
|
// one-sided range
|
||||||
|
{
|
||||||
|
pfx: "pfx/",
|
||||||
|
key: []byte("abc"),
|
||||||
|
end: []byte{0},
|
||||||
|
|
||||||
|
wKey: []byte("pfx/abc"),
|
||||||
|
wEnd: []byte("pfx0"),
|
||||||
|
},
|
||||||
|
// one-sided range, end of keyspace
|
||||||
|
{
|
||||||
|
pfx: "\xff\xff",
|
||||||
|
key: []byte("abc"),
|
||||||
|
end: []byte{0},
|
||||||
|
|
||||||
|
wKey: []byte("\xff\xffabc"),
|
||||||
|
wEnd: []byte{0},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, tt := range tests {
|
||||||
|
pfxKey, pfxEnd := prefixInterval(tt.pfx, tt.key, tt.end)
|
||||||
|
if !bytes.Equal(pfxKey, tt.wKey) {
|
||||||
|
t.Errorf("#%d: expected key=%q, got key=%q", i, tt.wKey, pfxKey)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(pfxEnd, tt.wEnd) {
|
||||||
|
t.Errorf("#%d: expected end=%q, got end=%q", i, tt.wEnd, pfxEnd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
84
clientv3/namespace/watch.go
Normal file
84
clientv3/namespace/watch.go
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package namespace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type watcherPrefix struct {
|
||||||
|
clientv3.Watcher
|
||||||
|
pfx string
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
stopc chan struct{}
|
||||||
|
stopOnce sync.Once
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWatcher wraps a Watcher instance so that all Watch requests
|
||||||
|
// are prefixed with a given string and all Watch responses have
|
||||||
|
// the prefix removed.
|
||||||
|
func NewWatcher(w clientv3.Watcher, prefix string) clientv3.Watcher {
|
||||||
|
return &watcherPrefix{Watcher: w, pfx: prefix, stopc: make(chan struct{})}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcherPrefix) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
|
||||||
|
// since OpOption is opaque, determine range for prefixing through an OpGet
|
||||||
|
op := clientv3.OpGet("abc", opts...)
|
||||||
|
end := op.RangeBytes()
|
||||||
|
pfxBegin, pfxEnd := prefixInterval(w.pfx, []byte(key), end)
|
||||||
|
if pfxEnd != nil {
|
||||||
|
opts = append(opts, clientv3.WithRange(string(pfxEnd)))
|
||||||
|
}
|
||||||
|
|
||||||
|
wch := w.Watcher.Watch(ctx, string(pfxBegin), opts...)
|
||||||
|
|
||||||
|
// translate watch events from prefixed to unprefixed
|
||||||
|
pfxWch := make(chan clientv3.WatchResponse)
|
||||||
|
w.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
close(pfxWch)
|
||||||
|
w.wg.Done()
|
||||||
|
}()
|
||||||
|
for wr := range wch {
|
||||||
|
for i := range wr.Events {
|
||||||
|
wr.Events[i].Kv.Key = wr.Events[i].Kv.Key[len(w.pfx):]
|
||||||
|
if wr.Events[i].PrevKv != nil {
|
||||||
|
wr.Events[i].PrevKv.Key = wr.Events[i].Kv.Key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case pfxWch <- wr:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-w.stopc:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return pfxWch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcherPrefix) Close() error {
|
||||||
|
err := w.Watcher.Close()
|
||||||
|
w.stopOnce.Do(func() { close(w.stopc) })
|
||||||
|
w.wg.Wait()
|
||||||
|
return err
|
||||||
|
}
|
Reference in New Issue
Block a user