Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
faeeb2fc75 | |||
d50c487132 | |||
b837feffe4 | |||
4d89640195 | |||
1292d453c3 | |||
ec20b381ed | |||
37cc3f5262 | |||
7f1940e5ed | |||
caccf8e5e6 | |||
ef65dfe2eb | |||
ff6c6916f2 | |||
3dfe8765d3 | |||
a4a52cb15d | |||
014970930a | |||
4628be982c | |||
ff55e5a188 | |||
bf0898266c | |||
b9d69f7698 |
@ -4,7 +4,6 @@ go_import_path: github.com/coreos/etcd
|
|||||||
sudo: false
|
sudo: false
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.5
|
|
||||||
- 1.6
|
- 1.6
|
||||||
- tip
|
- tip
|
||||||
|
|
||||||
@ -22,10 +21,6 @@ matrix:
|
|||||||
allow_failures:
|
allow_failures:
|
||||||
- go: tip
|
- go: tip
|
||||||
exclude:
|
exclude:
|
||||||
- go: 1.5
|
|
||||||
env: TARGET=arm
|
|
||||||
- go: 1.5
|
|
||||||
env: TARGET=ppc64le
|
|
||||||
- go: 1.6
|
- go: 1.6
|
||||||
env: TARGET=arm64
|
env: TARGET=arm64
|
||||||
- go: tip
|
- go: tip
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
FROM alpine:latest
|
FROM alpine:latest
|
||||||
|
|
||||||
ADD bin/etcd /usr/local/bin/
|
ADD etcd /usr/local/bin/
|
||||||
ADD bin/etcdctl /usr/local/bin/
|
ADD etcdctl /usr/local/bin/
|
||||||
RUN mkdir -p /var/etcd/
|
RUN mkdir -p /var/etcd/
|
||||||
|
|
||||||
EXPOSE 2379 2380
|
EXPOSE 2379 2380
|
||||||
|
|
||||||
# Define default command.
|
# Define default entrypoint.
|
||||||
CMD ["/usr/local/bin/etcd"]
|
ENTRYPOINT ["/usr/local/bin/etcd"]
|
||||||
|
@ -25,7 +25,7 @@ curl -L http://localhost:2379/v3alpha/kv/range \
|
|||||||
|
|
||||||
## Swagger
|
## Swagger
|
||||||
|
|
||||||
Generated [Swapper][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc].
|
Generated [Swagger][swagger] API definitions can be found at [rpc.swagger.json][swagger-doc].
|
||||||
|
|
||||||
[api-ref]: ./api_reference_v3.md
|
[api-ref]: ./api_reference_v3.md
|
||||||
[go-client]: https://github.com/coreos/etcd/tree/master/clientv3
|
[go-client]: https://github.com/coreos/etcd/tree/master/clientv3
|
||||||
|
@ -11,7 +11,7 @@ The easiest way to get etcd is to use one of the pre-built release binaries whic
|
|||||||
## Build the latest version
|
## Build the latest version
|
||||||
|
|
||||||
For those wanting to try the very latest version, build etcd from the `master` branch.
|
For those wanting to try the very latest version, build etcd from the `master` branch.
|
||||||
[Go](https://golang.org/) version 1.5+ is required to build the latest version of etcd.
|
[Go](https://golang.org/) version 1.6+ (with HTTP2 support) is required to build the latest version of etcd.
|
||||||
|
|
||||||
Here are the commands to build an etcd binary from the `master` branch:
|
Here are the commands to build an etcd binary from the `master` branch:
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ See [etcdctl][etcdctl] for a simple command line client.
|
|||||||
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
|
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
|
||||||
|
|
||||||
For those wanting to try the very latest version, you can build the latest version of etcd from the `master` branch.
|
For those wanting to try the very latest version, you can build the latest version of etcd from the `master` branch.
|
||||||
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.5+ is required).
|
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.6+ is required).
|
||||||
All development occurs on `master`, including new features and bug fixes.
|
All development occurs on `master`, including new features and bug fixes.
|
||||||
Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
|
Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
|
||||||
|
|
||||||
|
@ -37,6 +37,10 @@ var (
|
|||||||
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
|
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
|
||||||
ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
|
ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
|
||||||
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
||||||
|
|
||||||
|
// oneShotCtxValue is set on a context using WithValue(&oneShotValue) so
|
||||||
|
// that Do() will not retry a request
|
||||||
|
oneShotCtxValue interface{}
|
||||||
)
|
)
|
||||||
|
|
||||||
var DefaultRequestTimeout = 5 * time.Second
|
var DefaultRequestTimeout = 5 * time.Second
|
||||||
@ -335,6 +339,7 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
|||||||
var body []byte
|
var body []byte
|
||||||
var err error
|
var err error
|
||||||
cerr := &ClusterError{}
|
cerr := &ClusterError{}
|
||||||
|
isOneShot := ctx.Value(&oneShotCtxValue) != nil
|
||||||
|
|
||||||
for i := pinned; i < leps+pinned; i++ {
|
for i := pinned; i < leps+pinned; i++ {
|
||||||
k := i % leps
|
k := i % leps
|
||||||
@ -348,6 +353,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
|||||||
if err == context.Canceled || err == context.DeadlineExceeded {
|
if err == context.Canceled || err == context.DeadlineExceeded {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
if isOneShot {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if resp.StatusCode/100 == 5 {
|
if resp.StatusCode/100 == 5 {
|
||||||
@ -358,6 +366,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
|||||||
default:
|
default:
|
||||||
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
|
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
|
||||||
}
|
}
|
||||||
|
if isOneShot {
|
||||||
|
return nil, nil, cerr.Errors[0]
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if k != pinned {
|
if k != pinned {
|
||||||
|
134
client/integration/client_test.go
Normal file
134
client/integration/client_test.go
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
// Copyright 2016 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package integration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/client"
|
||||||
|
"github.com/coreos/etcd/integration"
|
||||||
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestV2NoRetryEOF tests destructive api calls won't retry on a disconnection.
|
||||||
|
func TestV2NoRetryEOF(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
// generate an EOF response; specify address so appears first in sorted ep list
|
||||||
|
lEOF := integration.NewListenerWithAddr(t, fmt.Sprintf("eof:123.%d.sock", os.Getpid()))
|
||||||
|
defer lEOF.Close()
|
||||||
|
tries := uint32(0)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
conn, err := lEOF.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
atomic.AddUint32(&tries, 1)
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
eofURL := integration.UrlScheme + "://" + lEOF.Addr().String()
|
||||||
|
cli := integration.MustNewHTTPClient(t, []string{eofURL, eofURL}, nil)
|
||||||
|
kapi := client.NewKeysAPI(cli)
|
||||||
|
for i, f := range noRetryList(kapi) {
|
||||||
|
startTries := atomic.LoadUint32(&tries)
|
||||||
|
if err := f(); err == nil {
|
||||||
|
t.Errorf("#%d: expected EOF error, got nil", i)
|
||||||
|
}
|
||||||
|
endTries := atomic.LoadUint32(&tries)
|
||||||
|
if startTries+1 != endTries {
|
||||||
|
t.Errorf("#%d: expected 1 try, got %d", i, endTries-startTries)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestV2NoRetryNoLeader tests destructive api calls won't retry if given an error code.
|
||||||
|
func TestV2NoRetryNoLeader(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
|
||||||
|
lHttp := integration.NewListenerWithAddr(t, fmt.Sprintf("errHttp:123.%d.sock", os.Getpid()))
|
||||||
|
eh := &errHandler{errCode: http.StatusServiceUnavailable}
|
||||||
|
srv := httptest.NewUnstartedServer(eh)
|
||||||
|
defer lHttp.Close()
|
||||||
|
defer srv.Close()
|
||||||
|
srv.Listener = lHttp
|
||||||
|
go srv.Start()
|
||||||
|
lHttpURL := integration.UrlScheme + "://" + lHttp.Addr().String()
|
||||||
|
|
||||||
|
cli := integration.MustNewHTTPClient(t, []string{lHttpURL, lHttpURL}, nil)
|
||||||
|
kapi := client.NewKeysAPI(cli)
|
||||||
|
// test error code
|
||||||
|
for i, f := range noRetryList(kapi) {
|
||||||
|
reqs := eh.reqs
|
||||||
|
if err := f(); err == nil || !strings.Contains(err.Error(), "no leader") {
|
||||||
|
t.Errorf("#%d: expected \"no leader\", got %v", i, err)
|
||||||
|
}
|
||||||
|
if eh.reqs != reqs+1 {
|
||||||
|
t.Errorf("#%d: expected 1 request, got %d", i, eh.reqs-reqs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused.
|
||||||
|
func TestV2RetryRefuse(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
cl := integration.NewCluster(t, 1)
|
||||||
|
cl.Launch(t)
|
||||||
|
defer cl.Terminate(t)
|
||||||
|
// test connection refused; expect no error failover
|
||||||
|
cli := integration.MustNewHTTPClient(t, []string{integration.UrlScheme + "://refuseconn:123", cl.URL(0)}, nil)
|
||||||
|
kapi := client.NewKeysAPI(cli)
|
||||||
|
if _, err := kapi.Set(context.Background(), "/delkey", "def", nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for i, f := range noRetryList(kapi) {
|
||||||
|
if err := f(); err != nil {
|
||||||
|
t.Errorf("#%d: unexpected retry failure (%v)", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type errHandler struct {
|
||||||
|
errCode int
|
||||||
|
reqs int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eh *errHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
|
req.Body.Close()
|
||||||
|
eh.reqs++
|
||||||
|
w.WriteHeader(eh.errCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func noRetryList(kapi client.KeysAPI) []func() error {
|
||||||
|
return []func() error{
|
||||||
|
func() error {
|
||||||
|
opts := &client.SetOptions{PrevExist: client.PrevNoExist}
|
||||||
|
_, err := kapi.Set(context.Background(), "/setkey", "bar", opts)
|
||||||
|
return err
|
||||||
|
},
|
||||||
|
func() error {
|
||||||
|
_, err := kapi.Delete(context.Background(), "/delkey", nil)
|
||||||
|
return err
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
20
client/integration/main_test.go
Normal file
20
client/integration/main_test.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
// Copyright 2013 The Go Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package integration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
v := m.Run()
|
||||||
|
if v == 0 && testutil.CheckLeakedGoroutine() {
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
os.Exit(v)
|
||||||
|
}
|
@ -337,7 +337,11 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions
|
|||||||
act.Dir = opts.Dir
|
act.Dir = opts.Dir
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, body, err := k.client.Do(ctx, act)
|
doCtx := ctx
|
||||||
|
if act.PrevExist == PrevNoExist {
|
||||||
|
doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue)
|
||||||
|
}
|
||||||
|
resp, body, err := k.client.Do(doCtx, act)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -385,7 +389,8 @@ func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOption
|
|||||||
act.Recursive = opts.Recursive
|
act.Recursive = opts.Recursive
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, body, err := k.client.Do(ctx, act)
|
doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue)
|
||||||
|
resp, body, err := k.client.Do(doCtx, act)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,9 @@
|
|||||||
package integration
|
package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) {
|
|||||||
t.Fatal("failed to receive update in one second")
|
t.Fatal("failed to receive update in one second")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMirrorSyncBase(t *testing.T) {
|
||||||
|
cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer cluster.Terminate(nil)
|
||||||
|
|
||||||
|
cli := cluster.Client(0)
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
keyCh := make(chan string)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for key := range keyCh {
|
||||||
|
if _, err := cli.Put(ctx, key, "test"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 2000; i++ {
|
||||||
|
keyCh <- fmt.Sprintf("test%d", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(keyCh)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
syncer := mirror.NewSyncer(cli, "test", 0)
|
||||||
|
respCh, errCh := syncer.SyncBase(ctx)
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
|
||||||
|
for resp := range respCh {
|
||||||
|
count = count + len(resp.Kvs)
|
||||||
|
if !resp.More {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for err := range errCh {
|
||||||
|
t.Fatalf("unexpected error %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if count != 2000 {
|
||||||
|
t.Errorf("unexpected kv count: %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
|
|||||||
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
|
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
|
||||||
// We then range from the prefix to the next prefix if exists. Or we will
|
// We then range from the prefix to the next prefix if exists. Or we will
|
||||||
// range from the prefix to the end if the next prefix does not exists.
|
// range from the prefix to the end if the next prefix does not exists.
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
|
||||||
key = s.prefix
|
key = s.prefix
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,6 +182,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPrefixRangeEnd gets the range end of the prefix.
|
||||||
|
// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
|
||||||
|
func GetPrefixRangeEnd(prefix string) string {
|
||||||
|
return string(getPrefix([]byte(prefix)))
|
||||||
|
}
|
||||||
|
|
||||||
func getPrefix(key []byte) []byte {
|
func getPrefix(key []byte) []byte {
|
||||||
end := make([]byte, len(key))
|
end := make([]byte, len(key))
|
||||||
copy(end, key)
|
copy(end, key)
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
|
"github.com/coreos/etcd/version"
|
||||||
"github.com/coreos/go-semver/semver"
|
"github.com/coreos/go-semver/semver"
|
||||||
"github.com/coreos/pkg/capnslog"
|
"github.com/coreos/pkg/capnslog"
|
||||||
)
|
)
|
||||||
@ -71,7 +72,7 @@ func runCapabilityLoop(s *etcdserver.EtcdServer) {
|
|||||||
enableMapMu.Lock()
|
enableMapMu.Lock()
|
||||||
enabledMap = capabilityMaps[pv.String()]
|
enabledMap = capabilityMaps[pv.String()]
|
||||||
enableMapMu.Unlock()
|
enableMapMu.Unlock()
|
||||||
plog.Infof("enabled capabilities for version %s", pv)
|
plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,11 +86,11 @@ type serverWatchStream struct {
|
|||||||
watchStream mvcc.WatchStream
|
watchStream mvcc.WatchStream
|
||||||
ctrlStream chan *pb.WatchResponse
|
ctrlStream chan *pb.WatchResponse
|
||||||
|
|
||||||
|
// mu protects progress, prevKV
|
||||||
|
mu sync.Mutex
|
||||||
// progress tracks the watchID that stream might need to send
|
// progress tracks the watchID that stream might need to send
|
||||||
// progress to.
|
// progress to.
|
||||||
progress map[mvcc.WatchID]bool
|
progress map[mvcc.WatchID]bool
|
||||||
// mu protects progress
|
|
||||||
mu sync.Mutex
|
|
||||||
|
|
||||||
// closec indicates the stream is closed.
|
// closec indicates the stream is closed.
|
||||||
closec chan struct{}
|
closec chan struct{}
|
||||||
@ -171,7 +171,9 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||||||
}
|
}
|
||||||
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
|
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
|
||||||
if id != -1 && creq.ProgressNotify {
|
if id != -1 && creq.ProgressNotify {
|
||||||
|
sws.mu.Lock()
|
||||||
sws.progress[id] = true
|
sws.progress[id] = true
|
||||||
|
sws.mu.Unlock()
|
||||||
}
|
}
|
||||||
wr := &pb.WatchResponse{
|
wr := &pb.WatchResponse{
|
||||||
Header: sws.newResponseHeader(wsrev),
|
Header: sws.newResponseHeader(wsrev),
|
||||||
@ -199,9 +201,11 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||||||
sws.mu.Unlock()
|
sws.mu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: do we need to return error back to client?
|
|
||||||
default:
|
default:
|
||||||
panic("not implemented")
|
// we probably should not shutdown the entire stream when
|
||||||
|
// receive an valid command.
|
||||||
|
// so just do nothing instead.
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -296,12 +300,14 @@ func (sws *serverWatchStream) sendLoop() {
|
|||||||
delete(pending, wid)
|
delete(pending, wid)
|
||||||
}
|
}
|
||||||
case <-progressTicker.C:
|
case <-progressTicker.C:
|
||||||
|
sws.mu.Lock()
|
||||||
for id, ok := range sws.progress {
|
for id, ok := range sws.progress {
|
||||||
if ok {
|
if ok {
|
||||||
sws.watchStream.RequestProgress(id)
|
sws.watchStream.RequestProgress(id)
|
||||||
}
|
}
|
||||||
sws.progress[id] = true
|
sws.progress[id] = true
|
||||||
}
|
}
|
||||||
|
sws.mu.Unlock()
|
||||||
case <-sws.closec:
|
case <-sws.closec:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -357,7 +357,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
cl.SetStore(st)
|
cl.SetStore(st)
|
||||||
cl.SetBackend(be)
|
cl.SetBackend(be)
|
||||||
cl.Recover()
|
cl.Recover()
|
||||||
if cl.Version() != nil && cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
||||||
os.RemoveAll(bepath)
|
os.RemoveAll(bepath)
|
||||||
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
||||||
}
|
}
|
||||||
@ -1170,8 +1170,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||||||
}
|
}
|
||||||
plog.Panicf("unexpected create snapshot error %v", err)
|
plog.Panicf("unexpected create snapshot error %v", err)
|
||||||
}
|
}
|
||||||
// commit v3 storage because WAL file before snapshot index
|
// commit kv to write metadata (for example: consistent index) to disk.
|
||||||
// could be removed after SaveSnap.
|
|
||||||
s.KV().Commit()
|
s.KV().Commit()
|
||||||
// SaveSnap saves the snapshot and releases the locked wal files
|
// SaveSnap saves the snapshot and releases the locked wal files
|
||||||
// to the snapshot index.
|
// to the snapshot index.
|
||||||
|
@ -39,6 +39,8 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
|
|||||||
plog.Panicf("store save should never fail: %v", err)
|
plog.Panicf("store save should never fail: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// commit kv to write metadata(for example: consistent index).
|
||||||
|
s.KV().Commit()
|
||||||
dbsnap := s.be.Snapshot()
|
dbsnap := s.be.Snapshot()
|
||||||
// get a snapshot of v3 KV as readCloser
|
// get a snapshot of v3 KV as readCloser
|
||||||
rc := newSnapshotReaderCloser(dbsnap)
|
rc := newSnapshotReaderCloser(dbsnap)
|
||||||
|
@ -54,8 +54,8 @@ const (
|
|||||||
requestTimeout = 20 * time.Second
|
requestTimeout = 20 * time.Second
|
||||||
|
|
||||||
basePort = 21000
|
basePort = 21000
|
||||||
urlScheme = "unix"
|
UrlScheme = "unix"
|
||||||
urlSchemeTLS = "unixs"
|
UrlSchemeTLS = "unixs"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -96,9 +96,9 @@ func init() {
|
|||||||
|
|
||||||
func schemeFromTLSInfo(tls *transport.TLSInfo) string {
|
func schemeFromTLSInfo(tls *transport.TLSInfo) string {
|
||||||
if tls == nil {
|
if tls == nil {
|
||||||
return urlScheme
|
return UrlScheme
|
||||||
}
|
}
|
||||||
return urlSchemeTLS
|
return UrlSchemeTLS
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cluster) fillClusterForMembers() error {
|
func (c *cluster) fillClusterForMembers() error {
|
||||||
@ -257,7 +257,7 @@ func (c *cluster) addMember(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
|
func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
|
||||||
cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
|
cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := ma.Add(ctx, peerURL); err != nil {
|
if _, err := ma.Add(ctx, peerURL); err != nil {
|
||||||
@ -277,7 +277,7 @@ func (c *cluster) AddMember(t *testing.T) {
|
|||||||
|
|
||||||
func (c *cluster) RemoveMember(t *testing.T, id uint64) {
|
func (c *cluster) RemoveMember(t *testing.T, id uint64) {
|
||||||
// send remove request to the cluster
|
// send remove request to the cluster
|
||||||
cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
|
cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
|
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
|
||||||
@ -312,7 +312,7 @@ func (c *cluster) Terminate(t *testing.T) {
|
|||||||
|
|
||||||
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
|
func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
|
||||||
for _, u := range c.URLs() {
|
for _, u := range c.URLs() {
|
||||||
cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
|
cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
for {
|
for {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
@ -391,10 +391,10 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
|
|||||||
func newLocalListener(t *testing.T) net.Listener {
|
func newLocalListener(t *testing.T) net.Listener {
|
||||||
c := atomic.AddInt64(&localListenCount, 1)
|
c := atomic.AddInt64(&localListenCount, 1)
|
||||||
addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid())
|
addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid())
|
||||||
return newListenerWithAddr(t, addr)
|
return NewListenerWithAddr(t, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListenerWithAddr(t *testing.T, addr string) net.Listener {
|
func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
|
||||||
l, err := transport.NewUnixListener(addr)
|
l, err := transport.NewUnixListener(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -614,7 +614,7 @@ func (m *member) Launch() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *member) WaitOK(t *testing.T) {
|
func (m *member) WaitOK(t *testing.T) {
|
||||||
cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
|
cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
for {
|
for {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
@ -678,12 +678,12 @@ func (m *member) Restart(t *testing.T) error {
|
|||||||
plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr)
|
plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr)
|
||||||
newPeerListeners := make([]net.Listener, 0)
|
newPeerListeners := make([]net.Listener, 0)
|
||||||
for _, ln := range m.PeerListeners {
|
for _, ln := range m.PeerListeners {
|
||||||
newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
|
newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
|
||||||
}
|
}
|
||||||
m.PeerListeners = newPeerListeners
|
m.PeerListeners = newPeerListeners
|
||||||
newClientListeners := make([]net.Listener, 0)
|
newClientListeners := make([]net.Listener, 0)
|
||||||
for _, ln := range m.ClientListeners {
|
for _, ln := range m.ClientListeners {
|
||||||
newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
|
newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
|
||||||
}
|
}
|
||||||
m.ClientListeners = newClientListeners
|
m.ClientListeners = newClientListeners
|
||||||
|
|
||||||
@ -708,7 +708,7 @@ func (m *member) Terminate(t *testing.T) {
|
|||||||
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
|
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
|
func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
|
||||||
cfgtls := transport.TLSInfo{}
|
cfgtls := transport.TLSInfo{}
|
||||||
if tls != nil {
|
if tls != nil {
|
||||||
cfgtls = *tls
|
cfgtls = *tls
|
||||||
|
@ -67,7 +67,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
|
|||||||
dc.Launch(t)
|
dc.Launch(t)
|
||||||
defer dc.Terminate(t)
|
defer dc.Terminate(t)
|
||||||
// init discovery token space
|
// init discovery token space
|
||||||
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
|
dcc := MustNewHTTPClient(t, dc.URLs(), nil)
|
||||||
dkapi := client.NewKeysAPI(dcc)
|
dkapi := client.NewKeysAPI(dcc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
|
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
|
||||||
@ -90,7 +90,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
|
|||||||
dc.Launch(t)
|
dc.Launch(t)
|
||||||
defer dc.Terminate(t)
|
defer dc.Terminate(t)
|
||||||
// init discovery token space
|
// init discovery token space
|
||||||
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
|
dcc := MustNewHTTPClient(t, dc.URLs(), nil)
|
||||||
dkapi := client.NewKeysAPI(dcc)
|
dkapi := client.NewKeysAPI(dcc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
|
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
|
||||||
@ -157,7 +157,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
|
|||||||
func TestForceNewCluster(t *testing.T) {
|
func TestForceNewCluster(t *testing.T) {
|
||||||
c := NewCluster(t, 3)
|
c := NewCluster(t, 3)
|
||||||
c.Launch(t)
|
c.Launch(t)
|
||||||
cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
resp, err := kapi.Create(ctx, "/foo", "bar")
|
resp, err := kapi.Create(ctx, "/foo", "bar")
|
||||||
@ -184,7 +184,7 @@ func TestForceNewCluster(t *testing.T) {
|
|||||||
c.waitLeader(t, c.Members[:1])
|
c.waitLeader(t, c.Members[:1])
|
||||||
|
|
||||||
// use new http client to init new connection
|
// use new http client to init new connection
|
||||||
cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
|
||||||
kapi = client.NewKeysAPI(cc)
|
kapi = client.NewKeysAPI(cc)
|
||||||
// ensure force restart keep the old data, and new cluster can make progress
|
// ensure force restart keep the old data, and new cluster can make progress
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
|
||||||
@ -273,7 +273,7 @@ func TestIssue2904(t *testing.T) {
|
|||||||
c.Members[1].Stop(t)
|
c.Members[1].Stop(t)
|
||||||
|
|
||||||
// send remove member-1 request to the cluster.
|
// send remove member-1 request to the cluster.
|
||||||
cc := mustNewHTTPClient(t, c.URLs(), nil)
|
cc := MustNewHTTPClient(t, c.URLs(), nil)
|
||||||
ma := client.NewMembersAPI(cc)
|
ma := client.NewMembersAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
// the proposal is not committed because member 1 is stopped, but the
|
// the proposal is not committed because member 1 is stopped, but the
|
||||||
@ -337,7 +337,7 @@ func TestIssue3699(t *testing.T) {
|
|||||||
c.waitLeader(t, c.Members)
|
c.waitLeader(t, c.Members)
|
||||||
|
|
||||||
// try to participate in cluster
|
// try to participate in cluster
|
||||||
cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
|
cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
|
if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
|
||||||
@ -350,7 +350,7 @@ func TestIssue3699(t *testing.T) {
|
|||||||
// a random key first, and check the new key could be got from all client urls
|
// a random key first, and check the new key could be got from all client urls
|
||||||
// of the cluster.
|
// of the cluster.
|
||||||
func clusterMustProgress(t *testing.T, membs []*member) {
|
func clusterMustProgress(t *testing.T, membs []*member) {
|
||||||
cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil)
|
cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
key := fmt.Sprintf("foo%d", rand.Int())
|
key := fmt.Sprintf("foo%d", rand.Int())
|
||||||
@ -362,7 +362,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
|
|||||||
|
|
||||||
for i, m := range membs {
|
for i, m := range membs {
|
||||||
u := m.URL()
|
u := m.URL()
|
||||||
mcc := mustNewHTTPClient(t, []string{u}, nil)
|
mcc := MustNewHTTPClient(t, []string{u}, nil)
|
||||||
mkapi := client.NewKeysAPI(mcc)
|
mkapi := client.NewKeysAPI(mcc)
|
||||||
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
|
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
|
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
|
||||||
|
@ -93,7 +93,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
|
|||||||
resps := make([]*client.Response, 120)
|
resps := make([]*client.Response, 120)
|
||||||
var err error
|
var err error
|
||||||
for i := 0; i < 120; i++ {
|
for i := 0; i < 120; i++ {
|
||||||
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
|
cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
key := fmt.Sprintf("foo%d", i)
|
key := fmt.Sprintf("foo%d", i)
|
||||||
@ -108,7 +108,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
|
|||||||
|
|
||||||
m.WaitOK(t)
|
m.WaitOK(t)
|
||||||
for i := 0; i < 120; i++ {
|
for i := 0; i < 120; i++ {
|
||||||
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
|
cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
|
||||||
kapi := client.NewKeysAPI(cc)
|
kapi := client.NewKeysAPI(cc)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||||
key := fmt.Sprintf("foo%d", i)
|
key := fmt.Sprintf("foo%d", i)
|
||||||
|
1
test
1
test
@ -64,6 +64,7 @@ function integration_tests {
|
|||||||
intpid="$!"
|
intpid="$!"
|
||||||
wait $e2epid
|
wait $e2epid
|
||||||
wait $intpid
|
wait $intpid
|
||||||
|
go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration
|
||||||
go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
|
go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
|
||||||
go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
|
go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
|
||||||
go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}
|
go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "2.3.0"
|
MinClusterVersion = "2.3.0"
|
||||||
Version = "3.0.0"
|
Version = "3.0.2"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
GitSHA = "Not provided (use ./build instead of go build)"
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
|
19
wal/wal.go
19
wal/wal.go
@ -129,15 +129,22 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.RemoveAll(dirpath); err != nil {
|
// rename of directory with locked files doesn't work on windows; close
|
||||||
return nil, err
|
// the WAL to release the locks so the directory can be renamed
|
||||||
}
|
w.Close()
|
||||||
if err := os.Rename(tmpdirpath, dirpath); err != nil {
|
if err := os.Rename(tmpdirpath, dirpath); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// reopen and relock
|
||||||
w.fp = newFilePipeline(w.dir, segmentSizeBytes)
|
newWAL, oerr := Open(dirpath, walpb.Snapshot{})
|
||||||
return w, nil
|
if oerr != nil {
|
||||||
|
return nil, oerr
|
||||||
|
}
|
||||||
|
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||||
|
newWAL.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return newWAL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the WAL at the given snap.
|
// Open opens the WAL at the given snap.
|
||||||
|
Reference in New Issue
Block a user