Compare commits
36 Commits
Author | SHA1 | Date | |
---|---|---|---|
0678329cd6 | |||
9a0e0c2eae | |||
3e4d57c37d | |||
d30e764b2d | |||
b5b7c78f1b | |||
ee1c07c3d4 | |||
67c5d4dfd2 | |||
3afcbd6f83 | |||
8fed61b2eb | |||
c8d386e18c | |||
2b6a44b7b0 | |||
8069d08b96 | |||
58f035844c | |||
f83774b4cd | |||
12c32137a8 | |||
fce4cf4dc8 | |||
06a72b2702 | |||
fbaef05885 | |||
31a94d28e3 | |||
88660a303f | |||
53c74dbd0b | |||
8a8af60fad | |||
7de19fefe8 | |||
7750f387b0 | |||
e33ab24442 | |||
fce2c1eeaf | |||
6a3bb93305 | |||
21455d2f3b | |||
51bb4220c5 | |||
d8c506923f | |||
5d778f85ca | |||
02697ca725 | |||
bd693c7069 | |||
52c90cdcfb | |||
a88b22ac0a | |||
e93f8b8a12 |
@ -61,7 +61,7 @@ After your cluster is up and running, adding or removing members is done via [ru
|
|||||||
|
|
||||||
### Member Migration
|
### Member Migration
|
||||||
|
|
||||||
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
|
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
|
||||||
|
|
||||||
The data directory contains all the data to recover a member to its point-in-time state. To migrate a member:
|
The data directory contains all the data to recover a member to its point-in-time state. To migrate a member:
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ $ sudo systemctl stop etcd
|
|||||||
#### Copy the data directory of the now-idle member to the new machine
|
#### Copy the data directory of the now-idle member to the new machine
|
||||||
|
|
||||||
```
|
```
|
||||||
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
|
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
|
||||||
```
|
```
|
||||||
|
|
||||||
```
|
```
|
||||||
@ -181,7 +181,9 @@ Once you have verified that etcd has started successfully, shut it down and move
|
|||||||
|
|
||||||
#### Restoring the cluster
|
#### Restoring the cluster
|
||||||
|
|
||||||
Now that the node is running successfully, you can add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
|
Now that the node is running successfully, you should [change its advertised peer URLs](other_apis.md#change-the-peer-urls-of-a-member), because the `--force-new-cluster` flag has set the peer URL to the default (listening on localhost).
|
||||||
|
|
||||||
|
You can then add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
|
||||||
|
|
||||||
### Client Request Timeout
|
### Client Request Timeout
|
||||||
|
|
||||||
|
@ -4,6 +4,10 @@ etcd can now run as a transparent proxy. Running etcd as a proxy allows for easi
|
|||||||
|
|
||||||
etcd currently supports two proxy modes: `readwrite` and `readonly`. The default mode is `readwrite`, which forwards both read and write requests to the etcd cluster. A `readonly` etcd proxy only forwards read requests to the etcd cluster, and returns `HTTP 501` to all write requests.
|
etcd currently supports two proxy modes: `readwrite` and `readonly`. The default mode is `readwrite`, which forwards both read and write requests to the etcd cluster. A `readonly` etcd proxy only forwards read requests to the etcd cluster, and returns `HTTP 501` to all write requests.
|
||||||
|
|
||||||
|
The proxy will shuffle the list of cluster members periodically to avoid sending all connections to a single member.
|
||||||
|
|
||||||
|
The member list used by the proxy consists of all client URLs advertised within the cluster, as specified in each member's `-advertise-client-urls` flag. If this flag is set incorrectly, requests sent to the proxy are forwarded to the wrong addresses and then fail. To fix this problem, restart the etcd member with the correct `-advertise-client-urls` flag. After the client URL list in the proxy is recalculated, which happens every 30 seconds, requests will be forwarded correctly.
|
||||||
|
|
||||||
### Using an etcd proxy
|
### Using an etcd proxy
|
||||||
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).
|
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).
|
||||||
|
|
||||||
|
6
build
6
build
@ -11,6 +11,8 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
|
|||||||
|
|
||||||
eval $(go env)
|
eval $(go env)
|
||||||
|
|
||||||
|
GIT_SHA=`git rev-parse --short HEAD`
|
||||||
|
|
||||||
# Static compilation is useful when etcd is run in a container
|
# Static compilation is useful when etcd is run in a container
|
||||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}
|
||||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||||
|
@ -105,7 +105,7 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return assertStatusCode(resp.StatusCode, http.StatusNoContent)
|
return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone)
|
||||||
}
|
}
|
||||||
|
|
||||||
type membersAPIActionList struct{}
|
type membersAPIActionList struct{}
|
||||||
|
@ -25,7 +25,8 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// indirection for testing
|
// indirection for testing
|
||||||
lookupSRV = net.LookupSRV
|
lookupSRV = net.LookupSRV
|
||||||
|
resolveTCPAddr = net.ResolveTCPAddr
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO(barakmich): Currently ignores priority and weight (as they don't make as much sense for a bootstrap)
|
// TODO(barakmich): Currently ignores priority and weight (as they don't make as much sense for a bootstrap)
|
||||||
@ -38,7 +39,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
|||||||
|
|
||||||
// First, resolve the apurls
|
// First, resolve the apurls
|
||||||
for _, url := range apurls {
|
for _, url := range apurls {
|
||||||
tcpAddr, err := net.ResolveTCPAddr("tcp", url.Host)
|
tcpAddr, err := resolveTCPAddr("tcp", url.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", url.Host)
|
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", url.Host)
|
||||||
return "", "", err
|
return "", "", err
|
||||||
@ -52,8 +53,9 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, srv := range addrs {
|
for _, srv := range addrs {
|
||||||
host := net.JoinHostPort(srv.Target, fmt.Sprintf("%d", srv.Port))
|
target := strings.TrimSuffix(srv.Target, ".")
|
||||||
tcpAddr, err := net.ResolveTCPAddr("tcp", host)
|
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
|
||||||
|
tcpAddr, err := resolveTCPAddr("tcp", host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", host)
|
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", host)
|
||||||
continue
|
continue
|
||||||
@ -68,8 +70,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
|||||||
n = fmt.Sprintf("%d", tempName)
|
n = fmt.Sprintf("%d", tempName)
|
||||||
tempName += 1
|
tempName += 1
|
||||||
}
|
}
|
||||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, tcpAddr.String()))
|
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
|
||||||
log.Printf("discovery: Got bootstrap from DNS for %s at host %s to %s%s", service, host, prefix, tcpAddr.String())
|
log.Printf("discovery: Got bootstrap from DNS for %s at %s%s", service, prefix, host)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -23,19 +23,26 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestSRVGetCluster(t *testing.T) {
|
func TestSRVGetCluster(t *testing.T) {
|
||||||
defer func() { lookupSRV = net.LookupSRV }()
|
defer func() {
|
||||||
|
lookupSRV = net.LookupSRV
|
||||||
|
resolveTCPAddr = net.ResolveTCPAddr
|
||||||
|
}()
|
||||||
|
|
||||||
name := "dnsClusterTest"
|
name := "dnsClusterTest"
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
withSSL []*net.SRV
|
withSSL []*net.SRV
|
||||||
withoutSSL []*net.SRV
|
withoutSSL []*net.SRV
|
||||||
urls []string
|
urls []string
|
||||||
expected string
|
dns map[string]string
|
||||||
|
|
||||||
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
[]*net.SRV{},
|
[]*net.SRV{},
|
||||||
[]*net.SRV{},
|
[]*net.SRV{},
|
||||||
nil,
|
nil,
|
||||||
|
nil,
|
||||||
|
|
||||||
"",
|
"",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -46,6 +53,8 @@ func TestSRVGetCluster(t *testing.T) {
|
|||||||
},
|
},
|
||||||
[]*net.SRV{},
|
[]*net.SRV{},
|
||||||
nil,
|
nil,
|
||||||
|
nil,
|
||||||
|
|
||||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
|
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -58,6 +67,7 @@ func TestSRVGetCluster(t *testing.T) {
|
|||||||
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
||||||
},
|
},
|
||||||
nil,
|
nil,
|
||||||
|
nil,
|
||||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:7001",
|
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:7001",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -70,8 +80,22 @@ func TestSRVGetCluster(t *testing.T) {
|
|||||||
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
||||||
},
|
},
|
||||||
[]string{"https://10.0.0.1:2480"},
|
[]string{"https://10.0.0.1:2480"},
|
||||||
|
nil,
|
||||||
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:7001",
|
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:7001",
|
||||||
},
|
},
|
||||||
|
// matching local member with resolved addr and return unresolved hostnames
|
||||||
|
{
|
||||||
|
[]*net.SRV{
|
||||||
|
&net.SRV{Target: "1.example.com.", Port: 2480},
|
||||||
|
&net.SRV{Target: "2.example.com.", Port: 2480},
|
||||||
|
&net.SRV{Target: "3.example.com.", Port: 2480},
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
[]string{"https://10.0.0.1:2480"},
|
||||||
|
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
|
||||||
|
|
||||||
|
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
@ -84,6 +108,12 @@ func TestSRVGetCluster(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return "", nil, errors.New("Unkown service in mock")
|
return "", nil, errors.New("Unkown service in mock")
|
||||||
}
|
}
|
||||||
|
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
||||||
|
if tt.dns == nil || tt.dns[addr] == "" {
|
||||||
|
return net.ResolveTCPAddr(network, addr)
|
||||||
|
}
|
||||||
|
return net.ResolveTCPAddr(network, tt.dns[addr])
|
||||||
|
}
|
||||||
urls := testutil.MustNewURLs(t, tt.urls)
|
urls := testutil.MustNewURLs(t, tt.urls)
|
||||||
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
|
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,10 +44,10 @@ func NewBackupCommand() cli.Command {
|
|||||||
|
|
||||||
// handleBackup handles a request that intends to do a backup.
|
// handleBackup handles a request that intends to do a backup.
|
||||||
func handleBackup(c *cli.Context) {
|
func handleBackup(c *cli.Context) {
|
||||||
srcSnap := path.Join(c.String("data-dir"), "snap")
|
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
|
||||||
destSnap := path.Join(c.String("backup-dir"), "snap")
|
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
|
||||||
srcWAL := path.Join(c.String("data-dir"), "wal")
|
srcWAL := path.Join(c.String("data-dir"), "member", "wal")
|
||||||
destWAL := path.Join(c.String("backup-dir"), "wal")
|
destWAL := path.Join(c.String("backup-dir"), "member", "wal")
|
||||||
|
|
||||||
if err := os.MkdirAll(destSnap, 0700); err != nil {
|
if err := os.MkdirAll(destSnap, 0700); err != nil {
|
||||||
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
|
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
|
||||||
|
@ -26,6 +26,7 @@ func NewImportSnapCommand() cli.Command {
|
|||||||
Usage: "import a snapshot to a cluster",
|
Usage: "import a snapshot to a cluster",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
cli.StringFlag{Name: "snap", Value: "", Usage: "Path to the vaild etcd 0.4.x snapshot."},
|
cli.StringFlag{Name: "snap", Value: "", Usage: "Path to the vaild etcd 0.4.x snapshot."},
|
||||||
|
cli.StringSliceFlag{Name: "hidden", Value: new(cli.StringSlice), Usage: "Hidden key spaces to import from snapshot"},
|
||||||
cli.IntFlag{Name: "c", Value: 10, Usage: "Number of concurrent clients to import the data"},
|
cli.IntFlag{Name: "c", Value: 10, Usage: "Number of concurrent clients to import the data"},
|
||||||
},
|
},
|
||||||
Action: handleImportSnap,
|
Action: handleImportSnap,
|
||||||
@ -36,7 +37,7 @@ func handleImportSnap(c *cli.Context) {
|
|||||||
d, err := ioutil.ReadFile(c.String("snap"))
|
d, err := ioutil.ReadFile(c.String("snap"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.String("snap") == "" {
|
if c.String("snap") == "" {
|
||||||
fmt.Printf("no snapshot file provided (use --snap)")
|
fmt.Printf("no snapshot file provided (use --snap)\n")
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("cannot read snapshot file %s\n", c.String("snap"))
|
fmt.Printf("cannot read snapshot file %s\n", c.String("snap"))
|
||||||
}
|
}
|
||||||
@ -83,6 +84,15 @@ func handleImportSnap(c *cli.Context) {
|
|||||||
handleError(ErrorFromEtcd, err)
|
handleError(ErrorFromEtcd, err)
|
||||||
}
|
}
|
||||||
n := copyKeys(all.Node, setc)
|
n := copyKeys(all.Node, setc)
|
||||||
|
|
||||||
|
hiddens := c.StringSlice("hidden")
|
||||||
|
for _, h := range hiddens {
|
||||||
|
allh, err := st.Get(h, true, true)
|
||||||
|
if err != nil {
|
||||||
|
handleError(ErrorFromEtcd, err)
|
||||||
|
}
|
||||||
|
n += copyKeys(allh.Node, setc)
|
||||||
|
}
|
||||||
close(setc)
|
close(setc)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
fmt.Printf("finished importing %d keys\n", n)
|
fmt.Printf("finished importing %d keys\n", n)
|
||||||
|
@ -31,7 +31,7 @@ func main() {
|
|||||||
app.Flags = []cli.Flag{
|
app.Flags = []cli.Flag{
|
||||||
cli.BoolFlag{Name: "debug", Usage: "output cURL commands which can be used to reproduce the request"},
|
cli.BoolFlag{Name: "debug", Usage: "output cURL commands which can be used to reproduce the request"},
|
||||||
cli.BoolFlag{Name: "no-sync", Usage: "don't synchronize cluster information before sending request"},
|
cli.BoolFlag{Name: "no-sync", Usage: "don't synchronize cluster information before sending request"},
|
||||||
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple` or `json`)"},
|
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple`, `extended` or `json`)"},
|
||||||
cli.StringFlag{Name: "peers, C", Value: "", Usage: "a comma-delimited list of machine addresses in the cluster (default: \"127.0.0.1:4001\")"},
|
cli.StringFlag{Name: "peers, C", Value: "", Usage: "a comma-delimited list of machine addresses in the cluster (default: \"127.0.0.1:4001\")"},
|
||||||
cli.StringFlag{Name: "cert-file", Value: "", Usage: "identify HTTPS client using this SSL certificate file"},
|
cli.StringFlag{Name: "cert-file", Value: "", Usage: "identify HTTPS client using this SSL certificate file"},
|
||||||
cli.StringFlag{Name: "key-file", Value: "", Usage: "identify HTTPS client using this SSL key file"},
|
cli.StringFlag{Name: "key-file", Value: "", Usage: "identify HTTPS client using this SSL key file"},
|
||||||
|
@ -15,18 +15,17 @@
|
|||||||
package etcdmain
|
package etcdmain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/pkg/cors"
|
"github.com/coreos/etcd/pkg/cors"
|
||||||
"github.com/coreos/etcd/pkg/flags"
|
"github.com/coreos/etcd/pkg/flags"
|
||||||
"github.com/coreos/etcd/pkg/netutil"
|
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
"github.com/coreos/etcd/version"
|
"github.com/coreos/etcd/version"
|
||||||
)
|
)
|
||||||
@ -64,6 +63,7 @@ var (
|
|||||||
|
|
||||||
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set" +
|
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set" +
|
||||||
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
|
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
|
||||||
|
errUnsetAdvertiseClientURLsFlag = fmt.Errorf("-advertise-client-urls is required when -listen-client-urls is set explicitly")
|
||||||
)
|
)
|
||||||
|
|
||||||
type config struct {
|
type config struct {
|
||||||
@ -213,7 +213,10 @@ func (cfg *config) Parse(arguments []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cfg.printVersion {
|
if cfg.printVersion {
|
||||||
fmt.Println("etcd version", version.Version)
|
fmt.Printf("etcd Version: %s\n", version.Version)
|
||||||
|
fmt.Printf("Git SHA: %s\n", version.GitSHA)
|
||||||
|
fmt.Printf("Go Version: %s\n", runtime.Version())
|
||||||
|
fmt.Printf("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -255,9 +258,8 @@ func (cfg *config) Parse(arguments []string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
|
||||||
if err := cfg.resolveUrls(); err != nil {
|
return errUnsetAdvertiseClientURLsFlag
|
||||||
return errors.New("cannot resolve DNS hostnames.")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if 5*cfg.TickMs > cfg.ElectionMs {
|
if 5*cfg.TickMs > cfg.ElectionMs {
|
||||||
@ -275,10 +277,6 @@ func initialClusterFromName(name string) string {
|
|||||||
return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
|
return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfg *config) resolveUrls() error {
|
|
||||||
return netutil.ResolveTCPAddrs(cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
|
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
|
||||||
func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff }
|
func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff }
|
||||||
func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly }
|
func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly }
|
||||||
|
@ -29,6 +29,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
|
|||||||
"-snapshot-count=10",
|
"-snapshot-count=10",
|
||||||
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
|
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
|
||||||
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
|
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
|
||||||
|
// it should be set if -listen-client-urls is set
|
||||||
|
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
|
||||||
}
|
}
|
||||||
wcfg := &config{
|
wcfg := &config{
|
||||||
dir: "testdir",
|
dir: "testdir",
|
||||||
|
@ -56,8 +56,12 @@ func Main() {
|
|||||||
cfg := NewConfig()
|
cfg := NewConfig()
|
||||||
err := cfg.Parse(os.Args[1:])
|
err := cfg.Parse(os.Args[1:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("etcd: error verifying flags, %v. See 'etcd -help'.", err)
|
log.Printf("error verifying flags, %v. See 'etcd -help'.", err)
|
||||||
os.Exit(2)
|
switch err {
|
||||||
|
case errUnsetAdvertiseClientURLsFlag:
|
||||||
|
log.Printf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
|
||||||
|
}
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
var stopped <-chan struct{}
|
var stopped <-chan struct{}
|
||||||
@ -90,8 +94,10 @@ func Main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
switch err {
|
switch err {
|
||||||
case discovery.ErrDuplicateID:
|
case discovery.ErrDuplicateID:
|
||||||
log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
|
log.Printf("member %q has previously registered with discovery service token (%s).", cfg.name, cfg.durl)
|
||||||
cfg.name, cfg.durl, cfg.dir)
|
log.Printf("But etcd could not find vaild cluster configuration in the given data dir (%s).", cfg.dir)
|
||||||
|
log.Printf("Please check the given data dir path if the previous bootstrap succeeded")
|
||||||
|
log.Printf("or use a new discovery token if the previous bootstrap failed.")
|
||||||
default:
|
default:
|
||||||
log.Fatalf("etcd: %v", err)
|
log.Fatalf("etcd: %v", err)
|
||||||
}
|
}
|
||||||
@ -191,7 +197,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|||||||
Handler: etcdhttp.NewClientHandler(s),
|
Handler: etcdhttp.NewClientHandler(s),
|
||||||
Info: cfg.corsInfo,
|
Info: cfg.corsInfo,
|
||||||
}
|
}
|
||||||
ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
|
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
|
||||||
// Start the peer server in a goroutine
|
// Start the peer server in a goroutine
|
||||||
for _, l := range plns {
|
for _, l := range plns {
|
||||||
go func(l net.Listener) {
|
go func(l net.Listener) {
|
||||||
|
@ -56,7 +56,8 @@ clustering flags:
|
|||||||
--initial-cluster-token 'etcd-cluster'
|
--initial-cluster-token 'etcd-cluster'
|
||||||
initial cluster token for the etcd cluster during bootstrap.
|
initial cluster token for the etcd cluster during bootstrap.
|
||||||
--advertise-client-urls 'http://localhost:2379,http://localhost:4001'
|
--advertise-client-urls 'http://localhost:2379,http://localhost:4001'
|
||||||
list of this member's client URLs to advertise to the rest of the cluster.
|
list of this member's client URLs to advertise to the public.
|
||||||
|
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
|
||||||
--discovery ''
|
--discovery ''
|
||||||
discovery URL used to bootstrap the cluster.
|
discovery URL used to bootstrap the cluster.
|
||||||
--discovery-fallback 'proxy'
|
--discovery-fallback 'proxy'
|
||||||
|
@ -59,12 +59,6 @@ type Cluster struct {
|
|||||||
id types.ID
|
id types.ID
|
||||||
token string
|
token string
|
||||||
store store.Store
|
store store.Store
|
||||||
// index is the raft index that cluster is updated at bootstrap
|
|
||||||
// from remote cluster info.
|
|
||||||
// It may have a higher value than local raft index, because it
|
|
||||||
// displays a further view of the cluster.
|
|
||||||
// TODO: upgrade it as last modified index
|
|
||||||
index uint64
|
|
||||||
|
|
||||||
sync.Mutex // guards members and removed map
|
sync.Mutex // guards members and removed map
|
||||||
members map[types.ID]*Member
|
members map[types.ID]*Member
|
||||||
@ -236,8 +230,6 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
|
|||||||
|
|
||||||
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
||||||
|
|
||||||
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
|
|
||||||
|
|
||||||
func (c *Cluster) Recover() {
|
func (c *Cluster) Recover() {
|
||||||
c.members, c.removed = membersFromStore(c.store)
|
c.members, c.removed = membersFromStore(c.store)
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var index uint64
|
return NewClusterFromMembers("", id, membs), nil
|
||||||
// The header at or before v2.0.3 doesn't have this field. For backward
|
|
||||||
// compatibility, it checks whether the field exists.
|
|
||||||
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
|
|
||||||
index, err = strconv.ParseUint(indexStr, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
if logerr {
|
|
||||||
log.Printf("etcdserver: could not parse raft index: %v", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cl := NewClusterFromMembers("", id, membs)
|
|
||||||
cl.UpdateIndex(index)
|
|
||||||
return cl, nil
|
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||||
}
|
}
|
||||||
|
@ -119,7 +119,6 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, err)
|
writeError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case resp.Event != nil:
|
case resp.Event != nil:
|
||||||
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
|
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/rafthttp"
|
"github.com/coreos/etcd/rafthttp"
|
||||||
@ -29,10 +28,9 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
|
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
|
||||||
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
|
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
|
||||||
mh := &peerMembersHandler{
|
mh := &peerMembersHandler{
|
||||||
clusterInfo: clusterInfo,
|
clusterInfo: clusterInfo,
|
||||||
timer: timer,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
@ -45,7 +43,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTim
|
|||||||
|
|
||||||
type peerMembersHandler struct {
|
type peerMembersHandler struct {
|
||||||
clusterInfo etcdserver.ClusterInfo
|
clusterInfo etcdserver.ClusterInfo
|
||||||
timer etcdserver.RaftTimer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -53,7 +50,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
|
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
|
||||||
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
|
|
||||||
|
|
||||||
if r.URL.Path != peerMembersPrefix {
|
if r.URL.Path != peerMembersPrefix {
|
||||||
http.Error(w, "bad path", http.StatusBadRequest)
|
http.Error(w, "bad path", http.StatusBadRequest)
|
||||||
|
@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
|
|||||||
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Write([]byte("test data"))
|
w.Write([]byte("test data"))
|
||||||
})
|
})
|
||||||
ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
|
ph := NewPeerHandler(&fakeCluster{}, h)
|
||||||
srv := httptest.NewServer(ph)
|
srv := httptest.NewServer(ph)
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
||||||
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
|
|||||||
id: 1,
|
id: 1,
|
||||||
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
|
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
|
||||||
}
|
}
|
||||||
h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
|
h := &peerMembersHandler{clusterInfo: cluster}
|
||||||
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
|
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -158,6 +158,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
haveWAL := wal.Exist(cfg.WALDir())
|
haveWAL := wal.Exist(cfg.WALDir())
|
||||||
ss := snap.New(cfg.SnapDir())
|
ss := snap.New(cfg.SnapDir())
|
||||||
|
|
||||||
|
var remotes []*Member
|
||||||
switch {
|
switch {
|
||||||
case !haveWAL && !cfg.NewCluster:
|
case !haveWAL && !cfg.NewCluster:
|
||||||
if err := cfg.VerifyJoinExisting(); err != nil {
|
if err := cfg.VerifyJoinExisting(); err != nil {
|
||||||
@ -170,7 +171,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
|
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
|
||||||
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
||||||
}
|
}
|
||||||
cfg.Cluster.UpdateIndex(existingCluster.index)
|
remotes = existingCluster.Members()
|
||||||
cfg.Cluster.SetID(existingCluster.id)
|
cfg.Cluster.SetID(existingCluster.id)
|
||||||
cfg.Cluster.SetStore(st)
|
cfg.Cluster.SetStore(st)
|
||||||
cfg.Print()
|
cfg.Print()
|
||||||
@ -238,6 +239,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
Name: cfg.Name,
|
Name: cfg.Name,
|
||||||
ID: id.String(),
|
ID: id.String(),
|
||||||
}
|
}
|
||||||
|
sstats.Initialize()
|
||||||
lstats := stats.NewLeaderStats(id.String())
|
lstats := stats.NewLeaderStats(id.String())
|
||||||
|
|
||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
@ -260,8 +262,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
|
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: move transport initialization near the definition of remote
|
||||||
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
|
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
|
||||||
// add all the remote members into sendhub
|
// add all remotes into transport
|
||||||
|
for _, m := range remotes {
|
||||||
|
if m.ID != id {
|
||||||
|
tr.AddRemote(m.ID, m.PeerURLs)
|
||||||
|
}
|
||||||
|
}
|
||||||
for _, m := range cfg.Cluster.Members() {
|
for _, m := range cfg.Cluster.Members() {
|
||||||
if m.ID != id {
|
if m.ID != id {
|
||||||
tr.AddPeer(m.ID, m.PeerURLs)
|
tr.AddPeer(m.ID, m.PeerURLs)
|
||||||
@ -292,7 +300,6 @@ func (s *EtcdServer) start() {
|
|||||||
s.w = wait.New()
|
s.w = wait.New()
|
||||||
s.done = make(chan struct{})
|
s.done = make(chan struct{})
|
||||||
s.stop = make(chan struct{})
|
s.stop = make(chan struct{})
|
||||||
s.stats.Initialize()
|
|
||||||
// TODO: if this is an empty log, writes all peer infos
|
// TODO: if this is an empty log, writes all peer infos
|
||||||
// into the first entry
|
// into the first entry
|
||||||
go s.run()
|
go s.run()
|
||||||
@ -395,19 +402,15 @@ func (s *EtcdServer) run() {
|
|||||||
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
|
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
|
||||||
log.Panicf("recovery store error: %v", err)
|
log.Panicf("recovery store error: %v", err)
|
||||||
}
|
}
|
||||||
|
s.Cluster.Recover()
|
||||||
|
|
||||||
// It avoids snapshot recovery overwriting newer cluster and
|
// recover raft transport
|
||||||
// transport setting, which may block the communication.
|
s.r.transport.RemoveAllPeers()
|
||||||
if s.Cluster.index < rd.Snapshot.Metadata.Index {
|
for _, m := range s.Cluster.Members() {
|
||||||
s.Cluster.Recover()
|
if m.ID == s.ID() {
|
||||||
// recover raft transport
|
continue
|
||||||
s.r.transport.RemoveAllPeers()
|
|
||||||
for _, m := range s.Cluster.Members() {
|
|
||||||
if m.ID == s.ID() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
s.r.transport.AddPeer(m.ID, m.PeerURLs)
|
|
||||||
}
|
}
|
||||||
|
s.r.transport.AddPeer(m.ID, m.PeerURLs)
|
||||||
}
|
}
|
||||||
|
|
||||||
appliedi = rd.Snapshot.Metadata.Index
|
appliedi = rd.Snapshot.Metadata.Index
|
||||||
@ -671,9 +674,9 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) send(ms []raftpb.Message) {
|
func (s *EtcdServer) send(ms []raftpb.Message) {
|
||||||
for _, m := range ms {
|
for i, _ := range ms {
|
||||||
if !s.Cluster.IsIDRemoved(types.ID(m.To)) {
|
if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) {
|
||||||
m.To = 0
|
ms[i].To = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.r.transport.Send(ms)
|
s.r.transport.Send(ms)
|
||||||
@ -723,7 +726,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
|||||||
switch {
|
switch {
|
||||||
case existsSet:
|
case existsSet:
|
||||||
if exists {
|
if exists {
|
||||||
return f(s.store.Update(r.Path, r.Val, expr))
|
if r.PrevIndex == 0 && r.PrevValue == "" {
|
||||||
|
return f(s.store.Update(r.Path, r.Val, expr))
|
||||||
|
} else {
|
||||||
|
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
|
return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
|
||||||
case r.PrevIndex > 0 || r.PrevValue != "":
|
case r.PrevIndex > 0 || r.PrevValue != "":
|
||||||
|
@ -235,20 +235,18 @@ func TestApplyRequest(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
// PUT with PrevExist=true *and* PrevIndex set ==> Update
|
// PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
|
||||||
// TODO(jonboulle): is this expected?!
|
|
||||||
{
|
{
|
||||||
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
|
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
|
||||||
Response{Event: &store.Event{}},
|
Response{Event: &store.Event{}},
|
||||||
[]testutil.Action{
|
[]testutil.Action{
|
||||||
{
|
{
|
||||||
Name: "Update",
|
Name: "CompareAndSwap",
|
||||||
Params: []interface{}{"", "", time.Time{}},
|
Params: []interface{}{"", "", uint64(1), "", time.Time{}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
// PUT with PrevExist=false *and* PrevIndex set ==> Create
|
// PUT with PrevExist=false *and* PrevIndex set ==> Create
|
||||||
// TODO(jonboulle): is this expected?!
|
|
||||||
{
|
{
|
||||||
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
|
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
|
||||||
Response{Event: &store.Event{}},
|
Response{Event: &store.Event{}},
|
||||||
@ -1391,6 +1389,7 @@ type nopTransporter struct{}
|
|||||||
|
|
||||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||||
|
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
||||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||||
func (s *nopTransporter) RemoveAllPeers() {}
|
func (s *nopTransporter) RemoveAllPeers() {}
|
||||||
|
@ -170,6 +170,46 @@ func TestForceNewCluster(t *testing.T) {
|
|||||||
clusterMustProgress(t, c.Members[:1])
|
clusterMustProgress(t, c.Members[:1])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure we can remove a member then add a new one back immediately.
|
||||||
|
func TestIssue2681(t *testing.T) {
|
||||||
|
defer afterTest(t)
|
||||||
|
c := NewCluster(t, 5)
|
||||||
|
c.Launch(t)
|
||||||
|
defer c.Terminate(t)
|
||||||
|
|
||||||
|
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
|
||||||
|
c.waitLeader(t, c.Members)
|
||||||
|
|
||||||
|
c.AddMember(t)
|
||||||
|
c.waitLeader(t, c.Members)
|
||||||
|
clusterMustProgress(t, c.Members)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure we can remove a member after a snapshot then add a new one back.
|
||||||
|
func TestIssue2746(t *testing.T) {
|
||||||
|
defer afterTest(t)
|
||||||
|
c := NewCluster(t, 5)
|
||||||
|
|
||||||
|
for _, m := range c.Members {
|
||||||
|
m.SnapCount = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Launch(t)
|
||||||
|
defer c.Terminate(t)
|
||||||
|
|
||||||
|
// force a snapshot
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
clusterMustProgress(t, c.Members)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
|
||||||
|
c.waitLeader(t, c.Members)
|
||||||
|
|
||||||
|
c.AddMember(t)
|
||||||
|
c.waitLeader(t, c.Members)
|
||||||
|
clusterMustProgress(t, c.Members)
|
||||||
|
}
|
||||||
|
|
||||||
// clusterMustProgress ensures that cluster can make progress. It creates
|
// clusterMustProgress ensures that cluster can make progress. It creates
|
||||||
// a random key first, and check the new key could be got from all client urls
|
// a random key first, and check the new key could be got from all client urls
|
||||||
// of the cluster.
|
// of the cluster.
|
||||||
@ -526,7 +566,7 @@ func (m *member) Launch() error {
|
|||||||
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
|
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
|
||||||
m.s.Start()
|
m.s.Start()
|
||||||
|
|
||||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
|
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
|
||||||
|
|
||||||
for _, ln := range m.PeerListeners {
|
for _, ln := range m.PeerListeners {
|
||||||
hs := &httptest.Server{
|
hs := &httptest.Server{
|
||||||
|
90
pkg/fileutil/lock_plan9.go
Normal file
90
pkg/fileutil/lock_plan9.go
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrLocked = errors.New("file already locked")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Lock interface {
|
||||||
|
Name() string
|
||||||
|
TryLock() error
|
||||||
|
Lock() error
|
||||||
|
Unlock() error
|
||||||
|
Destroy() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type lock struct {
|
||||||
|
fname string
|
||||||
|
file *os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *lock) Name() string {
|
||||||
|
return l.fname
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryLock acquires exclusivity on the lock without blocking
|
||||||
|
func (l *lock) TryLock() error {
|
||||||
|
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Open(l.fname)
|
||||||
|
if err != nil {
|
||||||
|
return ErrLocked
|
||||||
|
}
|
||||||
|
|
||||||
|
l.file = f
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock acquires exclusivity on the lock with blocking
|
||||||
|
func (l *lock) Lock() error {
|
||||||
|
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
f, err := os.Open(l.fname)
|
||||||
|
if err == nil {
|
||||||
|
l.file = f
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unlock unlocks the lock
|
||||||
|
func (l *lock) Unlock() error {
|
||||||
|
return l.file.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *lock) Destroy() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLock(file string) (Lock, error) {
|
||||||
|
l := &lock{fname: file}
|
||||||
|
return l, nil
|
||||||
|
}
|
98
pkg/fileutil/lock_solaris.go
Normal file
98
pkg/fileutil/lock_solaris.go
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// +build solaris
|
||||||
|
|
||||||
|
package fileutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrLocked = errors.New("file already locked")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Lock interface {
|
||||||
|
Name() string
|
||||||
|
TryLock() error
|
||||||
|
Lock() error
|
||||||
|
Unlock() error
|
||||||
|
Destroy() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type lock struct {
|
||||||
|
fd int
|
||||||
|
file *os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *lock) Name() string {
|
||||||
|
return l.file.Name()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryLock acquires exclusivity on the lock without blocking
|
||||||
|
func (l *lock) TryLock() error {
|
||||||
|
var lock syscall.Flock_t
|
||||||
|
lock.Start = 0
|
||||||
|
lock.Len = 0
|
||||||
|
lock.Pid = 0
|
||||||
|
lock.Type = syscall.F_WRLCK
|
||||||
|
lock.Whence = 0
|
||||||
|
lock.Pid = 0
|
||||||
|
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||||
|
if err != nil && err == syscall.EAGAIN {
|
||||||
|
return ErrLocked
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock acquires exclusivity on the lock without blocking
|
||||||
|
func (l *lock) Lock() error {
|
||||||
|
var lock syscall.Flock_t
|
||||||
|
lock.Start = 0
|
||||||
|
lock.Len = 0
|
||||||
|
lock.Type = syscall.F_WRLCK
|
||||||
|
lock.Whence = 0
|
||||||
|
lock.Pid = 0
|
||||||
|
return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unlock unlocks the lock
|
||||||
|
func (l *lock) Unlock() error {
|
||||||
|
var lock syscall.Flock_t
|
||||||
|
lock.Start = 0
|
||||||
|
lock.Len = 0
|
||||||
|
lock.Type = syscall.F_UNLCK
|
||||||
|
lock.Whence = 0
|
||||||
|
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||||
|
if err != nil && err == syscall.EAGAIN {
|
||||||
|
return ErrLocked
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *lock) Destroy() error {
|
||||||
|
return l.file.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLock(file string) (Lock, error) {
|
||||||
|
f, err := os.OpenFile(file, os.O_WRONLY, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
l := &lock{int(f.Fd()), f}
|
||||||
|
return l, nil
|
||||||
|
}
|
@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
// +build !windows,!plan9
|
// +build !windows,!plan9,!solaris
|
||||||
|
|
||||||
package fileutil
|
package fileutil
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"math/rand"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -65,6 +66,13 @@ func (d *director) refresh() {
|
|||||||
}
|
}
|
||||||
endpoints = append(endpoints, newEndpoint(*uu))
|
endpoints = append(endpoints, newEndpoint(*uu))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shuffle array to avoid connections being "stuck" to a single endpoint
|
||||||
|
for i := range endpoints {
|
||||||
|
j := rand.Intn(i + 1)
|
||||||
|
endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
|
||||||
|
}
|
||||||
|
|
||||||
d.ep = endpoints
|
d.ep = endpoints
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,6 +73,25 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
completeCh := make(chan bool, 1)
|
||||||
|
closeNotifier, ok := rw.(http.CloseNotifier)
|
||||||
|
if ok {
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-closeNotifier.CloseNotify():
|
||||||
|
tp, ok := p.transport.(*http.Transport)
|
||||||
|
if ok {
|
||||||
|
tp.CancelRequest(proxyreq)
|
||||||
|
}
|
||||||
|
case <-completeCh:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
completeCh <- true
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
var res *http.Response
|
var res *http.Response
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ func newLog(storage Storage) *raftLog {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *raftLog) String() string {
|
func (l *raftLog) String() string {
|
||||||
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
|
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
|
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
|
||||||
|
@ -199,7 +199,7 @@ func (p *peer) handle() {
|
|||||||
log.Printf("sender: the connection with %s became inactive", p.id)
|
log.Printf("sender: the connection with %s became inactive", p.id)
|
||||||
p.active = false
|
p.active = false
|
||||||
}
|
}
|
||||||
if m.Type == raftpb.MsgApp {
|
if m.Type == raftpb.MsgApp && p.fs != nil {
|
||||||
p.fs.Fail()
|
p.fs.Fail()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -208,7 +208,7 @@ func (p *peer) handle() {
|
|||||||
p.active = true
|
p.active = true
|
||||||
p.errored = nil
|
p.errored = nil
|
||||||
}
|
}
|
||||||
if m.Type == raftpb.MsgApp {
|
if m.Type == raftpb.MsgApp && p.fs != nil {
|
||||||
p.fs.Succ(end.Sub(start))
|
p.fs.Succ(end.Sub(start))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
42
rafthttp/remote.go
Normal file
42
rafthttp/remote.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package rafthttp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/types"
|
||||||
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type remote struct {
|
||||||
|
id types.ID
|
||||||
|
peer *peer
|
||||||
|
}
|
||||||
|
|
||||||
|
func startRemote(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, errorc chan error) *remote {
|
||||||
|
return &remote{
|
||||||
|
id: to,
|
||||||
|
peer: NewPeer(tr, u, to, cid, r, nil, errorc),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *remote) Send(m raftpb.Message) {
|
||||||
|
g.peer.send(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *remote) Stop() {
|
||||||
|
g.peer.Stop()
|
||||||
|
}
|
@ -35,6 +35,12 @@ type Raft interface {
|
|||||||
type Transporter interface {
|
type Transporter interface {
|
||||||
Handler() http.Handler
|
Handler() http.Handler
|
||||||
Send(m []raftpb.Message)
|
Send(m []raftpb.Message)
|
||||||
|
// AddRemote adds a remote with given peer urls into the transport.
|
||||||
|
// A remote helps newly joined member to catch up the progress of cluster,
|
||||||
|
// and will not be used after that.
|
||||||
|
// It is the caller's responsibility to ensure the urls are all valid,
|
||||||
|
// or it panics.
|
||||||
|
AddRemote(id types.ID, urls []string)
|
||||||
AddPeer(id types.ID, urls []string)
|
AddPeer(id types.ID, urls []string)
|
||||||
RemovePeer(id types.ID)
|
RemovePeer(id types.ID)
|
||||||
RemoveAllPeers()
|
RemoveAllPeers()
|
||||||
@ -50,9 +56,10 @@ type transport struct {
|
|||||||
serverStats *stats.ServerStats
|
serverStats *stats.ServerStats
|
||||||
leaderStats *stats.LeaderStats
|
leaderStats *stats.LeaderStats
|
||||||
|
|
||||||
mu sync.RWMutex // protect the peer map
|
mu sync.RWMutex // protect the remote and peer map
|
||||||
peers map[types.ID]*peer // remote peers
|
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
|
||||||
errorc chan error
|
peers map[types.ID]*peer // peers map
|
||||||
|
errorc chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
|
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
|
||||||
@ -63,6 +70,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
|
|||||||
raft: r,
|
raft: r,
|
||||||
serverStats: ss,
|
serverStats: ss,
|
||||||
leaderStats: ls,
|
leaderStats: ls,
|
||||||
|
remotes: make(map[types.ID]*remote),
|
||||||
peers: make(map[types.ID]*peer),
|
peers: make(map[types.ID]*peer),
|
||||||
errorc: errorc,
|
errorc: errorc,
|
||||||
}
|
}
|
||||||
@ -90,21 +98,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
to := types.ID(m.To)
|
to := types.ID(m.To)
|
||||||
|
|
||||||
p, ok := t.peers[to]
|
p, ok := t.peers[to]
|
||||||
if !ok {
|
if ok {
|
||||||
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
if m.Type == raftpb.MsgApp {
|
||||||
|
t.serverStats.SendAppendReq(m.Size())
|
||||||
|
}
|
||||||
|
p.Send(m)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.Type == raftpb.MsgApp {
|
g, ok := t.remotes[to]
|
||||||
t.serverStats.SendAppendReq(m.Size())
|
if ok {
|
||||||
|
g.Send(m)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Send(m)
|
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transport) Stop() {
|
func (t *transport) Stop() {
|
||||||
|
for _, r := range t.remotes {
|
||||||
|
r.Stop()
|
||||||
|
}
|
||||||
for _, p := range t.peers {
|
for _, p := range t.peers {
|
||||||
p.Stop()
|
p.Stop()
|
||||||
}
|
}
|
||||||
@ -113,6 +130,21 @@ func (t *transport) Stop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *transport) AddRemote(id types.ID, us []string) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
if _, ok := t.remotes[id]; ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
peerURL := us[0]
|
||||||
|
u, err := url.Parse(peerURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("unexpect peer url %s", peerURL)
|
||||||
|
}
|
||||||
|
u.Path = path.Join(u.Path, RaftPrefix)
|
||||||
|
t.remotes[id] = startRemote(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, t.errorc)
|
||||||
|
}
|
||||||
|
|
||||||
func (t *transport) AddPeer(id types.ID, urls []string) {
|
func (t *transport) AddPeer(id types.ID, urls []string) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
@ -82,7 +82,10 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return snap, err
|
if err != nil {
|
||||||
|
return nil, ErrNoSnapshot
|
||||||
|
}
|
||||||
|
return snap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
||||||
|
@ -76,7 +76,7 @@ func TestBadCRC(t *testing.T) {
|
|||||||
// fake a crc mismatch
|
// fake a crc mismatch
|
||||||
crcTable = crc32.MakeTable(crc32.Koopman)
|
crcTable = crc32.MakeTable(crc32.Koopman)
|
||||||
|
|
||||||
_, err = ss.Load()
|
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
|
||||||
if err == nil || err != ErrCRCMismatch {
|
if err == nil || err != ErrCRCMismatch {
|
||||||
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
|
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
|
||||||
}
|
}
|
||||||
@ -182,7 +182,7 @@ func TestNoSnapshot(t *testing.T) {
|
|||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
ss := New(dir)
|
ss := New(dir)
|
||||||
_, err = ss.Load()
|
_, err = ss.Load()
|
||||||
if err == nil || err != ErrNoSnapshot {
|
if err != ErrNoSnapshot {
|
||||||
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -195,14 +195,35 @@ func TestEmptySnapshot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("shit"), 0x700)
|
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = Read(path.Join(dir, "1.snap"))
|
||||||
|
if err != ErrEmptySnapshot {
|
||||||
|
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAllSnapshotBroken ensures snapshotter returns
|
||||||
|
// ErrNoSnapshot if all the snapshots are broken.
|
||||||
|
func TestAllSnapshotBroken(t *testing.T) {
|
||||||
|
dir := path.Join(os.TempDir(), "snapshot")
|
||||||
|
err := os.Mkdir(dir, 0700)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ss := New(dir)
|
ss := New(dir)
|
||||||
_, err = ss.Load()
|
_, err = ss.Load()
|
||||||
if err == nil || err != ErrEmptySnapshot {
|
if err != ErrNoSnapshot {
|
||||||
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
|
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
|
|||||||
|
|
||||||
if ok { // add the new watcher to the back of the list
|
if ok { // add the new watcher to the back of the list
|
||||||
elem = l.PushBack(w)
|
elem = l.PushBack(w)
|
||||||
|
|
||||||
} else { // create a new list and add the new watcher
|
} else { // create a new list and add the new watcher
|
||||||
l = list.New()
|
l = list.New()
|
||||||
elem = l.PushBack(w)
|
elem = l.PushBack(w)
|
||||||
@ -146,6 +145,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
|
|||||||
// if we successfully notify a watcher
|
// if we successfully notify a watcher
|
||||||
// we need to remove the watcher from the list
|
// we need to remove the watcher from the list
|
||||||
// and decrease the counter
|
// and decrease the counter
|
||||||
|
w.removed = true
|
||||||
l.Remove(curr)
|
l.Remove(curr)
|
||||||
atomic.AddInt64(&wh.count, -1)
|
atomic.AddInt64(&wh.count, -1)
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
Version = "2.0.8"
|
Version = "2.0.11"
|
||||||
|
|
||||||
|
// Git SHA Value will be set during build
|
||||||
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WalVersion is an enum for versions of etcd logs.
|
// WalVersion is an enum for versions of etcd logs.
|
||||||
|
@ -67,7 +67,7 @@ func checkWalNames(names []string) []string {
|
|||||||
wnames := make([]string, 0)
|
wnames := make([]string, 0)
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
if _, _, err := parseWalName(name); err != nil {
|
if _, _, err := parseWalName(name); err != nil {
|
||||||
log.Printf("wal: parse %s error: %v", name, err)
|
log.Printf("wal: ignored file %v in wal", name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wnames = append(wnames, name)
|
wnames = append(wnames, name)
|
||||||
|
72
wal/wal.go
72
wal/wal.go
@ -273,30 +273,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Cut closes current file written and creates a new one ready to append.
|
// Cut closes current file written and creates a new one ready to append.
|
||||||
|
// cut first creates a temp wal file and writes necessary headers into it.
|
||||||
|
// Then cut atomically renames the temp wal file to a wal file.
|
||||||
func (w *WAL) Cut() error {
|
func (w *WAL) Cut() error {
|
||||||
// create a new wal file with name sequence + 1
|
// close old wal file
|
||||||
|
if err := w.sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := w.f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
||||||
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
ftpath := fpath + ".tmp"
|
||||||
|
|
||||||
|
// create a temp wal file with name sequence + 1, or truncate the existing one
|
||||||
|
ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
l, err := fileutil.NewLock(f.Name())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = l.Lock()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
w.locks = append(w.locks, l)
|
|
||||||
if err = w.sync(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
w.f.Close()
|
|
||||||
|
|
||||||
// update writer and save the previous crc
|
// update writer and save the previous crc
|
||||||
w.f = f
|
w.f = ft
|
||||||
w.seq++
|
|
||||||
prevCrc := w.encoder.crc.Sum32()
|
prevCrc := w.encoder.crc.Sum32()
|
||||||
w.encoder = newEncoder(w.f, prevCrc)
|
w.encoder = newEncoder(w.f, prevCrc)
|
||||||
if err := w.saveCrc(prevCrc); err != nil {
|
if err := w.saveCrc(prevCrc); err != nil {
|
||||||
@ -308,7 +306,45 @@ func (w *WAL) Cut() error {
|
|||||||
if err := w.saveState(&w.state); err != nil {
|
if err := w.saveState(&w.state); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return w.sync()
|
// close temp wal file
|
||||||
|
if err := w.sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := w.f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// atomically move temp wal file to wal file
|
||||||
|
if err := os.Rename(ftpath, fpath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// open the wal file and update writer again
|
||||||
|
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.f = f
|
||||||
|
prevCrc = w.encoder.crc.Sum32()
|
||||||
|
w.encoder = newEncoder(w.f, prevCrc)
|
||||||
|
|
||||||
|
// lock the new wal file
|
||||||
|
l, err := fileutil.NewLock(f.Name())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = l.Lock()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.locks = append(w.locks, l)
|
||||||
|
|
||||||
|
// increase the wal seq
|
||||||
|
w.seq++
|
||||||
|
|
||||||
|
log.Printf("wal: segmented wal file %v is created", fpath)
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) sync() error {
|
func (w *WAL) sync() error {
|
||||||
|
Reference in New Issue
Block a user