Compare commits

...

57 Commits

Author SHA1 Message Date
8a37349097 version: bump to v3.0.14 2016-11-04 10:54:14 -07:00
9a0e4dfe4f ctlv3: fix migration 2016-11-03 09:47:41 -07:00
f60469af16 ctlv3: Add a no-ttl flag to etcdctl migrate to discard keys on transform. 2016-11-03 09:47:39 -07:00
932370d8ca version: bump to v3.0.13+git 2016-10-24 11:22:50 -07:00
c99d0d4b25 version: bump to v3.0.13 2016-10-24 11:04:43 -07:00
d78216f528 e2e: remove 'ctlV3GetFailPerm' 2016-10-24 11:04:13 -07:00
c05c027a24 etcdctl: fix migrate in outputing client.Node to json
Using printf will try to parse the string and replace special
characters. In migrate code, we want to just output the raw
json string of client.Node.
For example,
    Printf("%\\") => %!\(MISSING)
    Print("%\\") => %\
Thus, we should use print instead.
2016-10-20 10:51:16 -07:00
3fd64f913a auth: fix return type on 'hasRootRole' 2016-10-12 13:59:27 -07:00
f935290bbc mvcc: fix rev inconsistency
Try:

./etcdctl put foo bar
./etcdctl del foo
./etcdctl compact 3

restart etcd

./etcdctl get foo
mvcc: required revision has been compacted

The error is unexpected when range over the head revision.

Internally, we incorrectly set current revision smaller than the
compacted revision when we remove all keys around compacted revision.

This commit fixes the issue by recovering the current revision at least
to compacted revision.
2016-10-12 13:08:26 -07:00
ca91f898a2 auth, e2e, clientv3: the root role should be granted access to every key
This commit changes the semantics of the root role. The role should be
able to access to every key.

Partially fixes https://github.com/coreos/etcd/issues/6355
2016-10-11 12:19:46 -07:00
fcbada7798 Merge pull request #6622 from luxas/backport_arm_fixes
Backport arm fixes
2016-10-11 12:15:58 -07:00
fad9bdc3e1 etcdserver: atomic access alignment
Most fields accessed with sync/atomic functions are 64bit aligned, but a couple
are not.  This makes comments out of date and therefore misleading.

Affected fields reordered, comments scrubbed and updated.
2016-10-11 11:48:43 +03:00
198ccb8b7b raftpb: atomic access alignment
The Entry struct has misaligned fields that are accessed atomically.  The
misalignment is caused by the EntryType enum which the Protocol Buffers
spec forces to be a 32bit int.

Moving the order of the fields without renumbering them in the .proto file
seems to align the go structure without changing the wire format.
2016-10-11 11:48:43 +03:00
dc5d5c6ac8 raft: atomic access alignment
The relevant structures are properly aligned, however, there is no comment
highlighting the need to keep it aligned as is present elsewhere in the
codebase.

Adding note to keep alignment, in line with similar comments in the codebase.
2016-10-11 11:48:43 +03:00
f771eaca47 version: bump to v3.0.12+git 2016-10-07 16:42:12 -07:00
2d1e2e8e64 version: bump to v3.0.12 2016-10-07 15:14:25 -07:00
6412758177 v3rpc: remove redundant locks 2016-10-07 15:13:56 -07:00
836c8159f6 v3rpc: lock progress and prevKV map correctly 2016-10-07 15:13:12 -07:00
e406e6e8f4 etcdctl/ctlv3: add 'prev-kv' flag to watch command 2016-10-07 14:23:09 -07:00
2fa2c6284e clientv3: add 'prevKV' field to watch request 2016-10-07 14:22:58 -07:00
2862c4fa12 v3rpc: implement 'prev-kv' watch 2016-10-07 14:22:19 -07:00
6f89fbf8b5 etcdserver: use mvcc.WatchableKV for prev-kv watch 2016-10-07 14:22:00 -07:00
6ae7ec9a3f *: regenerate proto 2016-10-07 14:21:19 -07:00
4a35b1b20a etcdserverpb: add 'prev_kb' to WatchCreateRequest 2016-10-07 14:20:46 -07:00
c859c97ee2 mvccpb: add 'prev_kv' field 2016-10-07 14:19:59 -07:00
a091c629e1 version: bump to v3.0.11+git 2016-10-07 13:25:21 -07:00
96de94a584 version: bump to v3.0.11 2016-10-07 11:27:48 -07:00
e9cd8410d7 integration: add 'prevKV' to TestV3DeleteRange 2016-10-07 11:03:19 -07:00
e37ede1d2e etcdserver: handle 'PrevKV' 2016-10-07 11:00:48 -07:00
4420a29ac4 etcdctl/ctlv3: add 'prev-kv' flag 2016-10-07 10:56:06 -07:00
0544d4bfd0 clientv3: add WithPrevKV OpOption 2016-10-07 10:54:45 -07:00
fe7379f102 clientv3: add Op.prevKV 2016-10-07 10:51:01 -07:00
c76df5052b *: update proto to add 'prev_kv' 2016-10-07 10:47:47 -07:00
3299cad1c3 *: add put prevkv 2016-10-07 10:39:08 -07:00
d9ab018c49 integration: test a canceled watch won't return a closing error 2016-10-05 14:19:36 -07:00
e853451cd2 clientv3: only return closing error to watcher if context is not canceled
Fixes #6503
2016-10-05 14:19:32 -07:00
1becf9d2f5 clientv3: fix race on watch initial revision
The initial revision was being updated in the substream goroutine defer;
this was racing with the resume path fetching the initial revision when
the substream closes during resume. Instead, update the initial revision
whenever the substream processes a new watch response. Since the substream
cannot receive a watch response while it is resuming, the write to the
initial revision is ordered to always happen after the resume read.

Fixes #6586
2016-10-05 10:56:36 -07:00
1a712cf187 clientv3: make IsProgressNotify() false on compact event and closed channel
Fixes #6549
2016-10-04 15:13:02 -07:00
023f335f67 wal: set PageWriter offset in file encoder 2016-10-04 15:12:47 -07:00
bf0da78b63 pkg/ioutil: configure pageOffset in NewPageWriter 2016-10-04 15:12:46 -07:00
e8473850a2 integration: test canceling watchers when disconnected 2016-10-04 15:12:37 -07:00
b836d187fd clientv3: simplify watch synchronization
Was more complicated than it needed to be and didn't really work in the
first place. Restructured watcher registation to use a queue.
2016-10-04 15:12:18 -07:00
9b09229c4d version: bump to v3.0.10+git 2016-09-23 11:13:45 -07:00
546c0f7ed6 version: bump to v3.0.10 2016-09-23 10:49:03 -07:00
adbad1c9b5 ctlv3: close snapshot file before rename (Windows) 2016-09-23 09:11:02 -07:00
273b986751 clientv3: process closed watcherStreams in watcherGrpcStream run loop
Was racing with Watch() when closing the grpc stream on no watchers.

Fixes #6476
2016-09-21 15:52:20 -07:00
5b205729b9 rafthttp: add v3.0.0 to supported streams 2016-09-16 21:54:55 +09:00
fe900b09dd version: bump to v3.0.9+git 2016-09-15 15:10:23 -07:00
494c012659 version: bump to v3.0.9 2016-09-15 12:56:33 -07:00
4abc381ebe clientv3: drain buffered WatchResponses before resuming
Otherwise, the watcherStream can receive WatchResponses in the
middle of a resume, corrupting the stream.

Fixes #6364
2016-09-15 12:38:15 -07:00
73c8fdac53 integration: fix compilation for backported Election test 2016-09-15 11:45:37 -07:00
ee2717493a ctlv3: fix line parsing for Windows 2016-09-15 11:25:53 -07:00
2435eb9ecd clientv3: balancer panics when call up after close
Fix the issue by adding a simple guard varable.
2016-09-15 18:46:26 +09:00
8fb533dabe embed: warn on domain name in listener 2016-09-15 18:46:19 +09:00
2f0f5ac504 Revert "Merge pull request #6365 from heyitsanthony/fix-dns-bind"
This reverts commit af5ab7b351, reversing
changes made to da6a0f0594.
2016-09-15 18:43:46 +09:00
9ab811d478 auth: fix range handling bugs.
Test 15, counting from zero, in TestGetMergedPerms
in etcd/auth/range_perm_cache_test.go, was trying
incorrectly assert that [a, b) merged with [b, "")
should be [a, b). Added a test specifically for
this. This patch fixes the incorrect larger test
and the bugs in the code that it was hiding.

Fixes #6359
2016-09-15 18:41:56 +09:00
e0a99fb4ba version: bump to v3.0.8+git 2016-09-09 15:56:31 -07:00
52 changed files with 1483 additions and 766 deletions

View File

@ -427,6 +427,7 @@ Empty field.
| ----- | ----------- | ---- |
| key | key is the first key to delete in the range. | bytes |
| range_end | range_end is the key following the last key to delete for the range [key, range_end). If range_end is not given, the range is defined to contain only the key argument. If range_end is '\0', the range is all keys greater than or equal to the key argument. | bytes |
| prev_kv | If prev_kv is set, etcd gets the previous key-value pairs before deleting it. The previous key-value pairs will be returned in the delte response. | bool |
@ -436,6 +437,7 @@ Empty field.
| ----- | ----------- | ---- |
| header | | ResponseHeader |
| deleted | deleted is the number of keys deleted by the delete range request. | int64 |
| prev_kvs | if prev_kv is set in the request, the previous key-value pairs will be returned. | (slice of) mvccpb.KeyValue |
@ -591,6 +593,7 @@ Empty field.
| key | key is the key, in bytes, to put into the key-value store. | bytes |
| value | value is the value, in bytes, to associate with the key in the key-value store. | bytes |
| lease | lease is the lease ID to associate with the key in the key-value store. A lease value of 0 indicates no lease. | int64 |
| prev_kv | If prev_kv is set, etcd gets the previous key-value pair before changing it. The previous key-value pair will be returned in the put response. | bool |
@ -599,6 +602,7 @@ Empty field.
| Field | Description | Type |
| ----- | ----------- | ---- |
| header | | ResponseHeader |
| prev_kv | if prev_kv is set in the request, the previous key-value pair will be returned. | mvccpb.KeyValue |
@ -735,6 +739,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| range_end | range_end is the end of the range [key, range_end) to watch. If range_end is not given, only the key argument is watched. If range_end is equal to '\0', all keys greater than or equal to the key argument are watched. | bytes |
| start_revision | start_revision is an optional revision to watch from (inclusive). No start_revision is "now". | int64 |
| progress_notify | progress_notify is set so that the etcd server will periodically send a WatchResponse with no events to the new watcher if there are no recent events. It is useful when clients wish to recover a disconnected watcher starting from a recent known revision. The etcd server may decide how often it will send notifications based on current load. | bool |
| prev_kv | If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned. | bool |
@ -767,6 +772,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| ----- | ----------- | ---- |
| type | type is the kind of event. If type is a PUT, it indicates new data has been stored to the key. If type is a DELETE, it indicates the key was deleted. | EventType |
| kv | kv holds the KeyValue for the event. A PUT event contains current kv pair. A PUT event with kv.Version=1 indicates the creation of a key. A DELETE/EXPIRE event contains the deleted key with its modification revision set to the revision of deletion. | KeyValue |
| prev_kv | prev_kv holds the key-value pair before the event happens. | KeyValue |

View File

@ -1474,6 +1474,11 @@
"format": "byte",
"description": "key is the first key to delete in the range."
},
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, etcd gets the previous key-value pairs before deleting it.\nThe previous key-value pairs will be returned in the delte response."
},
"range_end": {
"type": "string",
"format": "byte",
@ -1491,6 +1496,13 @@
},
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"prev_kvs": {
"type": "array",
"items": {
"$ref": "#/definitions/mvccpbKeyValue"
},
"description": "if prev_kv is set in the request, the previous key-value pairs will be returned."
}
}
},
@ -1724,6 +1736,11 @@
"format": "int64",
"description": "lease is the lease ID to associate with the key in the key-value store. A lease\nvalue of 0 indicates no lease."
},
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, etcd gets the previous key-value pair before changing it.\nThe previous key-value pair will be returned in the put response."
},
"value": {
"type": "string",
"format": "byte",
@ -1736,6 +1753,10 @@
"properties": {
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"prev_kv": {
"$ref": "#/definitions/mvccpbKeyValue",
"description": "if prev_kv is set in the request, the previous key-value pair will be returned."
}
}
},
@ -1988,6 +2009,11 @@
"format": "byte",
"description": "key is the key to register for watching."
},
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, created watcher gets the previous KV before the event happens.\nIf the previous KV is already compacted, nothing will be returned."
},
"progress_notify": {
"type": "boolean",
"format": "boolean",
@ -2057,6 +2083,10 @@
"$ref": "#/definitions/mvccpbKeyValue",
"description": "kv holds the KeyValue for the event.\nA PUT event contains current kv pair.\nA PUT event with kv.Version=1 indicates the creation of a key.\nA DELETE/EXPIRE event contains the deleted key with\nits modification revision set to the revision of deletion."
},
"prev_kv": {
"$ref": "#/definitions/mvccpbKeyValue",
"description": "prev_kv holds the key-value pair before the event happens."
},
"type": {
"$ref": "#/definitions/EventEventType",
"description": "type is the kind of event. If type is a PUT, it indicates\nnew data has been stored to the key. If type is a DELETE,\nit indicates the key was deleted."

View File

@ -21,9 +21,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -22,7 +22,10 @@ import (
"github.com/coreos/etcd/mvcc/backend"
)
// isSubset returns true if a is a subset of b
// isSubset returns true if a is a subset of b.
// If a is a prefix of b, then a is a subset of b.
// Given intervals [a1,a2) and [b1,b2), is
// the a interval a subset of b?
func isSubset(a, b *rangePerm) bool {
switch {
case len(a.end) == 0 && len(b.end) == 0:
@ -32,9 +35,11 @@ func isSubset(a, b *rangePerm) bool {
// b is a key, a is a range
return false
case len(a.end) == 0:
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.begin, b.end) <= 0
// a is a key, b is a range. need b1 <= a1 and a1 < b2
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.begin, b.end) < 0
default:
return 0 <= bytes.Compare(a.begin, b.begin) && bytes.Compare(a.end, b.end) <= 0
// both are ranges. need b1 <= a1 and a2 <= b2
return bytes.Compare(b.begin, a.begin) <= 0 && bytes.Compare(a.end, b.end) <= 0
}
}
@ -88,12 +93,18 @@ func mergeRangePerms(perms []*rangePerm) []*rangePerm {
i := 0
for i < len(perms) {
begin, next := i, i
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) != -1 {
for next+1 < len(perms) && bytes.Compare(perms[next].end, perms[next+1].begin) >= 0 {
next++
}
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
// don't merge ["a", "b") with ["b", ""), because perms[next+1].end is empty.
if next != begin && len(perms[next].end) > 0 {
merged = append(merged, &rangePerm{begin: perms[begin].begin, end: perms[next].end})
} else {
merged = append(merged, perms[begin])
if next != begin {
merged = append(merged, perms[next])
}
}
i = next + 1
}

View File

@ -46,6 +46,10 @@ func TestGetMergedPerms(t *testing.T) {
[]*rangePerm{{[]byte("a"), []byte("b")}},
[]*rangePerm{{[]byte("a"), []byte("b")}},
},
{
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("")}},
},
{
[]*rangePerm{{[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}},
[]*rangePerm{{[]byte("a"), []byte("c")}},
@ -106,7 +110,7 @@ func TestGetMergedPerms(t *testing.T) {
},
{
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("b"), []byte("")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("d"), []byte("")}},
[]*rangePerm{{[]byte("a"), []byte("")}, {[]byte("b"), []byte("c")}, {[]byte("c"), []byte("")}, {[]byte("d"), []byte("")}},
},
// duplicate ranges
{

View File

@ -603,6 +603,11 @@ func (as *authStore) isOpPermitted(userName string, key, rangeEnd []byte, permTy
return false
}
// root role should have permission on all ranges
if hasRootRole(user) {
return true
}
if as.isRangeOpPermitted(tx, userName, key, rangeEnd, permTyp) {
return true
}

View File

@ -45,6 +45,8 @@ type simpleBalancer struct {
// pinAddr is the currently pinned address; set to the empty string on
// intialization and shutdown.
pinAddr string
closed bool
}
func newSimpleBalancer(eps []string) *simpleBalancer {
@ -74,15 +76,25 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
// gRPC might call Up after it called Close. We add this check
// to "fix" it up at application layer. Or our simplerBalancer
// might panic since b.upc is closed.
if b.closed {
return func(err error) {}
}
if len(b.upEps) == 0 {
// notify waiting Get()s and pin first connected address
close(b.upc)
b.pinAddr = addr.Addr
}
b.upEps[addr.Addr] = struct{}{}
b.mu.Unlock()
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
delete(b.upEps, addr.Addr)
@ -128,13 +140,19 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
func (b *simpleBalancer) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
// In case gRPC calls close twice. TODO: remove the checking
// when we are sure that gRPC wont call close twice.
if b.closed {
return nil
}
b.closed = true
close(b.notifyCh)
// terminate all waiting Get()s
b.pinAddr = ""
if len(b.upEps) == 0 {
close(b.upc)
}
b.mu.Unlock()
return nil
}

View File

@ -32,35 +32,63 @@ func ExampleAuth() {
}
defer cli.Close()
authapi := clientv3.NewAuth(cli)
if _, err = authapi.RoleAdd(context.TODO(), "root"); err != nil {
if _, err = cli.RoleAdd(context.TODO(), "root"); err != nil {
log.Fatal(err)
}
if _, err = cli.UserAdd(context.TODO(), "root", "123"); err != nil {
log.Fatal(err)
}
if _, err = cli.UserGrantRole(context.TODO(), "root", "root"); err != nil {
log.Fatal(err)
}
if _, err = authapi.RoleGrantPermission(
if _, err = cli.RoleAdd(context.TODO(), "r"); err != nil {
log.Fatal(err)
}
if _, err = cli.RoleGrantPermission(
context.TODO(),
"root", // role name
"foo", // key
"zoo", // range end
"r", // role name
"foo", // key
"zoo", // range end
clientv3.PermissionType(clientv3.PermReadWrite),
); err != nil {
log.Fatal(err)
}
if _, err = authapi.UserAdd(context.TODO(), "root", "123"); err != nil {
if _, err = cli.UserAdd(context.TODO(), "u", "123"); err != nil {
log.Fatal(err)
}
if _, err = authapi.UserGrantRole(context.TODO(), "root", "root"); err != nil {
if _, err = cli.UserGrantRole(context.TODO(), "u", "r"); err != nil {
log.Fatal(err)
}
if _, err = authapi.AuthEnable(context.TODO()); err != nil {
if _, err = cli.AuthEnable(context.TODO()); err != nil {
log.Fatal(err)
}
cliAuth, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
Username: "u",
Password: "123",
})
if err != nil {
log.Fatal(err)
}
defer cliAuth.Close()
if _, err = cliAuth.Put(context.TODO(), "foo1", "bar"); err != nil {
log.Fatal(err)
}
_, err = cliAuth.Txn(context.TODO()).
If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")).
Then(clientv3.OpPut("zoo1", "XYZ")).
Else(clientv3.OpPut("zoo1", "ABC")).
Commit()
fmt.Println(err)
// now check the permission with the root account
rootCli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
Username: "root",
@ -69,31 +97,17 @@ func ExampleAuth() {
if err != nil {
log.Fatal(err)
}
defer cliAuth.Close()
defer rootCli.Close()
kv := clientv3.NewKV(cliAuth)
if _, err = kv.Put(context.TODO(), "foo1", "bar"); err != nil {
log.Fatal(err)
}
_, err = kv.Txn(context.TODO()).
If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")).
Then(clientv3.OpPut("zoo1", "XYZ")).
Else(clientv3.OpPut("zoo1", "ABC")).
Commit()
fmt.Println(err)
// now check the permission
authapi2 := clientv3.NewAuth(cliAuth)
resp, err := authapi2.RoleGet(context.TODO(), "root")
resp, err := rootCli.RoleGet(context.TODO(), "r")
if err != nil {
log.Fatal(err)
}
fmt.Printf("root user permission: key %q, range end %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd)
fmt.Printf("user u permission: key %q, range end %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd)
if _, err = authapi2.AuthDisable(context.TODO()); err != nil {
if _, err = rootCli.AuthDisable(context.TODO()); err != nil {
log.Fatal(err)
}
// Output: etcdserver: permission denied
// root user permission: key "foo", range end "zoo"
// user u permission: key "foo", range end "zoo"
}

View File

@ -673,3 +673,112 @@ func TestWatchWithRequireLeader(t *testing.T) {
t.Fatalf("expected response, got closed channel")
}
}
// TestWatchOverlapContextCancel stresses the watcher stream teardown path by
// creating/canceling watchers to ensure that new watchers are not taken down
// by a torn down watch stream. The sort of race that's being detected:
// 1. create w1 using a cancelable ctx with %v as "ctx"
// 2. cancel ctx
// 3. watcher client begins tearing down watcher grpc stream since no more watchers
// 3. start creating watcher w2 using a new "ctx" (not canceled), attaches to old grpc stream
// 4. watcher client finishes tearing down stream on "ctx"
// 5. w2 comes back canceled
func TestWatchOverlapContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {}
testWatchOverlapContextCancel(t, f)
}
func TestWatchOverlapDropConnContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {
clus.Members[0].DropConnections()
}
testWatchOverlapContextCancel(t, f)
}
func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
// each unique context "%v" has a unique grpc stream
n := 100
ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
for i := range ctxs {
// make "%v" unique
ctxs[i] = context.WithValue(context.TODO(), "key", i)
// limits the maximum number of outstanding watchers per stream
ctxc[i] = make(chan struct{}, 2)
}
// issue concurrent watches on "abc" with cancel
cli := clus.RandClient()
if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
t.Fatal(err)
}
ch := make(chan struct{}, n)
for i := 0; i < n; i++ {
go func() {
defer func() { ch <- struct{}{} }()
idx := rand.Intn(len(ctxs))
ctx, cancel := context.WithCancel(ctxs[idx])
ctxc[idx] <- struct{}{}
wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
f(clus)
select {
case _, ok := <-wch:
if !ok {
t.Fatalf("unexpected closed channel %p", wch)
}
// may take a second or two to reestablish a watcher because of
// grpc backoff policies for disconnects
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for watch on %p", wch)
}
// randomize how cancel overlaps with watch creation
if rand.Intn(2) == 0 {
<-ctxc[idx]
cancel()
} else {
cancel()
<-ctxc[idx]
}
}()
}
// join on watches
for i := 0; i < n; i++ {
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for completed watch")
}
}
}
// TestWatchCanelAndCloseClient ensures that canceling a watcher then immediately
// closing the client does not return a client closing error.
func TestWatchCancelAndCloseClient(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
ctx, cancel := context.WithCancel(context.Background())
wch := cli.Watch(ctx, "abc")
donec := make(chan struct{})
go func() {
defer close(donec)
select {
case wr, ok := <-wch:
if ok {
t.Fatalf("expected closed watch after cancel(), got resp=%+v err=%v", wr, wr.Err())
}
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for closed channel")
}
}()
cancel()
if err := cli.Close(); err != nil {
t.Fatal(err)
}
<-donec
clus.TakeClient(0)
}

View File

@ -157,14 +157,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
resp, err = kv.remote.Put(ctx, r)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil

View File

@ -47,6 +47,9 @@ type Op struct {
// for range, watch
rev int64
// for watch, put, delete
prevKV bool
// progressNotify is for progress updates.
progressNotify bool
@ -73,10 +76,10 @@ func (op Op) toRequestOp() *pb.RequestOp {
}
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}}
case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
default:
panic("Unknown Op")
@ -271,3 +274,11 @@ func WithProgressNotify() OpOption {
op.progressNotify = true
}
}
// WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
// nothing will be returned.
func WithPrevKV() OpOption {
return func(op *Op) {
op.prevKV = true
}
}

View File

@ -61,6 +61,9 @@ type WatchResponse struct {
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// created is used to indicate the creation of the watcher.
created bool
closeErr error
}
@ -89,7 +92,7 @@ func (wr *WatchResponse) Err() error {
// IsProgressNotify returns true if the WatchResponse is progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
return len(wr.Events) == 0 && !wr.Canceled
return len(wr.Events) == 0 && !wr.Canceled && !wr.created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}
// watcher implements the Watcher interface
@ -102,6 +105,7 @@ type watcher struct {
streams map[string]*watchGrpcStream
}
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
@ -112,10 +116,10 @@ type watchGrpcStream struct {
ctxKey string
cancel context.CancelFunc
// mu protects the streams map
mu sync.RWMutex
// streams holds all active watchers
streams map[int64]*watcherStream
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
@ -127,8 +131,12 @@ type watchGrpcStream struct {
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// the error that closed the watch stream
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error
}
@ -140,6 +148,8 @@ type watchRequest struct {
rev int64
// progressNotify is for progress updates.
progressNotify bool
// get the previous key-value pair before the event happens
prevKV bool
// retc receives a chan WatchResponse once the watcher is established
retc chan chan WatchResponse
}
@ -150,15 +160,18 @@ type watcherStream struct {
initReq watchRequest
// outc publishes watch responses to subscriber
outc chan<- WatchResponse
outc chan WatchResponse
// recvc buffers watch responses before publishing
recvc chan *WatchResponse
id int64
// donec closes when the watcherStream goroutine stops.
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing bool
// id is the registered watch id on the grpc stream
id int64
// lastRev is revision last successfully sent over outc
lastRev int64
// resumec indicates the stream must recover at a given revision
resumec chan int64
// buf holds all events received from etcd but not yet consumed by the client
buf []*WatchResponse
}
func NewWatcher(c *Client) Watcher {
@ -182,18 +195,20 @@ func (vc *valCtx) Err() error { return nil }
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel,
streams: make(map[int64]*watcherStream),
owner: w,
remote: w.remote,
ctx: ctx,
ctxKey: fmt.Sprintf("%v", inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
}
go wgs.run()
return wgs
@ -203,14 +218,14 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{
ctx: ctx,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
retc: retc,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
@ -242,7 +257,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
@ -255,7 +269,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
// receive channel
if ok {
select {
case ret := <-retc:
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
@ -286,12 +300,7 @@ func (w *watcher) Close() (err error) {
}
func (w *watchGrpcStream) Close() (err error) {
w.mu.Lock()
if w.stopc != nil {
close(w.stopc)
w.stopc = nil
}
w.mu.Unlock()
close(w.stopc)
<-w.donec
select {
case err = <-w.errc:
@ -300,67 +309,57 @@ func (w *watchGrpcStream) Close() (err error) {
return toErr(w.ctx, err)
}
func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil {
// no pending request; ignore
return
}
if resp.Canceled || resp.CompactRevision != 0 {
// a cancel at id creation time means the start revision has
// been compacted out of the store
ret := make(chan WatchResponse, 1)
ret <- WatchResponse{
Header: *resp.Header,
CompactRevision: resp.CompactRevision,
Canceled: true}
close(ret)
pendingReq.retc <- ret
return
}
ret := make(chan WatchResponse)
if resp.WatchId == -1 {
// failed; no channel
close(ret)
pendingReq.retc <- ret
return
}
ws := &watcherStream{
initReq: *pendingReq,
id: resp.WatchId,
outc: ret,
// buffered so unlikely to block on sending while holding mu
recvc: make(chan *WatchResponse, 4),
resumec: make(chan int64),
}
if pendingReq.rev == 0 {
// note the header revision so that a put following a current watcher
// disconnect will arrive on the watcher channel after reconnect
ws.initReq.rev = resp.Header.Revision
}
func (w *watcher) closeStream(wgs *watchGrpcStream) {
w.mu.Lock()
w.streams[ws.id] = ws
close(wgs.donec)
wgs.cancel()
if w.streams != nil {
delete(w.streams, wgs.ctxKey)
}
w.mu.Unlock()
// pass back the subscriber channel for the watcher
pendingReq.retc <- ret
// send messages to subscriber
go w.serveStream(ws)
}
// closeStream closes the watcher resources and removes it
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
if resp.WatchId == -1 {
// failed; no channel
close(ws.recvc)
return
}
ws.id = resp.WatchId
w.substreams[ws.id] = ws
}
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
select {
case ws.outc <- *resp:
case <-ws.initReq.ctx.Done():
case <-time.After(closeSendErrTimeout):
}
close(ws.outc)
delete(w.streams, ws.id)
w.mu.Unlock()
}
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
// send channel response in case stream was never established
select {
case ws.initReq.retc <- ws.outc:
default:
}
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
} else {
close(ws.outc)
}
if ws.id != -1 {
delete(w.substreams, ws.id)
return
}
for i := range w.resuming {
if w.resuming[i] == ws {
w.resuming[i] = nil
return
}
}
}
// run is the root of the goroutines for managing a watcher client
@ -368,66 +367,79 @@ func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient
var closeErr error
defer func() {
w.owner.mu.Lock()
w.closeErr = closeErr
if w.owner.streams != nil {
delete(w.owner.streams, w.ctxKey)
}
close(w.donec)
w.owner.mu.Unlock()
w.cancel()
}()
// substreams marked to close but goroutine still running; needed for
// avoiding double-closing recvc on grpc stream teardown
closing := make(map[*watcherStream]struct{})
// already stopped?
w.mu.RLock()
stopc := w.stopc
w.mu.RUnlock()
if stopc == nil {
return
}
defer func() {
w.closeErr = closeErr
// shutdown substreams and resuming substreams
for _, ws := range w.substreams {
if _, ok := closing[ws]; !ok {
close(ws.recvc)
}
}
for _, ws := range w.resuming {
if _, ok := closing[ws]; ws != nil && !ok {
close(ws.recvc)
}
}
w.joinSubstreams()
for toClose := len(w.substreams) + len(w.resuming); toClose > 0; toClose-- {
w.closeSubstream(<-w.closingc)
}
w.owner.closeStream(w)
}()
// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
var pendingReq, failedReq *watchRequest
curReqC := w.reqc
cancelSet := make(map[int64]struct{})
for {
select {
// Watch() requested
case pendingReq = <-curReqC:
// no more watch requests until there's a response
curReqC = nil
if err := wc.Send(pendingReq.toPB()); err == nil {
// pendingReq now waits on w.respc
break
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbufffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
failedReq = pendingReq
// New events from the watch client
case pbresp := <-w.respc:
switch {
case pbresp.Created:
// response to pending req, try to add
w.addStream(pbresp, pendingReq)
pendingReq = nil
curReqC = w.reqc
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
case pbresp.Canceled:
delete(cancelSet, pbresp.WatchId)
// shutdown serveStream, if any
w.mu.Lock()
if ws, ok := w.streams[pbresp.WatchId]; ok {
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
close(ws.recvc)
delete(w.streams, ws.id)
}
numStreams := len(w.streams)
w.mu.Unlock()
if numStreams == 0 {
// don't leak watcher streams
return
closing[ws] = struct{}{}
}
default:
// dispatch to appropriate watch stream
@ -448,7 +460,6 @@ func (w *watchGrpcStream) run() {
wc.Send(req)
}
// watch client failed to recv; spawn another if possible
// TODO report watch client errors from errc?
case err := <-w.errc:
if toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
@ -457,48 +468,58 @@ func (w *watchGrpcStream) run() {
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
curReqC = w.reqc
if pendingReq != nil {
failedReq = pendingReq
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
case <-stopc:
case <-w.stopc:
return
}
// send failed; queue for retry
if failedReq != nil {
go func(wr *watchRequest) {
select {
case w.reqc <- wr:
case <-wr.ctx.Done():
case <-w.donec:
}
}(pendingReq)
failedReq = nil
pendingReq = nil
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown
return
}
}
}
}
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
for len(w.resuming) != 0 {
if w.resuming[0] != nil {
return w.resuming[0]
}
w.resuming = w.resuming[1:len(w.resuming)]
}
return nil
}
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
w.mu.RLock()
defer w.mu.RUnlock()
ws, ok := w.streams[pbresp.WatchId]
ws, ok := w.substreams[pbresp.WatchId]
if !ok {
return false
}
events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
if ok {
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
Canceled: pbresp.Canceled}
ws.recvc <- wr
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
created: pbresp.Created,
Canceled: pbresp.Canceled,
}
return ok
select {
case ws.recvc <- wr:
case <-ws.donec:
return false
}
return true
}
// serveWatchClient forwards messages from the grpc stream to run()
@ -520,134 +541,123 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
}
}
// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
if ws.closing {
panic("created substream goroutine but substream is closing")
}
// nextRev is the minimum expected next revision
nextRev := ws.initReq.rev
resuming := false
closing := false
for !closing {
defer func() {
if !resuming {
ws.closing = true
}
close(ws.donec)
if !resuming {
w.closingc <- ws
}
}()
emptyWr := &WatchResponse{}
for {
curWr := emptyWr
outc := ws.outc
if len(wrs) > 0 {
curWr = wrs[0]
if len(ws.buf) > 0 && ws.buf[0].created {
select {
case ws.initReq.retc <- ws.outc:
default:
}
ws.buf = ws.buf[1:]
}
if len(ws.buf) > 0 {
curWr = ws.buf[0]
} else {
outc = nil
}
select {
case outc <- *curWr:
if wrs[0].Err() != nil {
closing = true
break
}
var newRev int64
if len(wrs[0].Events) > 0 {
newRev = wrs[0].Events[len(wrs[0].Events)-1].Kv.ModRevision
} else {
newRev = wrs[0].Header.Revision
}
if newRev != ws.lastRev {
ws.lastRev = newRev
}
wrs[0] = nil
wrs = wrs[1:]
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeStream
if ws.buf[0].Err() != nil {
return
}
// resume up to last seen event if disconnected
if resuming && wr.Err() == nil {
resuming = false
// trim events already seen
for i := 0; i < len(wr.Events); i++ {
if wr.Events[i].Kv.ModRevision > ws.lastRev {
wr.Events = wr.Events[i:]
break
}
}
// only forward new events
if wr.Events[0].Kv.ModRevision == ws.lastRev {
break
}
ws.buf[0] = nil
ws.buf = ws.buf[1:]
case wr, ok := <-ws.recvc:
if !ok {
// shutdown from closeSubstream
return
}
resuming = false
// TODO don't keep buffering if subscriber stops reading
wrs = append(wrs, wr)
case resumeRev := <-ws.resumec:
wrs = nil
resuming = true
if resumeRev == -1 {
// pause serving stream while resume gets set up
break
// TODO pause channel if buffer gets too large
ws.buf = append(ws.buf, wr)
nextRev = wr.Header.Revision
if len(wr.Events) > 0 {
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
if resumeRev != ws.lastRev {
panic("unexpected resume revision")
}
case <-w.donec:
closing = true
closeErr = w.closeErr
ws.initReq.rev = nextRev
case <-ws.initReq.ctx.Done():
closing = true
return
case <-resumec:
resuming = true
return
}
}
// try to send off close error
if closeErr != nil {
select {
case ws.outc <- WatchResponse{closeErr: w.closeErr}:
case <-w.donec:
case <-time.After(closeSendErrTimeout):
}
}
w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}
func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
return nil, rerr
// connect to grpc stream
wc, err := w.openWatchClient()
if err != nil {
return nil, v3rpc.Error(err)
}
go w.serveWatchClient(ws)
return ws, nil
}
// resume creates a new WatchClient with all current watchers reestablished
func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
for {
if ws, err = w.openWatchClient(); err != nil {
break
} else if err = w.resumeWatchers(ws); err == nil {
break
// mark all substreams as resuming
if len(w.substreams)+len(w.resuming) > 0 {
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
for _, ws := range w.resuming {
if ws == nil || ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
}
w.substreams = make(map[int64]*watcherStream)
// receive data from new grpc stream
go w.serveWatchClient(wc)
return wc, nil
}
// joinSubstream waits for all substream goroutines to complete
func (w *watchGrpcStream) joinSubstreams() {
for _, ws := range w.substreams {
<-ws.donec
}
for _, ws := range w.resuming {
if ws != nil {
<-ws.donec
}
}
return ws, v3rpc.Error(err)
}
// openWatchClient retries opening a watchclient until retryConnection fails
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for {
w.mu.Lock()
stopc := w.stopc
w.mu.Unlock()
if stopc == nil {
select {
case <-w.stopc:
if err == nil {
err = context.Canceled
return nil, context.Canceled
}
return nil, err
default:
}
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
break
@ -659,48 +669,6 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
return ws, nil
}
// resumeWatchers rebuilds every registered watcher on a new client
func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
w.mu.RLock()
streams := make([]*watcherStream, 0, len(w.streams))
for _, ws := range w.streams {
streams = append(streams, ws)
}
w.mu.RUnlock()
for _, ws := range streams {
// pause serveStream
ws.resumec <- -1
// reconstruct watcher from initial request
if ws.lastRev != 0 {
ws.initReq.rev = ws.lastRev
}
if err := wc.Send(ws.initReq.toPB()); err != nil {
return err
}
// wait for request ack
resp, err := wc.Recv()
if err != nil {
return err
} else if len(resp.Events) != 0 || !resp.Created {
return fmt.Errorf("watcher: unexpected response (%+v)", resp)
}
// id may be different since new remote watcher; update map
w.mu.Lock()
delete(w.streams, ws.id)
ws.id = resp.WatchId
w.streams[ws.id] = ws
w.mu.Unlock()
// unpause serveStream
ws.resumec <- ws.lastRev
}
return nil
}
// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{
@ -708,6 +676,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
PrevKv: wr.prevKV,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}

View File

@ -75,11 +75,11 @@ func authCredWriteKeyTest(cx ctlCtx) {
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
// confirm root role doesn't grant access to all keys
if err := ctlV3PutFailPerm(cx, "foo", "bar"); err != nil {
// confirm root role can access to all keys
if err := ctlV3Put(cx, "foo", "bar", ""); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3GetFailPerm(cx, "foo"); err != nil {
if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar"}}...); err != nil {
cx.t.Fatal(err)
}
@ -90,17 +90,17 @@ func authCredWriteKeyTest(cx ctlCtx) {
}
// confirm put failed
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "a"}}...); err != nil {
if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar"}}...); err != nil {
cx.t.Fatal(err)
}
// try good user
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Put(cx, "foo", "bar", ""); err != nil {
if err := ctlV3Put(cx, "foo", "bar2", ""); err != nil {
cx.t.Fatal(err)
}
// confirm put succeeded
if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar"}}...); err != nil {
if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar2"}}...); err != nil {
cx.t.Fatal(err)
}
@ -111,7 +111,7 @@ func authCredWriteKeyTest(cx ctlCtx) {
}
// confirm put failed
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar"}}...); err != nil {
if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar2"}}...); err != nil {
cx.t.Fatal(err)
}
}
@ -282,10 +282,6 @@ func ctlV3PutFailPerm(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "permission denied")
}
func ctlV3GetFailPerm(cx ctlCtx, key string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "get", key), "permission denied")
}
func authSetupTestUser(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "test-user", "--interactive=false"}, "User test-user created", []string{"pass"}); err != nil {
cx.t.Fatal(err)

View File

@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) {
if len(resp.Kvs) != 1 {
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
}
if resp.Kvs[0].CreateRevision != 4 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision)
if resp.Kvs[0].CreateRevision != 7 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
}
}

View File

@ -231,6 +231,8 @@ Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `
- prefix -- watch on a prefix if prefix is set.
- prev-kv -- get the previous key-value pair before the event happens.
- rev -- the revision to start watching. Specifying a revision is useful for observing past events.
#### Input Format
@ -245,7 +247,7 @@ watch [options] <key or prefix>\n
##### Simple reply
- \<event\>\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- \<event\>[\n\<old_key\>\n\<old_value\>]\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- Additional error string if WATCH failed. Exit code is non-zero.

View File

@ -23,6 +23,7 @@ import (
var (
delPrefix bool
delPrevKV bool
)
// NewDelCommand returns the cobra command for "del".
@ -34,6 +35,7 @@ func NewDelCommand() *cobra.Command {
}
cmd.Flags().BoolVar(&delPrefix, "prefix", false, "delete keys with matching prefix")
cmd.Flags().BoolVar(&delPrevKV, "prev-kv", false, "return deleted key-value pairs")
return cmd
}
@ -65,6 +67,9 @@ func getDelOp(cmd *cobra.Command, args []string) (string, []clientv3.OpOption) {
if delPrefix {
opts = append(opts, clientv3.WithPrefix())
}
if delPrevKV {
opts = append(opts, clientv3.WithPrevKV())
}
return key, opts
}

View File

@ -27,11 +27,14 @@ import (
"github.com/coreos/etcd/client"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
@ -42,9 +45,10 @@ import (
)
var (
migrateDatadir string
migrateWALdir string
migrateTransformer string
migrateExcludeTTLKey bool
migrateDatadir string
migrateWALdir string
migrateTransformer string
)
// NewMigrateCommand returns the cobra command for "migrate".
@ -55,6 +59,7 @@ func NewMigrateCommand() *cobra.Command {
Run: migrateCommandFunc,
}
mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
@ -74,18 +79,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer()
}
st := rebuildStoreV2()
st, index := rebuildStoreV2()
be := prepareBackend()
defer be.Close()
maxIndexc := make(chan uint64, 1)
go func() {
maxIndexc <- writeStore(writer, st)
writeStore(writer, st)
writer.Close()
}()
readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, <-maxIndexc)
mvcc.UpdateConsistentIndex(be, index)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
@ -106,7 +110,10 @@ func prepareBackend() backend.Backend {
return be
}
func rebuildStoreV2() store.Store {
func rebuildStoreV2() (store.Store, uint64) {
var index uint64
cl := membership.NewCluster("")
waldir := migrateWALdir
if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal")
@ -122,6 +129,7 @@ func rebuildStoreV2() store.Store {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
}
w, err := wal.OpenForRead(waldir, walsnap)
@ -143,9 +151,15 @@ func rebuildStoreV2() store.Store {
}
}
applier := etcdserver.NewApplierV2(st, nil)
cl.SetStore(st)
cl.Recover(api.UpdateCapability)
applier := etcdserver.NewApplierV2(st, cl)
for _, ent := range ents {
if ent.Type != raftpb.EntryNormal {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, ent.Data)
applyConf(cc, cl)
continue
}
@ -160,9 +174,34 @@ func rebuildStoreV2() store.Store {
applyRequest(req, applier)
}
}
if ent.Index > index {
index = ent.Index
}
}
return st
return st, index
}
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := cl.ValidateConfigurationChange(cc); err != nil {
return
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID))
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
}
}
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
@ -216,11 +255,13 @@ func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
if n.Dir {
n.Nodes = nil
}
b, err := json.Marshal(n)
if err != nil {
ExitWithError(ExitError, err)
if !migrateExcludeTTLKey || n.TTL == 0 {
b, err := json.Marshal(n)
if err != nil {
ExitWithError(ExitError, err)
}
fmt.Fprint(w, string(b))
}
fmt.Fprintf(w, string(b))
for _, nn := range nodes {
max := writeKeys(w, nn)
if max > maxIndex {

View File

@ -108,6 +108,9 @@ type simplePrinter struct {
func (s *simplePrinter) Del(resp v3.DeleteResponse) {
fmt.Println(resp.Deleted)
for _, kv := range resp.PrevKvs {
printKV(s.isHex, kv)
}
}
func (s *simplePrinter) Get(resp v3.GetResponse) {
@ -116,7 +119,12 @@ func (s *simplePrinter) Get(resp v3.GetResponse) {
}
}
func (s *simplePrinter) Put(r v3.PutResponse) { fmt.Println("OK") }
func (s *simplePrinter) Put(r v3.PutResponse) {
fmt.Println("OK")
if r.PrevKv != nil {
printKV(s.isHex, r.PrevKv)
}
}
func (s *simplePrinter) Txn(resp v3.TxnResponse) {
if resp.Succeeded {
@ -143,6 +151,9 @@ func (s *simplePrinter) Txn(resp v3.TxnResponse) {
func (s *simplePrinter) Watch(resp v3.WatchResponse) {
for _, e := range resp.Events {
fmt.Println(e.Type)
if e.PrevKv != nil {
printKV(s.isHex, e.PrevKv)
}
printKV(s.isHex, e.Kv)
}
}

View File

@ -24,7 +24,8 @@ import (
)
var (
leaseStr string
leaseStr string
putPrevKV bool
)
// NewPutCommand returns the cobra command for "put".
@ -49,6 +50,7 @@ will store the content of the file to <key>.
Run: putCommandFunc,
}
cmd.Flags().StringVar(&leaseStr, "lease", "0", "lease ID (in hexadecimal) to attach to the key")
cmd.Flags().BoolVar(&putPrevKV, "prev-kv", false, "return changed key-value pairs")
return cmd
}
@ -85,6 +87,9 @@ func getPutOp(cmd *cobra.Command, args []string) (string, string, []clientv3.OpO
if id != 0 {
opts = append(opts, clientv3.WithLease(clientv3.LeaseID(id)))
}
if putPrevKV {
opts = append(opts, clientv3.WithPrevKV())
}
return key, value, opts
}

View File

@ -115,7 +115,7 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
partpath := path + ".part"
f, err := os.Create(partpath)
defer f.Close()
if err != nil {
exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
ExitWithError(ExitBadArgs, exiterr)
@ -134,6 +134,8 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
fileutil.Fsync(f)
f.Close()
if rerr := os.Rename(partpath, path); rerr != nil {
exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
ExitWithError(ExitIO, exiterr)

View File

@ -77,12 +77,13 @@ func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
if len(line) == 1 {
// remove space from the line
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
// remove trialling \n
line = line[:len(line)-1]
cmp, err := parseCompare(line)
if err != nil {
ExitWithError(ExitInvalidInput, err)
@ -99,12 +100,13 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
if err != nil {
ExitWithError(ExitInvalidInput, err)
}
if len(line) == 1 {
// remove space from the line
line = strings.TrimSpace(line)
if len(line) == 0 {
break
}
// remove trialling \n
line = line[:len(line)-1]
op, err := parseRequestUnion(line)
if err != nil {
ExitWithError(ExitInvalidInput, err)

View File

@ -29,6 +29,7 @@ var (
watchRev int64
watchPrefix bool
watchInteractive bool
watchPrevKey bool
)
// NewWatchCommand returns the cobra command for "watch".
@ -42,6 +43,7 @@ func NewWatchCommand() *cobra.Command {
cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
return cmd
}
@ -119,6 +121,9 @@ func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error)
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
if watchPrevKey {
opts = append(opts, clientv3.WithPrevKV())
}
return c.Watch(context.TODO(), key, opts...), nil
}

View File

@ -466,19 +466,24 @@ func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() ==
func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
// checkBindURLs returns an error if any URL uses a domain name.
// TODO: return error in 3.2.0
func checkBindURLs(urls []url.URL) error {
for _, url := range urls {
if url.Scheme == "unix" || url.Scheme == "unixs" {
continue
}
host := strings.Split(url.Host, ":")[0]
host, _, err := net.SplitHostPort(url.Host)
if err != nil {
return err
}
if host == "localhost" {
// special case for local address
// TODO: support /etc/hosts ?
continue
}
if net.ParseIP(host) == nil {
return fmt.Errorf("expected IP in URL for binding (%s)", url.String())
err := fmt.Errorf("expected IP in URL for binding (%s)", url.String())
plog.Warning(err)
}
}
return nil

View File

@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable
watchable mvcc.WatchableKV
}
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
@ -82,6 +82,8 @@ type serverWatchStream struct {
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.WatchableKV
gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse
@ -91,6 +93,7 @@ type serverWatchStream struct {
// progress tracks the watchID that stream might need to send
// progress to.
progress map[mvcc.WatchID]bool
prevKV map[mvcc.WatchID]bool
// closec indicates the stream is closed.
closec chan struct{}
@ -101,14 +104,18 @@ type serverWatchStream struct {
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
watchable: ws.watchable,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
@ -170,9 +177,14 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
if id != -1 && creq.ProgressNotify {
if id != -1 {
sws.mu.Lock()
sws.progress[id] = true
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
sws.mu.Unlock()
}
wr := &pb.WatchResponse{
@ -198,6 +210,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
@ -244,8 +257,19 @@ func (sws *serverWatchStream) sendLoop() {
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
sws.mu.Lock()
needPrevKV := sws.prevKV[wresp.WatchID]
sws.mu.Unlock()
for i := range evs {
events[i] = &evs[i]
if needPrevKV {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
wr := &pb.WatchResponse{

View File

@ -159,6 +159,22 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev int64
err error
)
var rr *mvcc.RangeResult
if p.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn {
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil {
@ -174,6 +190,9 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev = a.s.KV().Put(p.Key, p.Value, leaseID)
}
resp.Header.Revision = rev
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
return resp, nil
}
@ -191,6 +210,21 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
dr.RangeEnd = []byte{}
}
var rr *mvcc.RangeResult
if dr.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn {
n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
if err != nil {
@ -201,6 +235,11 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
}
resp.Deleted = n
if rr != nil {
for i := range rr.KVs {
resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
}
}
resp.Header.Revision = rev
return resp, nil
}

View File

@ -56,6 +56,9 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
if !aa.as.IsPutPermitted(aa.user, r.Key) {
return nil, auth.ErrPermissionDenied
}
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, nil) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.Put(txnID, r)
}
@ -70,6 +73,9 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
if !aa.as.IsDeleteRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied
}
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.DeleteRange(txnID, r)
}
@ -99,7 +105,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) bool {
continue
}
if !aa.as.IsDeleteRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
if tv.RequestDeleteRange.PrevKv && !aa.as.IsRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
return false
}
}

View File

@ -102,9 +102,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -10,9 +10,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

File diff suppressed because it is too large Load Diff

View File

@ -396,10 +396,16 @@ message PutRequest {
// lease is the lease ID to associate with the key in the key-value store. A lease
// value of 0 indicates no lease.
int64 lease = 3;
// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
}
message PutResponse {
ResponseHeader header = 1;
// if prev_kv is set in the request, the previous key-value pair will be returned.
mvccpb.KeyValue prev_kv = 2;
}
message DeleteRangeRequest {
@ -409,12 +415,17 @@ message DeleteRangeRequest {
// If range_end is not given, the range is defined to contain only the key argument.
// If range_end is '\0', the range is all keys greater than or equal to the key argument.
bytes range_end = 2;
// If prev_kv is set, etcd gets the previous key-value pairs before deleting it.
// The previous key-value pairs will be returned in the delte response.
bool prev_kv = 3;
}
message DeleteRangeResponse {
ResponseHeader header = 1;
// deleted is the number of keys deleted by the delete range request.
int64 deleted = 2;
// if prev_kv is set in the request, the previous key-value pairs will be returned.
repeated mvccpb.KeyValue prev_kvs = 3;
}
message RequestOp {
@ -563,6 +574,9 @@ message WatchCreateRequest {
// wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
}
message WatchCancelRequest {

View File

@ -154,13 +154,13 @@ type Server interface {
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
// r and inflightSnapshots must be the first elements to keep 64-bit alignment for atomic
// access to fields
// count the number of inflight snapshots.
// MUST use atomic operation to access this field.
inflightSnapshots int64
Cfg *ServerConfig
// inflightSnapshots holds count the number of snapshots currently inflight.
inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
// consistIndex used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
Cfg *ServerConfig
readych chan struct{}
r raftNode
@ -195,10 +195,6 @@ type EtcdServer struct {
// compactor is used to auto-compact the KV.
compactor *compactor.Periodic
// consistent index used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex
// peerRt used to send requests (version, lease) to peers.
peerRt http.RoundTripper
reqIDGen *idutil.Generator
@ -212,8 +208,6 @@ type EtcdServer struct {
// wg is used to wait for the go routines that depends on the server state
// to exit when stopping the server.
wg sync.WaitGroup
appliedIndex uint64
}
// NewServer creates a new EtcdServer from the supplied configuration. The

View File

@ -551,4 +551,4 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
}
// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.Watchable { return s.KV() }
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

View File

@ -188,11 +188,8 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
t.Fatal(err)
}
s, serr := concurrency.NewSession(cli)
if serr != nil {
t.Fatal(serr)
}
e := concurrency.NewElection(s, "test")
e := concurrency.NewElection(cli, "test")
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
err := e.Campaign(ctx, "abc")
cancel()

View File

@ -379,6 +379,7 @@ func TestV3DeleteRange(t *testing.T) {
keySet []string
begin string
end string
prevKV bool
wantSet [][]byte
deleted int64
@ -386,39 +387,45 @@ func TestV3DeleteRange(t *testing.T) {
// delete middle
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "fop",
"foo/", "fop", false,
[][]byte{[]byte("foo"), []byte("fop")}, 1,
},
// no delete
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "foo/",
"foo/", "foo/", false,
[][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
},
// delete first
{
[]string{"foo", "foo/abc", "fop"},
"fo", "fop",
"fo", "fop", false,
[][]byte{[]byte("fop")}, 2,
},
// delete tail
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "fos",
"foo/", "fos", false,
[][]byte{[]byte("foo")}, 2,
},
// delete exact
{
[]string{"foo", "foo/abc", "fop"},
"foo/abc", "",
"foo/abc", "", false,
[][]byte{[]byte("foo"), []byte("fop")}, 1,
},
// delete none, [x,x)
{
[]string{"foo"},
"foo", "foo",
"foo", "foo", false,
[][]byte{[]byte("foo")}, 0,
},
// delete middle with preserveKVs set
{
[]string{"foo", "foo/abc", "fop"},
"foo/", "fop", true,
[][]byte{[]byte("foo"), []byte("fop")}, 1,
},
}
for i, tt := range tests {
@ -436,7 +443,9 @@ func TestV3DeleteRange(t *testing.T) {
dreq := &pb.DeleteRangeRequest{
Key: []byte(tt.begin),
RangeEnd: []byte(tt.end)}
RangeEnd: []byte(tt.end),
PrevKv: tt.prevKV,
}
dresp, err := kvc.DeleteRange(context.TODO(), dreq)
if err != nil {
t.Fatalf("couldn't delete range on test %d (%v)", i, err)
@ -444,6 +453,11 @@ func TestV3DeleteRange(t *testing.T) {
if tt.deleted != dresp.Deleted {
t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
}
if tt.prevKV {
if len(dresp.PrevKvs) != int(dresp.Deleted) {
t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
}
}
rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
rresp, err := kvc.Range(context.TODO(), rreq)

View File

@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -408,6 +408,13 @@ func (s *store) restore() error {
s.currentRev = rev
}
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
// we have seen.
if s.currentRev.main < s.compactMainRev {
s.currentRev.main = s.compactMainRev
}
for key, lid := range keyToLease {
if s.le == nil {
panic("no lessor to attach lease")

View File

@ -15,8 +15,10 @@
package mvcc
import (
"os"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
@ -93,3 +95,41 @@ func TestScheduleCompaction(t *testing.T) {
cleanup(s, b, tmpPath)
}
}
func TestCompactAllAndRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s0 := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)
s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
s0.DeleteRange([]byte("foo"), nil)
rev := s0.Rev()
// compact all keys
done, err := s0.Compact(rev)
if err != nil {
t.Fatal(err)
}
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for compaction to finish")
}
err = s0.Close()
if err != nil {
t.Fatal(err)
}
s1 := NewStore(b, &lease.FakeLessor{}, nil)
if s1.Rev() != rev {
t.Errorf("rev = %v, want %v", s1.Rev(), rev)
}
_, err = s1.Range([]byte("foo"), nil, RangeOptions{})
if err != nil {
t.Errorf("unexpect range error %v", err)
}
}

View File

@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@ -89,6 +89,8 @@ type Event struct {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// prev_kv holds the key-value pair before the event happens.
PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
func (m *Event) Reset() { *m = Event{} }
@ -181,6 +183,16 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
}
i += n1
}
if m.PrevKv != nil {
data[i] = 0x1a
i++
i = encodeVarintKv(data, i, uint64(m.PrevKv.Size()))
n2, err := m.PrevKv.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n2
}
return i, nil
}
@ -247,6 +259,10 @@ func (m *Event) Size() (n int) {
l = m.Kv.Size()
n += 1 + l + sovKv(uint64(l))
}
if m.PrevKv != nil {
l = m.PrevKv.Size()
n += 1 + l + sovKv(uint64(l))
}
return n
}
@ -532,6 +548,39 @@ func (m *Event) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PrevKv", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowKv
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthKv
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.PrevKv == nil {
m.PrevKv = &KeyValue{}
}
if err := m.PrevKv.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipKv(data[iNdEx:])
@ -659,23 +708,24 @@ var (
)
var fileDescriptorKv = []byte{
// 285 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x44, 0x90, 0x41, 0x4a, 0xc3, 0x40,
0x14, 0x86, 0x33, 0x4d, 0x9b, 0xd6, 0xd7, 0x52, 0xc3, 0x50, 0x70, 0x70, 0x31, 0xc4, 0x6e, 0x2c,
0x08, 0x11, 0xea, 0x0d, 0xc4, 0xac, 0x74, 0x21, 0x21, 0xba, 0x95, 0x34, 0x7d, 0x94, 0x92, 0xa6,
0x13, 0xd2, 0x38, 0x98, 0x9b, 0x78, 0x0a, 0xcf, 0xd1, 0x65, 0x8f, 0x60, 0xe3, 0x45, 0x24, 0x6f,
0x4c, 0xdd, 0x0c, 0xef, 0xff, 0xff, 0x6f, 0x98, 0xff, 0x0d, 0x0c, 0x52, 0xed, 0xe7, 0x85, 0x2a,
0x15, 0x77, 0x32, 0x9d, 0x24, 0xf9, 0xe2, 0x72, 0xb2, 0x52, 0x2b, 0x45, 0xd6, 0x6d, 0x33, 0x99,
0x74, 0xfa, 0xc5, 0x60, 0xf0, 0x88, 0xd5, 0x6b, 0xbc, 0x79, 0x47, 0xee, 0x82, 0x9d, 0x62, 0x25,
0x98, 0xc7, 0x66, 0xa3, 0xb0, 0x19, 0xf9, 0x35, 0x9c, 0x27, 0x05, 0xc6, 0x25, 0xbe, 0x15, 0xa8,
0xd7, 0xbb, 0xb5, 0xda, 0x8a, 0x8e, 0xc7, 0x66, 0x76, 0x38, 0x36, 0x76, 0xf8, 0xe7, 0xf2, 0x2b,
0x18, 0x65, 0x6a, 0xf9, 0x4f, 0xd9, 0x44, 0x0d, 0x33, 0xb5, 0x3c, 0x21, 0x02, 0xfa, 0x1a, 0x0b,
0x4a, 0xbb, 0x94, 0xb6, 0x92, 0x4f, 0xa0, 0xa7, 0x9b, 0x02, 0xa2, 0x47, 0x2f, 0x1b, 0xd1, 0xb8,
0x1b, 0x8c, 0x77, 0x28, 0x1c, 0xa2, 0x8d, 0x98, 0x7e, 0x40, 0x2f, 0xd0, 0xb8, 0x2d, 0xf9, 0x0d,
0x74, 0xcb, 0x2a, 0x47, 0x6a, 0x3b, 0x9e, 0x5f, 0xf8, 0x66, 0x4d, 0x9f, 0x42, 0x73, 0x46, 0x55,
0x8e, 0x21, 0x41, 0xdc, 0x83, 0x4e, 0xaa, 0xa9, 0xfa, 0x70, 0xee, 0xb6, 0x68, 0xbb, 0x77, 0xd8,
0x49, 0xf5, 0xd4, 0x83, 0xb3, 0xd3, 0x25, 0xde, 0x07, 0xfb, 0xf9, 0x25, 0x72, 0x2d, 0x0e, 0xe0,
0x3c, 0x04, 0x4f, 0x41, 0x14, 0xb8, 0xec, 0x5e, 0xec, 0x8f, 0xd2, 0x3a, 0x1c, 0xa5, 0xb5, 0xaf,
0x25, 0x3b, 0xd4, 0x92, 0x7d, 0xd7, 0x92, 0x7d, 0xfe, 0x48, 0x6b, 0xe1, 0xd0, 0x5f, 0xde, 0xfd,
0x06, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x21, 0x8f, 0x2c, 0x75, 0x01, 0x00, 0x00,
// 303 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
0x14, 0x86, 0x3b, 0x14, 0x0a, 0x3e, 0x08, 0x36, 0x13, 0x12, 0x27, 0x2e, 0x26, 0x95, 0x8d, 0x18,
0x13, 0x4c, 0xf0, 0x06, 0xc6, 0xae, 0x70, 0x61, 0x1a, 0x74, 0x4b, 0x4a, 0x79, 0x21, 0xa4, 0x94,
0x69, 0x4a, 0x9d, 0xa4, 0x37, 0x71, 0xef, 0xde, 0x73, 0xb0, 0xe4, 0x08, 0x52, 0x2f, 0x62, 0xfa,
0xc6, 0xe2, 0xc6, 0xcd, 0xe4, 0xfd, 0xff, 0xff, 0x65, 0xe6, 0x7f, 0x03, 0x9d, 0x58, 0x8f, 0xd3,
0x4c, 0xe5, 0x8a, 0x3b, 0x89, 0x8e, 0xa2, 0x74, 0x71, 0x39, 0x58, 0xa9, 0x95, 0x22, 0xeb, 0xae,
0x9a, 0x4c, 0x3a, 0xfc, 0x64, 0xd0, 0x99, 0x62, 0xf1, 0x1a, 0x6e, 0xde, 0x90, 0xbb, 0x60, 0xc7,
0x58, 0x08, 0xe6, 0xb1, 0x51, 0x2f, 0xa8, 0x46, 0x7e, 0x0d, 0xe7, 0x51, 0x86, 0x61, 0x8e, 0xf3,
0x0c, 0xf5, 0x7a, 0xb7, 0x56, 0x5b, 0xd1, 0xf0, 0xd8, 0xc8, 0x0e, 0xfa, 0xc6, 0x0e, 0x7e, 0x5d,
0x7e, 0x05, 0xbd, 0x44, 0x2d, 0xff, 0x28, 0x9b, 0xa8, 0x6e, 0xa2, 0x96, 0x27, 0x44, 0x40, 0x5b,
0x63, 0x46, 0x69, 0x93, 0xd2, 0x5a, 0xf2, 0x01, 0xb4, 0x74, 0x55, 0x40, 0xb4, 0xe8, 0x65, 0x23,
0x2a, 0x77, 0x83, 0xe1, 0x0e, 0x85, 0x43, 0xb4, 0x11, 0xc3, 0x0f, 0x06, 0x2d, 0x5f, 0xe3, 0x36,
0xe7, 0xb7, 0xd0, 0xcc, 0x8b, 0x14, 0xa9, 0x6e, 0x7f, 0x72, 0x31, 0x36, 0x7b, 0x8e, 0x29, 0x34,
0xe7, 0xac, 0x48, 0x31, 0x20, 0x88, 0x7b, 0xd0, 0x88, 0x35, 0x75, 0xef, 0x4e, 0xdc, 0x1a, 0xad,
0x17, 0x0f, 0x1a, 0xb1, 0xe6, 0x37, 0xd0, 0x4e, 0x33, 0xd4, 0xf3, 0x58, 0x53, 0xf9, 0xff, 0x30,
0xa7, 0x02, 0xa6, 0x7a, 0xe8, 0xc1, 0xd9, 0xe9, 0x7e, 0xde, 0x06, 0xfb, 0xf9, 0x65, 0xe6, 0x5a,
0x1c, 0xc0, 0x79, 0xf4, 0x9f, 0xfc, 0x99, 0xef, 0xb2, 0x07, 0xb1, 0x3f, 0x4a, 0xeb, 0x70, 0x94,
0xd6, 0xbe, 0x94, 0xec, 0x50, 0x4a, 0xf6, 0x55, 0x4a, 0xf6, 0xfe, 0x2d, 0xad, 0x85, 0x43, 0xff,
0x7e, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x45, 0x92, 0x5d, 0xa1, 0x01, 0x00, 0x00,
}

View File

@ -43,4 +43,6 @@ message Event {
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;
// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}

View File

@ -38,9 +38,12 @@ type PageWriter struct {
bufWatermarkBytes int
}
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
// NewPageWriter creates a new PageWriter. pageBytes is the number of bytes
// to write per page. pageOffset is the starting offset of io.Writer.
func NewPageWriter(w io.Writer, pageBytes, pageOffset int) *PageWriter {
return &PageWriter{
w: w,
pageOffset: pageOffset,
pageBytes: pageBytes,
buf: make([]byte, defaultBufferBytes+pageBytes),
bufWatermarkBytes: defaultBufferBytes,

View File

@ -25,7 +25,7 @@ func TestPageWriterRandom(t *testing.T) {
pageBytes := 128
buf := make([]byte, 4*defaultBufferBytes)
cw := &checkPageWriter{pageBytes: pageBytes, t: t}
w := NewPageWriter(cw, pageBytes)
w := NewPageWriter(cw, pageBytes, 0)
n := 0
for i := 0; i < 4096; i++ {
c, err := w.Write(buf[:rand.Intn(len(buf))])
@ -51,7 +51,7 @@ func TestPageWriterPartialSlack(t *testing.T) {
pageBytes := 128
buf := make([]byte, defaultBufferBytes)
cw := &checkPageWriter{pageBytes: 64, t: t}
w := NewPageWriter(cw, pageBytes)
w := NewPageWriter(cw, pageBytes, 0)
// put writer in non-zero page offset
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
@ -82,6 +82,35 @@ func TestPageWriterPartialSlack(t *testing.T) {
}
}
// TestPageWriterOffset tests if page writer correctly repositions when offset is given.
func TestPageWriterOffset(t *testing.T) {
defaultBufferBytes = 1024
pageBytes := 128
buf := make([]byte, defaultBufferBytes)
cw := &checkPageWriter{pageBytes: 64, t: t}
w := NewPageWriter(cw, pageBytes, 0)
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
}
if err := w.Flush(); err != nil {
t.Fatal(err)
}
if w.pageOffset != 64 {
t.Fatalf("w.pageOffset expected 64, got %d", w.pageOffset)
}
w = NewPageWriter(cw, w.pageOffset, pageBytes)
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
}
if err := w.Flush(); err != nil {
t.Fatal(err)
}
if w.pageOffset != 0 {
t.Fatalf("w.pageOffset expected 0, got %d", w.pageOffset)
}
}
// checkPageWriter implements an io.Writer that fails a test on unaligned writes.
type checkPageWriter struct {
pageBytes int

View File

@ -38,7 +38,7 @@ var (
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
Lead uint64
Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
RaftState StateType
}

View File

@ -25,9 +25,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@ -183,9 +183,9 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
type Entry struct {
Type EntryType `protobuf:"varint,1,opt,name=Type,json=type,enum=raftpb.EntryType" json:"Type"`
Term uint64 `protobuf:"varint,2,opt,name=Term,json=term" json:"Term"`
Index uint64 `protobuf:"varint,3,opt,name=Index,json=index" json:"Index"`
Type EntryType `protobuf:"varint,1,opt,name=Type,json=type,enum=raftpb.EntryType" json:"Type"`
Data []byte `protobuf:"bytes,4,opt,name=Data,json=data" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}

View File

@ -15,9 +15,9 @@ enum EntryType {
}
message Entry {
optional uint64 Term = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional uint64 Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional EntryType Type = 1 [(gogoproto.nullable) = false];
optional uint64 Term = 2 [(gogoproto.nullable) = false];
optional uint64 Index = 3 [(gogoproto.nullable) = false];
optional bytes Data = 4;
}

View File

@ -49,6 +49,7 @@ var (
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
}
)

View File

@ -19,9 +19,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal

View File

@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.8"
Version = "3.0.14"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"

View File

@ -18,6 +18,7 @@ import (
"encoding/binary"
"hash"
"io"
"os"
"sync"
"github.com/coreos/etcd/pkg/crc"
@ -39,9 +40,9 @@ type encoder struct {
uint64buf []byte
}
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
return &encoder{
bw: ioutil.NewPageWriter(w, walPageBytes),
bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
crc: crc.New(prevCrc, crcTable),
// 1MB buffer
buf: make([]byte, 1024*1024),
@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
}
}
// newFileEncoder creates a new encoder with current file offset for the page writer.
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return nil, err
}
return newEncoder(f, prevCrc, int(offset)), nil
}
func (e *encoder) encode(rec *walpb.Record) error {
e.mu.Lock()
defer e.mu.Unlock()

View File

@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) {
typ := int64(0xABCD)
d := []byte("Hello world!")
buf := new(bytes.Buffer)
e := newEncoder(buf, 0)
e := newEncoder(buf, 0, 0)
e.encode(&walpb.Record{Type: typ, Data: d})
e.flush()
decoder := newDecoder(ioutil.NopCloser(buf))

View File

@ -120,7 +120,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
w := &WAL{
dir: dirpath,
metadata: metadata,
encoder: newEncoder(f, 0),
}
w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
}
w.locks = append(w.locks, f)
if err = w.saveCrc(0); err != nil {
@ -341,7 +344,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
if w.tail() != nil {
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
if err != nil {
return
}
}
w.decoder = nil
@ -375,7 +381,10 @@ func (w *WAL) cut() error {
// update writer and save the previous crc
w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
if err = w.saveCrc(prevCrc); err != nil {
return err
}
@ -414,7 +423,10 @@ func (w *WAL) cut() error {
w.locks[len(w.locks)-1] = newTail
prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath)
return nil

View File

@ -61,7 +61,7 @@ func TestNew(t *testing.T) {
}
var wb bytes.Buffer
e := newEncoder(&wb, 0)
e := newEncoder(&wb, 0, 0)
err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
if err != nil {
t.Fatalf("err = %v, want nil", err)
@ -465,7 +465,7 @@ func TestSaveEmpty(t *testing.T) {
var buf bytes.Buffer
var est raftpb.HardState
w := WAL{
encoder: newEncoder(&buf, 0),
encoder: newEncoder(&buf, 0, 0),
}
if err := w.saveState(&est); err != nil {
t.Errorf("err = %v, want nil", err)

View File

@ -20,9 +20,9 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
)
import io "io"
io "io"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal