Compare commits
84 Commits
v2.0.12
SHA1 | Author | Date | |
---|---|---|---|
5686c33e4b | |||
6fd2dfdebc | |||
896ce1668c | |||
0520b4cd24 | |||
6ee6f72c48 | |||
b4dd519a63 | |||
a98fff84e7 | |||
973cfbebda | |||
00d1d34cf8 | |||
fcf81fd6bf | |||
0678329cd6 | |||
9a0e0c2eae | |||
3e4d57c37d | |||
d30e764b2d | |||
b5b7c78f1b | |||
ee1c07c3d4 | |||
67c5d4dfd2 | |||
3afcbd6f83 | |||
8fed61b2eb | |||
c8d386e18c | |||
2b6a44b7b0 | |||
8069d08b96 | |||
5074235254 | |||
f59bddd74b | |||
58f035844c | |||
f83774b4cd | |||
12c32137a8 | |||
fce4cf4dc8 | |||
06a72b2702 | |||
fbaef05885 | |||
31a94d28e3 | |||
88660a303f | |||
53c74dbd0b | |||
8a8af60fad | |||
7de19fefe8 | |||
7750f387b0 | |||
e33ab24442 | |||
fce2c1eeaf | |||
6a3bb93305 | |||
21455d2f3b | |||
51bb4220c5 | |||
d8c506923f | |||
5d778f85ca | |||
02697ca725 | |||
bd693c7069 | |||
52c90cdcfb | |||
a88b22ac0a | |||
e93f8b8a12 | |||
86e616c6e9 | |||
5ae55a2c0d | |||
62ce6eef7b | |||
7df4f5c804 | |||
461c24e899 | |||
6d90d03bf0 | |||
9995e80a2c | |||
229405f113 | |||
b3f2a998d4 | |||
8436e901e9 | |||
c03f5cb941 | |||
0cb90e4bea | |||
df83b1b34e | |||
f2bef04009 | |||
02198336f6 | |||
0c9a226e0e | |||
5bd1d420bb | |||
a1cb5cb768 | |||
acba49fe81 | |||
e3c902228b | |||
52a2d143d2 | |||
f53d550a79 | |||
63b799b891 | |||
697883fb8c | |||
f794f87f26 | |||
0847986d4a | |||
9ea80c6ac1 | |||
02fb648abf | |||
4c9e1686b1 | |||
0fb9362c5c | |||
9481945228 | |||
e13b09e4d9 | |||
78e0149f41 | |||
4c86ab4868 | |||
59327bab47 | |||
62ed1ebf03 |
@ -61,7 +61,7 @@ After your cluster is up and running, adding or removing members is done via [ru

### Member Migration

When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data or changing the member ID.

The data directory contains all the data to recover a member to its point-in-time state. To migrate a member:

@ -102,7 +102,7 @@ $ sudo systemctl stop etcd
#### Copy the data directory of the now-idle member to the new machine

```
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
```

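As a rough sketch of the transfer step (the host name and the `core` user are placeholders, not part of the original guide), the archive can be copied over and unpacked at the same path on the new machine:

```
$ scp node1.etcd.tar.gz core@new-machine:
# on the new machine, restore the archive under the original path
$ sudo tar -xzvf node1.etcd.tar.gz -C /
```
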
@ -181,7 +181,9 @@ Once you have verified that etcd has started successfully, shut it down and move

#### Restoring the cluster

Now that the node is running successfully, you can add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
Now that the node is running successfully, you should [change its advertised peer URLs](other_apis.md#change-the-peer-urls-of-a-member), as the `--force-new-cluster` flag has set the peer URL to the default (listening on localhost).

You can then add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.

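A minimal sketch of that peer-URL change through the v2 members API the guide links to; the member ID and addresses below are placeholders to be taken from `etcdctl member list`:

```
$ curl http://10.0.1.10:2379/v2/members/272e204152e8b9b7 \
    -XPUT -H "Content-Type: application/json" \
    -d '{"peerURLs":["http://10.0.1.10:2380"]}'
```
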
### Client Request Timeout

@ -1,120 +0,0 @@
## Allow-legacy mode

Allow-legacy is a special mode in etcd that contains logic to enable a running etcd cluster to smoothly transition between major versions of etcd. For example, the internal API versions between etcd 0.4 (internal v1) and etcd 2.0 (internal v2) aren't compatible and the cluster needs to be updated all at once to make the switch. To minimize downtime, allow-legacy coordinates with all of the members of the cluster to shut down, migrate the data, and restart on the new version.

Allow-legacy helps users upgrade v0.4 etcd clusters easily, and allows your etcd cluster to have a minimal amount of downtime -- less than 1 minute for clusters storing less than 50 MB.

It currently supports upgrading from internal v1 to internal v2.

### Setup

This mode is enabled if `ETCD_ALLOW_LEGACY_MODE` is set to true, or etcd is running on a CoreOS system.

It treats `ETCD_BINARY_DIR` as the directory for etcd binaries, which is organized in this way:

```
ETCD_BINARY_DIR
|
-- 1
|
-- 2
```

`1` is etcd with the internal v1 protocol. You should use etcd v0.4.7 here. `2` is etcd with the internal v2 protocol, which is etcd v2.x.

The default value for `ETCD_BINARY_DIR` is `/usr/libexec/etcd/internal_versions/`.

### Upgrading a Cluster

When starting etcd with a v1 data directory and v1 flags, etcd executes the v0.4.7 binary and runs exactly the same as before. To start the migration, follow the steps below:



#### 1. Check the Cluster Health

Before upgrading, you should check the health of the cluster to double-check that everything is working perfectly. Check the health by running:

```
$ etcdctl cluster-health
cluster is healthy
member 6e3bd23ae5f1eae0 is healthy
member 924e2e83e93f2560 is healthy
member a8266ecf031671f3 is healthy
```

If the cluster and all members are healthy, you can start the upgrade process. If not, check the unhealthy machines and repair them using the [admin guide](./admin_guide.md).

#### 2. Trigger the Upgrade

When you're ready, use the `etcdctl upgrade` command to start upgrading the etcd cluster to 2.0:

```
# Defaults work on a CoreOS machine running etcd
$ etcdctl upgrade
```

```
# Advanced example specifying a peer url
$ etcdctl upgrade --old-version=1 --new-version=2 --peer-url=$PEER_URL
```

`PEER_URL` can be any accessible peer url of the cluster.

Once triggered, all peer-mode members will print out:

```
detected next internal version 2, exit after 10 seconds.
```

#### Parallel Coordinated Upgrade

As part of the upgrade, etcd does internal coordination within the cluster for a brief period and then exits. Clusters storing less than 50 MB should be unavailable for less than 1 minute.

#### Restart etcd Processes

After the etcd processes exit, they need to be restarted. You can do this manually or configure your unit system to do this automatically. On CoreOS, etcd is already configured to start automatically with systemd.

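For a manual restart on a systemd-managed machine, this is the kind of one-liner that applies (the `etcd.service` unit name matches the CoreOS setup described below; adjust it if your unit is named differently):

```
$ sudo systemctl start etcd.service
```
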
When restarted, the data directory of each member is upgraded, and afterwards etcd v2.0 will be running and servicing requests. The upgrade is now complete!

Standby-mode members are a special case — they will be upgraded into proxy mode (a new feature in etcd 2.0) upon restarting. When the upgrade is triggered, any standbys will exit with the message:

```
Detect the cluster has been upgraded to internal API v2. Exit now.
```

Once restarted, standbys run in v2.0 proxy mode, which proxies user requests to the etcd cluster.

#### 3. Check the Cluster Health

After the upgrade process, you can run the health check again to verify the upgrade. If the cluster is unhealthy or there is an unhealthy member, please refer to the [failure recovery](#failure-recovery) section.

### Downgrade

If the upgrade fails due to disk or network issues, you can still restart the upgrade process manually. However, once you upgrade etcd to the internal v2 protocol, you CANNOT downgrade it back to the internal v1 protocol. If you want to downgrade etcd in the future, please back up your v1 data dir beforehand.

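A minimal sketch of such a backup, assuming the example data-dir path used in the admin guide above (substitute your own `--data-dir`):

```
$ tar -czf etcd-v1-data-backup.tar.gz /var/lib/etcd/node1.etcd
```
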
### Upgrade Process on CoreOS

When running on a CoreOS system, allow-legacy mode is enabled by default and an automatic update will set up everything needed to execute the upgrade. The `etcd.service` on CoreOS is already configured to restart automatically. All you need to do is run `etcdctl upgrade` when you're ready, as described above.

### Internal Details

etcd v0.4.7 registers the versions of the etcd binaries available on its local machine into the key space at the bootstrap stage. When the upgrade command is executed, etcdctl checks whether each member has an internal-version-v2 etcd binary available. If so, each member is asked to record the fact that it needs to be upgraded the next time it restarts, and exits after 10 seconds.

Once restarted, etcd sees the recorded upgrade flag, upgrades the data directory, and executes etcd v2.0.

### Failure Recovery

If `etcdctl cluster-health` says that the cluster is unhealthy, the upgrade process has failed. This may happen if the network is broken or a disk fails.

The way to recover is to manually upgrade the whole cluster to v2.0:

- Log into machines that ran v0.4 peer-mode etcd
- Stop all etcd services
- Remove the `member` directory under the etcd data-dir
- Start the etcd service using [2.0 flags](configuration.md). For example:
```
$ etcd --data-dir=$DATA_DIR --listen-peer-urls http://$LISTEN_PEER_ADDR \
  --advertise-client-urls http://$ADVERTISE_CLIENT_ADDR \
  --listen-client-urls http://$LISTEN_CLIENT_ADDR
```
- When this is done, the v2.0 etcd cluster should be working.

@ -287,7 +287,7 @@ curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=7'

The watch command returns immediately with the same response as previously.

**Note**: etcd only keeps the responses of the most recent 1000 events.
**Note**: etcd only keeps the responses of the most recent 1000 events across all etcd keys.
It is recommended to send the response to another thread to process immediately
instead of blocking the watch while processing the result.

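To illustrate the note, here is a rough sketch of a watch loop that hands each event to a background worker and advances `waitIndex` itself, so slow processing does not let the watcher fall more than 1000 events behind. The `process-event` worker script is hypothetical and the `jq` dependency is an assumption, not part of the etcd docs:

```
INDEX=7
while true; do
  RESP=$(curl -s "http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=${INDEX}")
  echo "$RESP" | ./process-event &                                  # hypothetical worker
  INDEX=$(( $(echo "$RESP" | jq -r '.node.modifiedIndex') + 1 ))    # assumes jq is installed
done
```
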
@ -30,7 +30,7 @@ ETCD_INITIAL_CLUSTER_STATE=new
```

```
-initial-cluster infra0=http://10.0.1.10:2380,http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
-initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
-initial-cluster-state new
```

@ -135,7 +135,9 @@ The security flags help to [build a secure etcd cluster][security].

### Unsafe Flags

Be CAUTIOUS to use unsafe flags because it will break the guarantee given by consensus protocol. For example, it may panic if other members in the cluster are still alive. Follow the instructions when using these falgs.
Please be CAUTIOUS when using unsafe flags because it will break the guarantees given by the consensus protocol.
For example, it may panic if other members in the cluster are still alive.
Follow the instructions when using these flags.

##### -force-new-cluster
+ Force to create a new one-member cluster. It commits configuration changes in force to remove all existing members in the cluster and add itself. It needs to be set to [restore a backup][restore].

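A hedged sketch of how `-force-new-cluster` is used in the backup-restore flow from the admin guide above (the data-dir path is that guide's example, not a required location); once the single-member cluster is up, update its advertised peer URLs as described there:

```
$ etcd -data-dir=/var/lib/etcd/node1.etcd -force-new-cluster
```
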
@ -4,6 +4,10 @@ etcd can now run as a transparent proxy. Running etcd as a proxy allows for easi

etcd currently supports two proxy modes: `readwrite` and `readonly`. The default mode is `readwrite`, which forwards both read and write requests to the etcd cluster. A `readonly` etcd proxy only forwards read requests to the etcd cluster, and returns `HTTP 501` to all write requests.

The proxy will shuffle the list of cluster members periodically to avoid sending all connections to a single member.

The member list used by the proxy consists of all client URLs advertised within the cluster, as specified in each member's `-advertise-client-urls` flag. If this flag is set incorrectly, requests sent to the proxy are forwarded to the wrong addresses and fail. The fix for this problem is to restart the etcd member with the correct `-advertise-client-urls` flag. After the proxy's client URL list is recalculated, which happens every 30 seconds, requests will be forwarded correctly.

### Using an etcd proxy
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).

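A quick sketch of those three flags together (the listen address is a placeholder and the cluster addresses reuse the clustering example above; `-proxy on` selects the default `readwrite` mode):

```
$ etcd -proxy on \
    -listen-client-urls http://127.0.0.1:8080 \
    -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380
```
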
build
@ -11,8 +11,8 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}

eval $(go env)

GIT_SHA=`git rev-parse --short HEAD || echo "GitNotFound"`

# Static compilation is useful when etcd is run in a container
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
go build -o bin/etcd-migrate ${REPO_PATH}/tools/etcd-migrate
go build -o bin/etcd-dump-logs ${REPO_PATH}/tools/etcd-dump-logs
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcdctl ${REPO_PATH}/etcdctl

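The new `-ldflags "-X ${REPO_PATH}/version.GitSHA ${GIT_SHA}"` argument embeds the short git SHA into `version.GitSHA` at link time, which the version printout added to `etcdmain/config.go` later in this diff reads back. A rough sketch of the effect (the flag spelling and the bracketed values are assumptions, not captured output):

```
$ ./build
$ ./bin/etcd -version
etcd Version: <release version>
Git SHA: <short sha embedded via -ldflags -X>
Go Version: <go runtime version>
Go OS/Arch: <os>/<arch>
```
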
@ -105,7 +105,7 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return assertStatusCode(resp.StatusCode, http.StatusNoContent)
|
||||
return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone)
|
||||
}
|
||||
|
||||
type membersAPIActionList struct{}
|
||||
|
@ -25,7 +25,8 @@ import (
|
||||
|
||||
var (
|
||||
// indirection for testing
|
||||
lookupSRV = net.LookupSRV
|
||||
lookupSRV = net.LookupSRV
|
||||
resolveTCPAddr = net.ResolveTCPAddr
|
||||
)
|
||||
|
||||
// TODO(barakmich): Currently ignores priority and weight (as they don't make as much sense for a bootstrap)
|
||||
@ -38,7 +39,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
|
||||
// First, resolve the apurls
|
||||
for _, url := range apurls {
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", url.Host)
|
||||
tcpAddr, err := resolveTCPAddr("tcp", url.Host)
|
||||
if err != nil {
|
||||
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", url.Host)
|
||||
return "", "", err
|
||||
@ -52,8 +53,9 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
return err
|
||||
}
|
||||
for _, srv := range addrs {
|
||||
host := net.JoinHostPort(srv.Target, fmt.Sprintf("%d", srv.Port))
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", host)
|
||||
target := strings.TrimSuffix(srv.Target, ".")
|
||||
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
|
||||
tcpAddr, err := resolveTCPAddr("tcp", host)
|
||||
if err != nil {
|
||||
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", host)
|
||||
continue
|
||||
@ -68,8 +70,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
n = fmt.Sprintf("%d", tempName)
|
||||
tempName += 1
|
||||
}
|
||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, tcpAddr.String()))
|
||||
log.Printf("discovery: Got bootstrap from DNS for %s at host %s to %s%s", service, host, prefix, tcpAddr.String())
|
||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
|
||||
log.Printf("discovery: Got bootstrap from DNS for %s at %s%s", service, prefix, host)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -23,19 +23,26 @@ import (
|
||||
)
|
||||
|
||||
func TestSRVGetCluster(t *testing.T) {
|
||||
defer func() { lookupSRV = net.LookupSRV }()
|
||||
defer func() {
|
||||
lookupSRV = net.LookupSRV
|
||||
resolveTCPAddr = net.ResolveTCPAddr
|
||||
}()
|
||||
|
||||
name := "dnsClusterTest"
|
||||
tests := []struct {
|
||||
withSSL []*net.SRV
|
||||
withoutSSL []*net.SRV
|
||||
urls []string
|
||||
expected string
|
||||
dns map[string]string
|
||||
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
[]*net.SRV{},
|
||||
[]*net.SRV{},
|
||||
nil,
|
||||
nil,
|
||||
|
||||
"",
|
||||
},
|
||||
{
|
||||
@ -46,6 +53,8 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
},
|
||||
[]*net.SRV{},
|
||||
nil,
|
||||
nil,
|
||||
|
||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
|
||||
},
|
||||
{
|
||||
@ -58,6 +67,7 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
||||
},
|
||||
nil,
|
||||
nil,
|
||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:7001",
|
||||
},
|
||||
{
|
||||
@ -70,8 +80,22 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
||||
},
|
||||
[]string{"https://10.0.0.1:2480"},
|
||||
nil,
|
||||
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:7001",
|
||||
},
|
||||
// matching local member with resolved addr and return unresolved hostnames
|
||||
{
|
||||
[]*net.SRV{
|
||||
&net.SRV{Target: "1.example.com.", Port: 2480},
|
||||
&net.SRV{Target: "2.example.com.", Port: 2480},
|
||||
&net.SRV{Target: "3.example.com.", Port: 2480},
|
||||
},
|
||||
nil,
|
||||
[]string{"https://10.0.0.1:2480"},
|
||||
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
|
||||
|
||||
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
@ -84,6 +108,12 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
}
|
||||
return "", nil, errors.New("Unkown service in mock")
|
||||
}
|
||||
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
||||
if tt.dns == nil || tt.dns[addr] == "" {
|
||||
return net.ResolveTCPAddr(network, addr)
|
||||
}
|
||||
return net.ResolveTCPAddr(network, tt.dns[addr])
|
||||
}
|
||||
urls := testutil.MustNewURLs(t, tt.urls)
|
||||
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
|
||||
if err != nil {
|
||||
|
@ -44,10 +44,10 @@ func NewBackupCommand() cli.Command {
|
||||
|
||||
// handleBackup handles a request that intends to do a backup.
|
||||
func handleBackup(c *cli.Context) {
|
||||
srcSnap := path.Join(c.String("data-dir"), "snap")
|
||||
destSnap := path.Join(c.String("backup-dir"), "snap")
|
||||
srcWAL := path.Join(c.String("data-dir"), "wal")
|
||||
destWAL := path.Join(c.String("backup-dir"), "wal")
|
||||
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
|
||||
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
|
||||
srcWAL := path.Join(c.String("data-dir"), "member", "wal")
|
||||
destWAL := path.Join(c.String("backup-dir"), "member", "wal")
|
||||
|
||||
if err := os.MkdirAll(destSnap, 0700); err != nil {
|
||||
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
|
||||
|
@ -46,9 +46,10 @@ func handleClusterHealth(c *cli.Context) {
|
||||
}
|
||||
|
||||
// do we have a leader?
|
||||
ep, ls0, err := getLeaderStats(tr, client.GetCluster())
|
||||
cl := client.GetCluster()
|
||||
ep, ls0, err := getLeaderStats(tr, cl)
|
||||
if err != nil {
|
||||
fmt.Println("cluster is unhealthy")
|
||||
fmt.Println("cluster may be unhealthy: failed to connect", cl)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
etcdctl/command/import_snap_command.go (new file, 128 lines)
@ -0,0 +1,128 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
|
||||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
type set struct {
|
||||
key string
|
||||
value string
|
||||
ttl int64
|
||||
}
|
||||
|
||||
func NewImportSnapCommand() cli.Command {
|
||||
return cli.Command{
|
||||
Name: "import",
|
||||
Usage: "import a snapshot to a cluster",
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{Name: "snap", Value: "", Usage: "Path to the vaild etcd 0.4.x snapshot."},
|
||||
cli.StringSliceFlag{Name: "hidden", Value: new(cli.StringSlice), Usage: "Hidden key spaces to import from snapshot"},
|
||||
cli.IntFlag{Name: "c", Value: 10, Usage: "Number of concurrent clients to import the data"},
|
||||
},
|
||||
Action: handleImportSnap,
|
||||
}
|
||||
}
|
||||
|
||||
func handleImportSnap(c *cli.Context) {
|
||||
d, err := ioutil.ReadFile(c.String("snap"))
|
||||
if err != nil {
|
||||
if c.String("snap") == "" {
|
||||
fmt.Printf("no snapshot file provided (use --snap)\n")
|
||||
} else {
|
||||
fmt.Printf("cannot read snapshot file %s\n", c.String("snap"))
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
st := store.New()
|
||||
err = st.Recovery(d)
|
||||
if err != nil {
|
||||
fmt.Printf("cannot recover the snapshot file: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
endpoints, err := getEndpoints(c)
|
||||
if err != nil {
|
||||
handleError(ErrorFromEtcd, err)
|
||||
}
|
||||
tr, err := getTransport(c)
|
||||
if err != nil {
|
||||
handleError(ErrorFromEtcd, err)
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
setc := make(chan set)
|
||||
concurrent := c.Int("c")
|
||||
fmt.Printf("starting to import snapshot %s with %d clients\n", c.String("snap"), concurrent)
|
||||
for i := 0; i < concurrent; i++ {
|
||||
client := etcd.NewClient(endpoints)
|
||||
client.SetTransport(tr)
|
||||
|
||||
if c.GlobalBool("debug") {
|
||||
go dumpCURL(client)
|
||||
}
|
||||
|
||||
if ok := client.SyncCluster(); !ok {
|
||||
handleError(FailedToConnectToHost, errors.New("cannot sync with the cluster using endpoints "+strings.Join(endpoints, ", ")))
|
||||
}
|
||||
wg.Add(1)
|
||||
go runSet(client, setc, wg)
|
||||
}
|
||||
|
||||
all, err := st.Get("/", true, true)
|
||||
if err != nil {
|
||||
handleError(ErrorFromEtcd, err)
|
||||
}
|
||||
n := copyKeys(all.Node, setc)
|
||||
|
||||
hiddens := c.StringSlice("hidden")
|
||||
for _, h := range hiddens {
|
||||
allh, err := st.Get(h, true, true)
|
||||
if err != nil {
|
||||
handleError(ErrorFromEtcd, err)
|
||||
}
|
||||
n += copyKeys(allh.Node, setc)
|
||||
}
|
||||
close(setc)
|
||||
wg.Wait()
|
||||
fmt.Printf("finished importing %d keys\n", n)
|
||||
}
|
||||
|
||||
func copyKeys(n *store.NodeExtern, setc chan set) int {
|
||||
num := 0
|
||||
if !n.Dir {
|
||||
setc <- set{n.Key, *n.Value, n.TTL}
|
||||
return 1
|
||||
}
|
||||
log.Println("entering dir:", n.Key)
|
||||
for _, nn := range n.Nodes {
|
||||
sub := copyKeys(nn, setc)
|
||||
num += sub
|
||||
}
|
||||
return num
|
||||
}
|
||||
|
||||
func runSet(c *etcd.Client, setc chan set, wg *sync.WaitGroup) {
|
||||
for s := range setc {
|
||||
log.Println("copying key:", s.key)
|
||||
if s.ttl != 0 && s.ttl < 300 {
|
||||
log.Printf("extending key %s's ttl to 300 seconds", s.key)
|
||||
s.ttl = 5 * 60
|
||||
}
|
||||
_, err := c.Set(s.key, s.value, uint64(s.ttl))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to copy key: %v\n", err)
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}
|
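For context, a hedged sketch of how this new `import` subcommand would be invoked; only the flag names come from the code above, while the endpoint, snapshot path, and hidden keyspace are placeholders:

```
$ etcdctl --peers http://10.0.1.10:2379 import \
    --snap /path/to/etcd-0.4/snapshot/latest.snap \
    --hidden /_hidden-keyspace \
    -c 10
```
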
@ -1,78 +0,0 @@
|
||||
/*
|
||||
Copyright 2015 CoreOS, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
)
|
||||
|
||||
func UpgradeCommand() cli.Command {
|
||||
return cli.Command{
|
||||
Name: "upgrade",
|
||||
Usage: "upgrade an old version etcd cluster to a new version",
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{Name: "old-version", Value: "1", Usage: "Old internal version"},
|
||||
cli.StringFlag{Name: "new-version", Value: "2", Usage: "New internal version"},
|
||||
cli.StringFlag{Name: "peer-url", Value: "http://localhost:7001", Usage: "An etcd peer url string"},
|
||||
cli.StringFlag{Name: "peer-cert-file", Value: "", Usage: "identify HTTPS peer using this SSL certificate file"},
|
||||
cli.StringFlag{Name: "peer-key-file", Value: "", Usage: "identify HTTPS peer using this SSL key file"},
|
||||
cli.StringFlag{Name: "peer-ca-file", Value: "", Usage: "verify certificates of HTTPS-enabled peers using this CA bundle"},
|
||||
},
|
||||
Action: handleUpgrade,
|
||||
}
|
||||
}
|
||||
|
||||
func handleUpgrade(c *cli.Context) {
|
||||
if c.String("old-version") != "1" {
|
||||
fmt.Printf("Do not support upgrade from version %s\n", c.String("old-version"))
|
||||
os.Exit(1)
|
||||
}
|
||||
if c.String("new-version") != "2" {
|
||||
fmt.Printf("Do not support upgrade to version %s\n", c.String("new-version"))
|
||||
os.Exit(1)
|
||||
}
|
||||
tls := transport.TLSInfo{
|
||||
CAFile: c.String("peer-ca-file"),
|
||||
CertFile: c.String("peer-cert-file"),
|
||||
KeyFile: c.String("peer-key-file"),
|
||||
}
|
||||
t, err := transport.NewTransport(tls)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
client := http.Client{Transport: t}
|
||||
resp, err := client.Get(c.String("peer-url") + "/v2/admin/next-internal-version")
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to send upgrade request to %s: %v\n", c.String("peer-url"), err)
|
||||
return
|
||||
}
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
fmt.Println("Cluster will start upgrading from internal version 1 to 2 in 10 seconds.")
|
||||
return
|
||||
}
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
fmt.Println("Cluster cannot upgrade to 2: version is not 0.4.7")
|
||||
return
|
||||
}
|
||||
fmt.Printf("Faild to send upgrade request to %s: bad status code %d\n", c.String("cluster-url"), resp.StatusCode)
|
||||
}
|
@ -65,7 +65,7 @@ func getPeersFlagValue(c *cli.Context) []string {
|
||||
|
||||
// If we still don't have peers, use a default
|
||||
if peerstr == "" {
|
||||
peerstr = "127.0.0.1:4001"
|
||||
peerstr = "127.0.0.1:4001,127.0.0.1:2379"
|
||||
}
|
||||
|
||||
return strings.Split(peerstr, ",")
|
||||
|
@ -31,7 +31,7 @@ func main() {
|
||||
app.Flags = []cli.Flag{
|
||||
cli.BoolFlag{Name: "debug", Usage: "output cURL commands which can be used to reproduce the request"},
|
||||
cli.BoolFlag{Name: "no-sync", Usage: "don't synchronize cluster information before sending request"},
|
||||
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple` or `json`)"},
|
||||
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple`, `extended` or `json`)"},
|
||||
cli.StringFlag{Name: "peers, C", Value: "", Usage: "a comma-delimited list of machine addresses in the cluster (default: \"127.0.0.1:4001\")"},
|
||||
cli.StringFlag{Name: "cert-file", Value: "", Usage: "identify HTTPS client using this SSL certificate file"},
|
||||
cli.StringFlag{Name: "key-file", Value: "", Usage: "identify HTTPS client using this SSL key file"},
|
||||
@ -53,7 +53,7 @@ func main() {
|
||||
command.NewWatchCommand(),
|
||||
command.NewExecWatchCommand(),
|
||||
command.NewMemberCommand(),
|
||||
command.UpgradeCommand(),
|
||||
command.NewImportSnapCommand(),
|
||||
}
|
||||
|
||||
app.Run(os.Args)
|
||||
|
@ -15,18 +15,17 @@
|
||||
package etcdmain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/netutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/version"
|
||||
)
|
||||
@ -41,6 +40,8 @@ const (
|
||||
|
||||
clusterStateFlagNew = "new"
|
||||
clusterStateFlagExisting = "existing"
|
||||
|
||||
defaultName = "default"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -62,6 +63,7 @@ var (
|
||||
|
||||
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set" +
|
||||
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
|
||||
errUnsetAdvertiseClientURLsFlag = fmt.Errorf("-advertise-client-urls is required when -listen-client-urls is set explicitly")
|
||||
)
|
||||
|
||||
type config struct {
|
||||
@ -137,7 +139,7 @@ func NewConfig() *config {
|
||||
fs.Var(flags.NewURLsValue("http://localhost:2379,http://localhost:4001"), "listen-client-urls", "List of URLs to listen on for client traffic")
|
||||
fs.UintVar(&cfg.maxSnapFiles, "max-snapshots", defaultMaxSnapshots, "Maximum number of snapshot files to retain (0 is unlimited)")
|
||||
fs.UintVar(&cfg.maxWalFiles, "max-wals", defaultMaxWALs, "Maximum number of wal files to retain (0 is unlimited)")
|
||||
fs.StringVar(&cfg.name, "name", "default", "Unique human-readable name for this node")
|
||||
fs.StringVar(&cfg.name, "name", defaultName, "Unique human-readable name for this node")
|
||||
fs.Uint64Var(&cfg.snapCount, "snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
|
||||
fs.UintVar(&cfg.TickMs, "heartbeat-interval", 100, "Time (in milliseconds) of a heartbeat interval.")
|
||||
fs.UintVar(&cfg.ElectionMs, "election-timeout", 1000, "Time (in milliseconds) for an election to timeout.")
|
||||
@ -153,7 +155,7 @@ func NewConfig() *config {
|
||||
}
|
||||
fs.StringVar(&cfg.dproxy, "discovery-proxy", "", "HTTP proxy to use for traffic to discovery service")
|
||||
fs.StringVar(&cfg.dnsCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
|
||||
fs.StringVar(&cfg.initialCluster, "initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
|
||||
fs.StringVar(&cfg.initialCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for bootstrapping")
|
||||
fs.StringVar(&cfg.initialClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap")
|
||||
fs.Var(cfg.clusterState, "initial-cluster-state", "Initial cluster configuration for bootstrapping")
|
||||
if err := cfg.clusterState.Set(clusterStateFlagNew); err != nil {
|
||||
@ -206,9 +208,15 @@ func (cfg *config) Parse(arguments []string) error {
|
||||
default:
|
||||
os.Exit(2)
|
||||
}
|
||||
if len(cfg.FlagSet.Args()) != 0 {
|
||||
return fmt.Errorf("'%s' is not a valid flag", cfg.FlagSet.Arg(0))
|
||||
}
|
||||
|
||||
if cfg.printVersion {
|
||||
fmt.Println("etcd version", version.Version)
|
||||
fmt.Printf("etcd Version: %s\n", version.Version)
|
||||
fmt.Printf("Git SHA: %s\n", version.GitSHA)
|
||||
fmt.Printf("Go Version: %s\n", runtime.Version())
|
||||
fmt.Printf("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
@ -251,15 +259,26 @@ func (cfg *config) Parse(arguments []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cfg.resolveUrls(); err != nil {
|
||||
return errors.New("cannot resolve DNS hostnames.")
|
||||
// when etcd runs in member mode user needs to set -advertise-client-urls if -listen-client-urls is set.
|
||||
if cfg.proxy.String() != proxyFlagOn {
|
||||
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
|
||||
return errUnsetAdvertiseClientURLsFlag
|
||||
}
|
||||
}
|
||||
|
||||
if 5*cfg.TickMs > cfg.ElectionMs {
|
||||
return fmt.Errorf("-election-timeout[%vms] should be at least as 5 times as -heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *config) resolveUrls() error {
|
||||
return netutil.ResolveTCPAddrs(cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls)
|
||||
func initialClusterFromName(name string) string {
|
||||
n := name
|
||||
if name == "" {
|
||||
n = defaultName
|
||||
}
|
||||
return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
|
||||
}
|
||||
|
||||
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
|
||||
|
@ -29,6 +29,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
|
||||
"-snapshot-count=10",
|
||||
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
|
||||
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
|
||||
// it should be set if -listen-client-urls is set
|
||||
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
|
||||
}
|
||||
wcfg := &config{
|
||||
dir: "testdir",
|
||||
@ -157,9 +159,9 @@ func TestConfigParsingV1Flags(t *testing.T) {
|
||||
"-addr=127.0.0.1:4001",
|
||||
}
|
||||
wcfg := NewConfig()
|
||||
wcfg.lpurls = []url.URL{{Scheme: "http", Host: "0.0.0.0:7001"}}
|
||||
wcfg.lpurls = []url.URL{{Scheme: "http", Host: "[::]:7001"}}
|
||||
wcfg.apurls = []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}
|
||||
wcfg.lcurls = []url.URL{{Scheme: "http", Host: "0.0.0.0:4001"}}
|
||||
wcfg.lcurls = []url.URL{{Scheme: "http", Host: "[::]:4001"}}
|
||||
wcfg.acurls = []url.URL{{Scheme: "http", Host: "127.0.0.1:4001"}}
|
||||
|
||||
cfg := NewConfig()
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/osutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
@ -38,22 +39,48 @@ import (
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
)
|
||||
|
||||
type dirType string
|
||||
|
||||
const (
|
||||
// the owner can make/remove files inside the directory
|
||||
privateDirMode = 0700
|
||||
)
|
||||
|
||||
var (
|
||||
dirMember = dirType("member")
|
||||
dirProxy = dirType("proxy")
|
||||
dirEmpty = dirType("empty")
|
||||
)
|
||||
|
||||
func Main() {
|
||||
cfg := NewConfig()
|
||||
err := cfg.Parse(os.Args[1:])
|
||||
if err != nil {
|
||||
log.Printf("etcd: error verifying flags, %v", err)
|
||||
os.Exit(2)
|
||||
log.Printf("error verifying flags, %v. See 'etcd -help'.", err)
|
||||
switch err {
|
||||
case errUnsetAdvertiseClientURLsFlag:
|
||||
log.Printf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
var stopped <-chan struct{}
|
||||
|
||||
shouldProxy := cfg.isProxy()
|
||||
if cfg.name != defaultName && cfg.initialCluster == initialClusterFromName(defaultName) {
|
||||
cfg.initialCluster = initialClusterFromName(cfg.name)
|
||||
}
|
||||
|
||||
if cfg.dir == "" {
|
||||
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
|
||||
log.Printf("etcd: no data-dir provided, using default data-dir ./%s", cfg.dir)
|
||||
}
|
||||
|
||||
which := identifyDataDirOrDie(cfg.dir)
|
||||
if which != dirEmpty {
|
||||
log.Printf("etcd: already initialized as %v before, starting as etcd %v...", which, which)
|
||||
}
|
||||
|
||||
shouldProxy := cfg.isProxy() || which == dirProxy
|
||||
if !shouldProxy {
|
||||
stopped, err = startEtcd(cfg)
|
||||
if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
|
||||
@ -67,8 +94,10 @@ func Main() {
|
||||
if err != nil {
|
||||
switch err {
|
||||
case discovery.ErrDuplicateID:
|
||||
log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
|
||||
cfg.name, cfg.durl, cfg.dir)
|
||||
log.Printf("member %q has previously registered with discovery service token (%s).", cfg.name, cfg.durl)
|
||||
log.Printf("But etcd could not find vaild cluster configuration in the given data dir (%s).", cfg.dir)
|
||||
log.Printf("Please check the given data dir path if the previous bootstrap succeeded")
|
||||
log.Printf("or use a new discovery token if the previous bootstrap failed.")
|
||||
default:
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
@ -87,12 +116,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
|
||||
}
|
||||
|
||||
if cfg.dir == "" {
|
||||
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
|
||||
log.Printf("no data-dir provided, using default data-dir ./%s", cfg.dir)
|
||||
}
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -173,7 +197,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
Handler: etcdhttp.NewClientHandler(s),
|
||||
Info: cfg.corsInfo,
|
||||
}
|
||||
ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
|
||||
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
|
||||
// Start the peer server in a goroutine
|
||||
for _, l := range plns {
|
||||
go func(l net.Listener) {
|
||||
@ -209,6 +233,7 @@ func startProxy(cfg *config) error {
|
||||
}
|
||||
|
||||
pt, err := transport.NewTransport(cfg.clientTLSInfo)
|
||||
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -218,10 +243,6 @@ func startProxy(cfg *config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if cfg.dir == "" {
|
||||
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
|
||||
log.Printf("no proxy data-dir provided, using default proxy data-dir ./%s", cfg.dir)
|
||||
}
|
||||
cfg.dir = path.Join(cfg.dir, "proxy")
|
||||
err = os.MkdirAll(cfg.dir, 0700)
|
||||
if err != nil {
|
||||
@ -340,3 +361,38 @@ func genClusterString(name string, urls types.URLs) string {
|
||||
}
|
||||
return strings.Join(addrs, ",")
|
||||
}
|
||||
|
||||
// identifyDataDirOrDie returns the type of the data dir.
|
||||
// Dies if the datadir is invalid.
|
||||
func identifyDataDirOrDie(dir string) dirType {
|
||||
names, err := fileutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return dirEmpty
|
||||
}
|
||||
log.Fatalf("etcd: error listing data dir: %s", dir)
|
||||
}
|
||||
|
||||
var m, p bool
|
||||
for _, name := range names {
|
||||
switch dirType(name) {
|
||||
case dirMember:
|
||||
m = true
|
||||
case dirProxy:
|
||||
p = true
|
||||
default:
|
||||
log.Printf("etcd: found invalid file/dir %s under data dir %s (Ignore this if you are upgrading etcd)", name, dir)
|
||||
}
|
||||
}
|
||||
|
||||
if m && p {
|
||||
log.Fatal("etcd: invalid datadir. Both member and proxy directories exist.")
|
||||
}
|
||||
if m {
|
||||
return dirMember
|
||||
}
|
||||
if p {
|
||||
return dirProxy
|
||||
}
|
||||
return dirEmpty
|
||||
}
|
||||
|
@ -56,7 +56,8 @@ clustering flags:
|
||||
--initial-cluster-token 'etcd-cluster'
|
||||
initial cluster token for the etcd cluster during bootstrap.
|
||||
--advertise-client-urls 'http://localhost:2379,http://localhost:4001'
|
||||
list of this member's client URLs to advertise to the rest of the cluster.
|
||||
list of this member's client URLs to advertise to the public.
|
||||
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
|
||||
--discovery ''
|
||||
discovery URL used to bootstrap the cluster.
|
||||
--discovery-fallback 'proxy'
|
||||
@ -91,8 +92,8 @@ security flags:
|
||||
|
||||
unsafe flags:
|
||||
|
||||
Please be CAUTIOUS to use unsafe flags because it will break the guarantee given
|
||||
by consensus protocol.
|
||||
Please be CAUTIOUS when using unsafe flags because it will break the guarantees
|
||||
given by the consensus protocol.
|
||||
|
||||
--force-new-cluster 'false'
|
||||
force to create a new one-member cluster.
|
||||
|
@ -59,12 +59,6 @@ type Cluster struct {
|
||||
id types.ID
|
||||
token string
|
||||
store store.Store
|
||||
// index is the raft index that cluster is updated at bootstrap
|
||||
// from remote cluster info.
|
||||
// It may have a higher value than local raft index, because it
|
||||
// displays a further view of the cluster.
|
||||
// TODO: upgrade it as last modified index
|
||||
index uint64
|
||||
|
||||
sync.Mutex // guards members and removed map
|
||||
members map[types.ID]*Member
|
||||
@ -236,8 +230,6 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
|
||||
|
||||
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
||||
|
||||
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
|
||||
|
||||
func (c *Cluster) Recover() {
|
||||
c.members, c.removed = membersFromStore(c.store)
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
|
||||
}
|
||||
continue
|
||||
}
|
||||
var index uint64
|
||||
// The header at or before v2.0.3 doesn't have this field. For backward
|
||||
// compatibility, it checks whether the field exists.
|
||||
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
|
||||
index, err = strconv.ParseUint(indexStr, 10, 64)
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse raft index: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
cl := NewClusterFromMembers("", id, membs)
|
||||
cl.UpdateIndex(index)
|
||||
return cl, nil
|
||||
return NewClusterFromMembers("", id, membs), nil
|
||||
}
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
@ -46,9 +46,43 @@ type ServerConfig struct {
|
||||
ElectionTicks int
|
||||
}
|
||||
|
||||
// VerifyBootstrapConfig sanity-checks the initial config and returns an error
|
||||
// for things that should never happen.
|
||||
func (c *ServerConfig) VerifyBootstrapConfig() error {
|
||||
// VerifyBootstrapConfig sanity-checks the initial config for bootstrap case
|
||||
// and returns an error for things that should never happen.
|
||||
func (c *ServerConfig) VerifyBootstrap() error {
|
||||
if err := c.verifyLocalMember(true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.Cluster.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
if c.Cluster.String() == "" && c.DiscoveryURL == "" {
|
||||
return fmt.Errorf("initial cluster unset and no discovery URL found")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// VerifyJoinExisting sanity-checks the initial config for join existing cluster
|
||||
// case and returns an error for things that should never happen.
|
||||
func (c *ServerConfig) VerifyJoinExisting() error {
|
||||
// no need for strict checking since the member has announced its
// peer urls to the cluster before starting and does not have to set
// them in the configuration again.
|
||||
if err := c.verifyLocalMember(false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.Cluster.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
if c.DiscoveryURL != "" {
|
||||
return fmt.Errorf("discovery URL should not be set when joining existing initial cluster")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyLocalMember verifies the configured member is in configured
|
||||
// cluster. If strict is set, it also verifies the configured member
|
||||
// has the same peer urls as configured advertised peer urls.
|
||||
func (c *ServerConfig) verifyLocalMember(strict bool) error {
|
||||
m := c.Cluster.MemberByName(c.Name)
|
||||
// Make sure the cluster at least contains the local server.
|
||||
if m == nil {
|
||||
@ -58,20 +92,14 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
|
||||
return fmt.Errorf("cannot use %x as member id", raft.None)
|
||||
}
|
||||
|
||||
if c.DiscoveryURL == "" && !c.NewCluster {
|
||||
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
|
||||
}
|
||||
|
||||
if err := c.Cluster.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Advertised peer URLs must match those in the cluster peer list
|
||||
// TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
|
||||
apurls := c.PeerURLs.StringSlice()
|
||||
sort.Strings(apurls)
|
||||
if !netutil.URLStringsEqual(apurls, m.PeerURLs) {
|
||||
return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name)
|
||||
if strict {
|
||||
if !netutil.URLStringsEqual(apurls, m.PeerURLs) {
|
||||
return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -22,6 +22,9 @@ import (
|
||||
)
|
||||
|
||||
func mustNewURLs(t *testing.T, urls []string) []url.URL {
|
||||
if len(urls) == 0 {
|
||||
return nil
|
||||
}
|
||||
u, err := types.NewURLs(urls)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating new URLs from %q: %v", urls, err)
|
||||
@ -29,77 +32,101 @@ func mustNewURLs(t *testing.T, urls []string) []url.URL {
|
||||
return u
|
||||
}
|
||||
|
||||
func TestBootstrapConfigVerify(t *testing.T) {
|
||||
func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
|
||||
cluster, err := NewClusterFromString("", "")
|
||||
if err != nil {
|
||||
t.Fatalf("NewClusterFromString error: %v", err)
|
||||
}
|
||||
c := &ServerConfig{
|
||||
Name: "node1",
|
||||
DiscoveryURL: "",
|
||||
Cluster: cluster,
|
||||
}
|
||||
if err := c.VerifyBootstrap(); err == nil {
|
||||
t.Errorf("err = nil, want not nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigVerifyExistingWithDiscoveryURLFail(t *testing.T) {
|
||||
cluster, err := NewClusterFromString("", "node1=http://127.0.0.1:2380")
|
||||
if err != nil {
|
||||
t.Fatalf("NewClusterFromString error: %v", err)
|
||||
}
|
||||
c := &ServerConfig{
|
||||
Name: "node1",
|
||||
DiscoveryURL: "http://127.0.0.1:4001/abcdefg",
|
||||
PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}),
|
||||
Cluster: cluster,
|
||||
NewCluster: false,
|
||||
}
|
||||
if err := c.VerifyJoinExisting(); err == nil {
|
||||
t.Errorf("err = nil, want not nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigVerifyLocalMember(t *testing.T) {
|
||||
tests := []struct {
|
||||
clusterSetting string
|
||||
newclst bool
|
||||
apurls []string
|
||||
disc string
|
||||
strict bool
|
||||
shouldError bool
|
||||
}{
|
||||
{
|
||||
// Node must exist in cluster
|
||||
"",
|
||||
true,
|
||||
nil,
|
||||
"",
|
||||
true,
|
||||
|
||||
true,
|
||||
},
|
||||
{
|
||||
// Cannot have duplicate URLs in cluster config
|
||||
"node1=http://localhost:7001,node2=http://localhost:7001,node2=http://localhost:7002",
|
||||
true,
|
||||
nil,
|
||||
"",
|
||||
|
||||
true,
|
||||
},
|
||||
{
|
||||
// Node defined, ClusterState OK
|
||||
// Initial cluster set
|
||||
"node1=http://localhost:7001,node2=http://localhost:7002",
|
||||
true,
|
||||
[]string{"http://localhost:7001"},
|
||||
"",
|
||||
true,
|
||||
|
||||
false,
|
||||
},
|
||||
{
|
||||
// Node defined, discovery OK
|
||||
"node1=http://localhost:7001",
|
||||
false,
|
||||
[]string{"http://localhost:7001"},
|
||||
"http://discovery",
|
||||
|
||||
false,
|
||||
},
|
||||
{
|
||||
// Cannot have ClusterState!=new && !discovery
|
||||
"node1=http://localhost:7001",
|
||||
false,
|
||||
nil,
|
||||
"",
|
||||
|
||||
// Default initial cluster
|
||||
"node1=http://localhost:2380,node1=http://localhost:7001",
|
||||
[]string{"http://localhost:2380", "http://localhost:7001"},
|
||||
true,
|
||||
|
||||
false,
|
||||
},
|
||||
{
|
||||
// Advertised peer URLs must match those in cluster-state
|
||||
"node1=http://localhost:7001",
|
||||
true,
|
||||
[]string{"http://localhost:12345"},
|
||||
"",
|
||||
true,
|
||||
|
||||
true,
|
||||
},
|
||||
{
|
||||
// Advertised peer URLs must match those in cluster-state
|
||||
"node1=http://localhost:7001,node1=http://localhost:12345",
|
||||
true,
|
||||
[]string{"http://localhost:12345"},
|
||||
"",
|
||||
true,
|
||||
|
||||
true,
|
||||
},
|
||||
{
|
||||
// Advertised peer URLs must match those in cluster-state
|
||||
"node1=http://localhost:7001",
|
||||
[]string{},
|
||||
true,
|
||||
|
||||
true,
|
||||
},
|
||||
{
|
||||
// do not care about the urls if strict is not set
|
||||
"node1=http://localhost:7001",
|
||||
[]string{},
|
||||
false,
|
||||
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
@ -108,15 +135,13 @@ func TestBootstrapConfigVerify(t *testing.T) {
|
||||
t.Fatalf("#%d: Got unexpected error: %v", i, err)
|
||||
}
|
||||
cfg := ServerConfig{
|
||||
Name: "node1",
|
||||
DiscoveryURL: tt.disc,
|
||||
Cluster: cluster,
|
||||
NewCluster: tt.newclst,
|
||||
Name: "node1",
|
||||
Cluster: cluster,
|
||||
}
|
||||
if tt.apurls != nil {
|
||||
cfg.PeerURLs = mustNewURLs(t, tt.apurls)
|
||||
}
|
||||
err = cfg.VerifyBootstrapConfig()
|
||||
err = cfg.verifyLocalMember(tt.strict)
|
||||
if (err == nil) && tt.shouldError {
|
||||
t.Errorf("%#v", *cluster)
|
||||
t.Errorf("#%d: Got no error where one was expected", i)
|
||||
|
@ -119,7 +119,6 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
switch {
|
||||
case resp.Event != nil:
|
||||
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
|
||||
@ -334,7 +333,7 @@ func serveVersion(w http.ResponseWriter, r *http.Request) {
|
||||
if !allowMethod(w, r.Method, "GET") {
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, `{"releaseVersion":"%s","internalVersion":"%s"}`, version.Version, version.InternalVersion)
|
||||
w.Write([]byte("etcd " + version.Version))
|
||||
}
|
||||
|
||||
// parseKeyRequest converts a received http.Request on keysPrefix to
|
||||
|
@ -1327,7 +1327,7 @@ func TestServeVersion(t *testing.T) {
|
||||
if rw.Code != http.StatusOK {
|
||||
t.Errorf("code=%d, want %d", rw.Code, http.StatusOK)
|
||||
}
|
||||
w := fmt.Sprintf(`{"releaseVersion":"%s","internalVersion":"%s"}`, version.Version, version.InternalVersion)
|
||||
w := fmt.Sprintf("etcd %s", version.Version)
|
||||
if g := rw.Body.String(); g != w {
|
||||
t.Fatalf("body = %q, want %q", g, w)
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
@ -29,10 +28,9 @@ const (
|
||||
)
|
||||
|
||||
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
|
||||
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
|
||||
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
|
||||
mh := &peerMembersHandler{
|
||||
clusterInfo: clusterInfo,
|
||||
timer: timer,
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@ -45,7 +43,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTim
|
||||
|
||||
type peerMembersHandler struct {
|
||||
clusterInfo etcdserver.ClusterInfo
|
||||
timer etcdserver.RaftTimer
|
||||
}
|
||||
|
||||
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
@ -53,7 +50,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
|
||||
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
|
||||
|
||||
if r.URL.Path != peerMembersPrefix {
|
||||
http.Error(w, "bad path", http.StatusBadRequest)
|
||||
|
@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
|
||||
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("test data"))
|
||||
})
|
||||
ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
|
||||
ph := NewPeerHandler(&fakeCluster{}, h)
|
||||
srv := httptest.NewServer(ph)
|
||||
defer srv.Close()
|
||||
|
||||
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
|
||||
id: 1,
|
||||
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
|
||||
}
|
||||
h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
|
||||
h := &peerMembersHandler{clusterInfo: cluster}
|
||||
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/etcd/wal"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
@ -145,18 +146,24 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
var s *raft.MemoryStorage
|
||||
var id types.ID
|
||||
|
||||
walVersion, err := wal.DetectVersion(cfg.DataDir)
|
||||
// Run the migrations.
|
||||
dataVer, err := version.DetectDataDir(cfg.DataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if walVersion == wal.WALUnknown {
|
||||
return nil, fmt.Errorf("unknown wal version in data dir %s", cfg.DataDir)
|
||||
if err := upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
haveWAL := walVersion != wal.WALNotExist
|
||||
|
||||
haveWAL := wal.Exist(cfg.WALDir())
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
|
||||
var remotes []*Member
|
||||
switch {
|
||||
case !haveWAL && !cfg.NewCluster:
|
||||
if err := cfg.VerifyJoinExisting(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cfg.Cluster, cfg.Name), cfg.Transport)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
|
||||
@ -164,13 +171,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
|
||||
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
||||
}
|
||||
cfg.Cluster.UpdateIndex(existingCluster.index)
|
||||
remotes = existingCluster.Members()
|
||||
cfg.Cluster.SetID(existingCluster.id)
|
||||
cfg.Cluster.SetStore(st)
|
||||
cfg.Print()
|
||||
id, n, s, w = startNode(cfg, nil)
|
||||
case !haveWAL && cfg.NewCluster:
|
||||
if err := cfg.VerifyBootstrapConfig(); err != nil {
|
||||
if err := cfg.VerifyBootstrap(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := cfg.Cluster.MemberByName(cfg.Name)
|
||||
@ -193,11 +200,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
cfg.PrintWithInitial()
|
||||
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
||||
case haveWAL:
|
||||
// Run the migrations.
|
||||
if err := upgradeWAL(cfg.DataDir, cfg.Name, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
@ -237,6 +239,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
Name: cfg.Name,
|
||||
ID: id.String(),
|
||||
}
|
||||
sstats.Initialize()
|
||||
lstats := stats.NewLeaderStats(id.String())
|
||||
|
||||
srv := &EtcdServer{
|
||||
@@ -259,8 +262,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
        reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
    }

    // TODO: move transport initialization near the definition of remote
    tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
    // add all the remote members into sendhub
    // add all remotes into transport
    for _, m := range remotes {
        if m.ID != id {
            tr.AddRemote(m.ID, m.PeerURLs)
        }
    }
    for _, m := range cfg.Cluster.Members() {
        if m.ID != id {
            tr.AddPeer(m.ID, m.PeerURLs)
@@ -291,7 +300,6 @@ func (s *EtcdServer) start() {
    s.w = wait.New()
    s.done = make(chan struct{})
    s.stop = make(chan struct{})
    s.stats.Initialize()
    // TODO: if this is an empty log, writes all peer infos
    // into the first entry
    go s.run()
@@ -394,19 +402,15 @@ func (s *EtcdServer) run() {
            if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
                log.Panicf("recovery store error: %v", err)
            }
            s.Cluster.Recover()

            // It avoids snapshot recovery overwriting newer cluster and
            // transport setting, which may block the communication.
            if s.Cluster.index < rd.Snapshot.Metadata.Index {
                s.Cluster.Recover()
            // recover raft transport
            s.r.transport.RemoveAllPeers()
            for _, m := range s.Cluster.Members() {
                if m.ID == s.ID() {
                    continue
                }
                s.r.transport.AddPeer(m.ID, m.PeerURLs)
                // recover raft transport
                s.r.transport.RemoveAllPeers()
                for _, m := range s.Cluster.Members() {
                    if m.ID == s.ID() {
                        continue
                    }
                    s.r.transport.AddPeer(m.ID, m.PeerURLs)
            }

            appliedi = rd.Snapshot.Metadata.Index
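The guard added above only lets snapshot recovery touch cluster and transport state when the snapshot is newer than what has already been applied. A minimal, self-contained sketch of that pattern (illustrative names, not etcd's actual types):

```
package main

import "fmt"

type clusterState struct{ index uint64 }

// recoverFromSnapshot applies snapshot-carried membership only when the snapshot
// index is ahead of the state already recovered, so a stale snapshot cannot
// overwrite newer cluster or transport configuration.
func (c *clusterState) recoverFromSnapshot(snapIndex uint64) {
    if c.index >= snapIndex {
        fmt.Println("snapshot is stale; keeping current cluster and transport state")
        return
    }
    c.index = snapIndex
    fmt.Println("recovered cluster state and rebuilt transport peers from snapshot")
}

func main() {
    c := &clusterState{index: 10}
    c.recoverFromSnapshot(5)  // ignored: older than applied state
    c.recoverFromSnapshot(20) // applied
}
```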
@@ -670,9 +674,9 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
}

func (s *EtcdServer) send(ms []raftpb.Message) {
    for _, m := range ms {
        if !s.Cluster.IsIDRemoved(types.ID(m.To)) {
            m.To = 0
    for i, _ := range ms {
        if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) {
            ms[i].To = 0
        }
    }
    s.r.transport.Send(ms)
@@ -722,7 +726,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
    switch {
    case existsSet:
        if exists {
            return f(s.store.Update(r.Path, r.Val, expr))
            if r.PrevIndex == 0 && r.PrevValue == "" {
                return f(s.store.Update(r.Path, r.Val, expr))
            } else {
                return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
            }
        }
        return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
    case r.PrevIndex > 0 || r.PrevValue != "":
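With this change, a v2 PUT with prevExist=true alone is still a plain Update, while prevExist=true combined with prevIndex or prevValue now becomes a CompareAndSwap. A small illustrative sketch of that routing decision (not etcd's code):

```
package main

import "fmt"

// routePut mirrors the dispatch above: a bare prevExist=true maps to Update,
// while any concrete expectation (index or value) maps to CompareAndSwap.
func routePut(prevIndex uint64, prevValue string) string {
    if prevIndex == 0 && prevValue == "" {
        return "Update"
    }
    return "CompareAndSwap"
}

func main() {
    fmt.Println(routePut(0, "")) // Update
    fmt.Println(routePut(1, "")) // CompareAndSwap
}
```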
@@ -235,20 +235,18 @@ func TestApplyRequest(t *testing.T) {
                },
            },
        },
        // PUT with PrevExist=true *and* PrevIndex set ==> Update
        // TODO(jonboulle): is this expected?!
        // PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
        {
            pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
            Response{Event: &store.Event{}},
            []testutil.Action{
                {
                    Name:   "Update",
                    Params: []interface{}{"", "", time.Time{}},
                    Name:   "CompareAndSwap",
                    Params: []interface{}{"", "", uint64(1), "", time.Time{}},
                },
            },
        },
        // PUT with PrevExist=false *and* PrevIndex set ==> Create
        // TODO(jonboulle): is this expected?!
        {
            pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
            Response{Event: &store.Event{}},
@@ -1391,6 +1389,7 @@ type nopTransporter struct{}

func (s *nopTransporter) Handler() http.Handler              { return nil }
func (s *nopTransporter) Send(m []raftpb.Message)            {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string)   {}
func (s *nopTransporter) RemovePeer(id types.ID)             {}
func (s *nopTransporter) RemoveAllPeers()                    {}
@@ -25,6 +25,7 @@ import (
    "github.com/coreos/etcd/pkg/types"
    "github.com/coreos/etcd/raft/raftpb"
    "github.com/coreos/etcd/snap"
    "github.com/coreos/etcd/version"
    "github.com/coreos/etcd/wal"
    "github.com/coreos/etcd/wal/walpb"
)
@@ -93,9 +94,9 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,

// upgradeWAL converts an older version of the etcdServer data to the newest version.
// It must ensure that, after upgrading, the most recent version is present.
func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error {
    switch ver {
    case wal.WALv0_4:
    case version.DataDir0_4:
        log.Print("etcdserver: converting v0.4 log to v2.0")
        err := migrate.Migrate4To2(baseDataDir, name)
        if err != nil {
@@ -103,16 +104,16 @@ func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
            return err
        }
        fallthrough
    case wal.WALv2_0:
    case version.DataDir2_0:
        err := makeMemberDir(baseDataDir)
        if err != nil {
            return err
        }
        fallthrough
    case wal.WALv2_0_1:
    case version.DataDir2_0_1:
        fallthrough
    default:
        log.Printf("datadir is valid for the 2.0.1 format")
        log.Printf("etcdserver: datadir is valid for the 2.0.1 format")
    }
    return nil
}
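The fallthrough chain above applies migrations cumulatively: an old data directory is carried through every later upgrade step in a single pass. A minimal, self-contained sketch of the same pattern (names are illustrative, not etcd's API):

```
package main

import "fmt"

type dataDirVersion int

const (
    dirV0_4 dataDirVersion = iota
    dirV2_0
    dirV2_0_1
)

// upgrade runs every migration step at or after the detected version.
func upgrade(ver dataDirVersion) error {
    switch ver {
    case dirV0_4:
        fmt.Println("converting v0.4 log to v2.0")
        fallthrough
    case dirV2_0:
        fmt.Println("creating member directory layout")
        fallthrough
    case dirV2_0_1:
        fallthrough
    default:
        fmt.Println("datadir is valid for the 2.0.1 format")
    }
    return nil
}

func main() {
    _ = upgrade(dirV0_4) // a v0.4 directory runs all three steps in order
}
```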
@@ -170,6 +170,46 @@ func TestForceNewCluster(t *testing.T) {
    clusterMustProgress(t, c.Members[:1])
}

// Ensure we can remove a member then add a new one back immediately.
func TestIssue2681(t *testing.T) {
    defer afterTest(t)
    c := NewCluster(t, 5)
    c.Launch(t)
    defer c.Terminate(t)

    c.RemoveMember(t, uint64(c.Members[4].s.ID()))
    c.waitLeader(t, c.Members)

    c.AddMember(t)
    c.waitLeader(t, c.Members)
    clusterMustProgress(t, c.Members)
}

// Ensure we can remove a member after a snapshot then add a new one back.
func TestIssue2746(t *testing.T) {
    defer afterTest(t)
    c := NewCluster(t, 5)

    for _, m := range c.Members {
        m.SnapCount = 10
    }

    c.Launch(t)
    defer c.Terminate(t)

    // force a snapshot
    for i := 0; i < 20; i++ {
        clusterMustProgress(t, c.Members)
    }

    c.RemoveMember(t, uint64(c.Members[4].s.ID()))
    c.waitLeader(t, c.Members)

    c.AddMember(t)
    c.waitLeader(t, c.Members)
    clusterMustProgress(t, c.Members)
}

// clusterMustProgress ensures that cluster can make progress. It creates
// a random key first, and check the new key could be got from all client urls
// of the cluster.
@@ -526,7 +566,7 @@ func (m *member) Launch() error {
    m.s.SyncTicker = time.Tick(500 * time.Millisecond)
    m.s.Start()

    m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
    m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}

    for _, ln := range m.PeerListeners {
        hs := &httptest.Server{
@@ -623,7 +663,7 @@ func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
}

func mustNewTransport(t *testing.T) *http.Transport {
    tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
    tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
    if err != nil {
        t.Fatal(err)
    }
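The extra argument threaded through above adds a dial timeout alongside the existing connection read/write timeouts. A generic sketch of bounding the dial phase with only the standard library (this is not etcd's transport helper):

```
package main

import (
    "net"
    "net/http"
    "time"
)

// newDialBoundTransport builds an http.Transport whose outgoing connections
// fail fast if the TCP dial does not complete within dialTimeout.
func newDialBoundTransport(dialTimeout time.Duration) *http.Transport {
    return &http.Transport{
        Dial: (&net.Dialer{Timeout: dialTimeout}).Dial,
    }
}

func main() {
    client := &http.Client{Transport: newDialBoundTransport(time.Second)}
    _ = client
}
```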
21  main.go
@@ -23,27 +23,8 @@

package main

import (
    "log"
    "os"
    "strconv"

    "github.com/coreos/etcd/etcdmain"
    "github.com/coreos/etcd/migrate/starter"
    "github.com/coreos/etcd/pkg/coreos"
)
import "github.com/coreos/etcd/etcdmain"

func main() {
    if str := os.Getenv("ETCD_ALLOW_LEGACY_MODE"); str != "" {
        v, err := strconv.ParseBool(str)
        if err != nil {
            log.Fatalf("failed to parse ETCD_ALLOW_LEGACY_MODE=%s as bool", str)
        }
        if v {
            starter.StartDesiredVersion(os.Args[1:])
        }
    } else if coreos.IsCoreOS() {
        starter.StartDesiredVersion(os.Args[1:])
    }
    etcdmain.Main()
}
@@ -1,27 +0,0 @@

etcd migration functional tests
=====

This functional test suite deploys an etcd cluster using processes, and asserts that etcd is functioning properly.

Dependencies
------------

The test suite can only be run on a CoreOS system. It is recommended to run this in a virtual machine environment on CoreOS (e.g. using coreos-vagrant). The only dependency for the tests not provided on the CoreOS image is go.

Usage
-----

Set environment variables to point to the respective binaries that are used to drive the actual tests:

```
$ export ETCD_V1_BIN=/path/to/v1_etcd
$ export ETCD_V2_BIN=/path/to/v2_etcd
$ export ETCDCTL_BIN=/path/to/etcdctl
```

Then the tests can be run:

```
$ go test github.com/coreos/etcd/migrate/functional
```
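A single case can also be selected with the standard `go test -run` flag, for example:

```
$ go test -run TestStartNewMember github.com/coreos/etcd/migrate/functional
```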
@@ -1,30 +0,0 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIFNDCCAx6gAwIBAgIBATALBgkqhkiG9w0BAQUwLTEMMAoGA1UEBhMDVVNBMRAw
|
||||
DgYDVQQKEwdldGNkLWNhMQswCQYDVQQLEwJDQTAeFw0xNDAzMTMwMjA5MDlaFw0y
|
||||
NDAzMTMwMjA5MDlaMC0xDDAKBgNVBAYTA1VTQTEQMA4GA1UEChMHZXRjZC1jYTEL
|
||||
MAkGA1UECxMCQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDdlBlw
|
||||
Jiakc4C1UpMUvQ+2fttyBMfMLivQgj51atpKd8qIBvpZwz1wtpzdRG0hSYMF0IUk
|
||||
MfBqyg+T5tt2Lfs3Gx3cYKS7G0HTfmABC7GdG8gNvEVNl/efxqvhis7p7hur765e
|
||||
J+N2GR4oOOP5Wa8O5flv10cp3ZJLhAguc2CONLzfh/iAYAItFgktGHXJ/AnUhhaj
|
||||
KWdKlK9Cv71YsRPOiB1hCV+LKfNSqrXPMvQ4sarz3yECIBhpV/KfskJoDyeNMaJd
|
||||
gabX/S7gUCd2FvuOpGWdSIsDwyJf0tnYmQX5XIQwBZJib/IFMmmoVNYc1bFtYvRH
|
||||
j0g0Ax4tHeXU/0mglqEcaTuMejnx8jlxZAM8Z94wHLfKbtaP0zFwMXkaM4nmfZqh
|
||||
vLZwowDGMv9M0VRFEhLGYIc3xQ8G2u8cFAGw1UqTxKhwAdRmrcFaQ38sk4kziy0u
|
||||
AkpGavS7PKcFjjB/fdDFO/kwGQOthX/oTn9nP3BT+IK2h1A6ATMPI4lVnhb5/KBt
|
||||
9M/fGgbiU+I9QT0Ilz/LlrcCuzyRXREvIZvoUL77Id+JT3qQxqPn/XMKLN4WEFII
|
||||
112MFGqCD85JZzNoC4RkZd8kFlR4YJWsS4WqJlWprESr5cCDuLviK+31cnIRF4fJ
|
||||
mz0gPsVgY7GFEan3JJnL8oRUVzdTPKfPt0atsQIDAQABo2MwYTAOBgNVHQ8BAf8E
|
||||
BAMCAAQwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUnVlVvktY+zlLpG43nTpG
|
||||
AWmUkrYwHwYDVR0jBBgwFoAUnVlVvktY+zlLpG43nTpGAWmUkrYwCwYJKoZIhvcN
|
||||
AQEFA4ICAQAqIcPFux3V4h1N0aGM4fCS/iT50TzDnRb5hwILKbmyA6LFnH4YF7PZ
|
||||
aA0utDNo1XSRDMpR38HWk0weh5Sfx6f2danaKZHAsea8oVEtdrz16ZMOvoh0CPIM
|
||||
/hn0CGQOoXDADDNFASuExhhpoyYkDqTVTCQ/zbhZg1mjBljJ+BBzlSgeoE4rUDpn
|
||||
nuDcmD9LtjpsVQL+J662rd51xV4Z6a7aZLvN9GfO8tYkfCGCD9+fGh1Cpz0IL7qw
|
||||
VRie+p/XpjoHemswnRhYJ4wn10a1UkVSR++wld6Gvjb9ikyr9xVyU5yrRM55pP2J
|
||||
VguhzjhTIDE1eDfIMMxv3Qj8+BdVQwtKFD+zQYQcbcjsvjTErlS7oCbM2DVlPnRT
|
||||
QaCM0q0yorfzc4hmml5P95ngz2xlohavgNMhsYIlcWyq3NVbm7mIXz2pjqa16Iit
|
||||
vL7WX6OVupv/EOMRx5cVcLqqEaYJmAlNd/CCD8ihDQCwoJ6DJhczPRexrVp+iZHK
|
||||
SnIUONdXb/g8ungXUGL1jGNQrWuq49clpI5sLWNjMDMFAQo0qu5bLkOIMlK/evCt
|
||||
gctOjXDvGXCk5h6Adf14q9zDGFdLoxw0/aciUSn9IekdzYPmkYUTifuzkVRsPKzS
|
||||
nmI4dQvz0rHIh4FBUKWWrJhRWhrv9ty/YFuJXVUHeAwr5nz6RFZ4wQ==
|
||||
-----END CERTIFICATE-----
|
@@ -1,54 +0,0 @@
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
Proc-Type: 4,ENCRYPTED
|
||||
DEK-Info: DES-EDE3-CBC,d2e70acc12a86116
|
||||
|
||||
uAmKZ41MiYTa7CappCFEcLL/kWRX4rE8DJG3sL59lv3j/6bYFkdczy3kgrEWm4Pn
|
||||
+pveJEssQkszXHkjA3vHx8nlTvfQOwa7ggcc76LNYj1sPHawVRNA0pb6WvjDzN7D
|
||||
JMgAnptVuZGP8N6ZIzFvr5Rf58ar5Y2aI7Ti6KxLZvqYojgvz5dzGimC3+SwDlFy
|
||||
Q2kwBA/HT4X9w2qSxpQ7WGPw2pkYILZ4Nxfqh9PWHd0Pk1d9KoLhbU5LEtGSy/y9
|
||||
9jqKsUqBzp9905t7d2KmFDF9Nd7XvHrDZDPILlKcQYnBxg6c1ChH1NkIqdAW7lQ6
|
||||
dAKAFZlMpVb/ArFBjhioljBIO+gLcWxYseHXbteOgbC1cw5xcBTHqH+7RotFH1VO
|
||||
ya0DFeW2CyPj4mp7vORD+IOVQaG4H5j1vJXqA9OPBziZR+lHvD0gVJqZIquXIQlW
|
||||
MBpX5CfV/3xITb6o0wA2OG2qlNM+VbKzg/cqh/kkusAqcfXIByh16K85k4jwPrBG
|
||||
wsYWABgw1vLlrCJ7ug6P2rb6VmzTbMqe4gpqUROgCS36ARjs5eDBDYZsX6NaGSh6
|
||||
twAUfzpwoGNuHwUpIYf5BjH1me+tnM0S8tAEtCFf9hy88nCg6v22cWQuAD6+6P6B
|
||||
Skl/UYT4sxeeETFv7Vf70wLnBMA3/uymBM75FhPyD5Vvg9fxz7aAJbfB2ovUVZ/v
|
||||
l3HCsCo8y7DtEXoiBmPCH28JWVhIZbmP3dYnU8c86SubhNWm0yjJIIwoghyFmCcO
|
||||
Wjs0XkVUUa9fGrl6Mc6XQIGsS6UdQkFoIcO+dtIFPg5C5GWnPnF53ro0J4pGcyR0
|
||||
zgt9ubCcFKNz5Cbcfw7fKJwswMt6zXtFxE/tVvOq2EPAPrmYYwPrnvbSNbuVL+as
|
||||
OT5ukITR9MDsYR/19jFUsdRDjSvUQVwqH7PiKwTnZouuJUhYHfj3Bjhz6cWzadcd
|
||||
pNdxqSgEeSzvaz390p1dOpN/0d1ItXlp3za6JZUarVkx8yH9UCFfpEEisPYgTASf
|
||||
F2xIrWHgZY+87OjPluU+Gym12ldcs0dbySgsxhKZMyAUd0DB2Knnmug+cqVvN+xo
|
||||
rJ2pD7J08zmQSRGyAUsbeUnuGb6fGNxaD5QpEN7nK4x3K1Q5N9QQ3RwL4Ik6jV0N
|
||||
eO0LzXF/BZbOAvl/OXAse1f5c7FO21oUw6u6iI0xvTJAcnaH/0eE2N6Y9Lwt507K
|
||||
HxhuN5j58/sOeb6kfkX563SoKSdYSrBqIaogDZFCtKpEBevsRM+QRdzAc//Fm67U
|
||||
Zs2K/ADM8+IaQN7uhm8IAPtWEnJ5+9rM2PCF0NX+7qa9HtZxTd0cqbeL8Ayx4i/T
|
||||
dHvN8k3kPuC+6He7+eZR6EQpN5GPt5SX3QGgKOQbbwBgF8mS/R0zaZpHvaqTY4Bi
|
||||
RfsLbRBGoTvR8YjqaQW91tExe5FghH7k02slSGzEzgs/ZhqPMCLNC7uFcSKcx9jA
|
||||
Bj+GmrYOMrUOYLQPT1iRtBFjLEUGPlvUGlaJS/JcvBN6DPW375tQHk7kbpVcudPh
|
||||
6vVXftuDiYEJk1TIQLt3QdC9s6ieVuAds4KDjYaTZz4s5W2Lkwo5AZzwLeMRank1
|
||||
96okoO1qRaDgagHsG8yPIwq+8/b/8dNl7E+wsbAWwLXLhYZGqDmHm/16pv/Ck59W
|
||||
LXLoJfrOdKBoxTTZulIsTISZ14Bj87QWPW26kI6So9V5vN60rb2MWrd+HU46Qapi
|
||||
JCsfCVsi715GUh4IkqAnec26TuXW2THcOp3p19SyubuJ33XqUR9H7BOZuBsIFeZV
|
||||
8sihbgjJ/zb7fZ7AGT3VmAxEtgFi8u2NOBN/WqYb++khtXgnIbOhBx9PuhOBofrO
|
||||
4M0R5s6F2SpbX2LEBJFN48wIlRmSMTsKdmZmA7f0IuxjYIcotBdRCGoXRlJJnZeH
|
||||
7WriXQJsq0517GlrqgYMDx26xHJy/ao+zcDxsCtftzAQvENuGr1lzsCdIcGXs+FU
|
||||
7C8qdmqSXgZgltFQpyR7+PMikXcdYdzkT3BjFh+VKJNiAeGXNnVXQH7L/V49zaij
|
||||
BRYWWtHwEDz50vSzZz3fnrFl6Pk8tny4bKoLjB4vBjMlb4yte7LcK+vbfDdreISD
|
||||
cDqfpzjAmIpv1GoQFKWGLQjagvwiAfOA8GUivEG9SQSAAImkV9qkr5qYzM7Jn2WU
|
||||
icA8D0YfuILpGxTOQc1SgDMOiGboCB+f7cxPsjXHbVahNyxxAbDbTjbc6v7q1oiy
|
||||
PESoLaBR0Bi0tdKivvbB63ok2Kq9XneFrQeCIyrhkXIvYDEwdcoCBpL1DEotbU+D
|
||||
YjZTLr4UW92xi1M4d94zmG6pyJsfC4sHGflY5paml9dLiEy78rCPfrJkrSSUplf+
|
||||
8CjfUoZsbq3haE0N4TbqV0I0W2Fm/a6U113CTRYxj9DeA3m/HFU3TLzk9Vg/vGxP
|
||||
/xltsu/wd/GoyoD9OhWhW1Ck9dtQ0G64hQjeXVd/pzsDCMT8hrtKSlX1Q7vK96ml
|
||||
OJ9Ju/CdhX2lJA8BrGVh4HS1fsuNFjr5KqZAY6MwFpjAPqvqD7WFE3Yflk5/7VtX
|
||||
bsvBZoN2vp9hprXsgm8/KmSNnWxzQY1Nps4XjRJVYeTmND5EyQClGJyCYKg0QVDo
|
||||
7L/2GAhnOrSLkAHOcYAlrNhZ85yBiLhjJcvWyT6DDcMpCusgictI2Qv2ZjMmz46v
|
||||
62PzHm0/Z3yQMcJnpRO79OdodbY22Eg9xZGGhBp1Xbm/OXYLaEpGW9S7DqPvlD5v
|
||||
O+VxENxJNwDELK9H2auGJAQdORwgF0VfvZxN6tGRyb7eI6aJj04YYMBkg5Nds+AR
|
||||
sNEdGNzqKm8sWvINSoX+BCOyjElOSRW0glK+ala5Y7/mM3+KOWgMas2LZBcLZfBr
|
||||
1/Z0DPIA2CkFtT1VsBKa+fSkEN0s+PRLRV/QWrcMbkSvIaKcswMwoyvI6OcddUEz
|
||||
YgjAOZ3TdnRm1DMqZHIsPOj+3xQv6nETqSwhvLJT1wJwnJQVbxjZwoUmJKSsZDEB
|
||||
2xL9OWlhFNY2qS7F77vv2ZUJYLYniiTGrC09AAQ4ti8zWnY1gqtaCp+1wynt/Abs
|
||||
9gGcbEIaQGWhpVjPtlKjNm86jGP0IXPaAgaOViIuBH+0GeVOLuUMLvb0nL0NWMJa
|
||||
-----END RSA PRIVATE KEY-----
|
@@ -1,31 +0,0 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIFWzCCA0WgAwIBAgIBAjALBgkqhkiG9w0BAQUwLTEMMAoGA1UEBhMDVVNBMRAw
|
||||
DgYDVQQKEwdldGNkLWNhMQswCQYDVQQLEwJDQTAeFw0xNDAzMTMwMjA5MjJaFw0y
|
||||
NDAzMTMwMjA5MjJaMEUxDDAKBgNVBAYTA1VTQTEQMA4GA1UEChMHZXRjZC1jYTEP
|
||||
MA0GA1UECxMGc2VydmVyMRIwEAYDVQQDEwkxMjcuMC4wLjEwggIiMA0GCSqGSIb3
|
||||
DQEBAQUAA4ICDwAwggIKAoICAQDI3EvqJrLWsnPbjAT8ENiMRyBINhhafubi5Nb+
|
||||
glEzkbC2kv2zXkVkpkBubDRwyh3eomSbdwKYk3yz+IopT753teJueRpMPq9Ayr/+
|
||||
PZl4Y1tG04KcjfOvOls6zPsDfHzluR8TE705If5wwZu3Bdwxzdtx9T0ROzIEgRt0
|
||||
Axuce5qkg93IWNxOrIr+4LCxYfTpvpTXO20lz0IuQNm1Opo9PVoWn7PXdOmuCzSG
|
||||
2hW1DcKqSyQP7IkplBJS0EhoovIsXavSkPKJssvQj73ZFIBVgKhXuHmPNdrypaQk
|
||||
CtxsqbVdOOlojItqYTTDAiadwRQWkYgDOSQCGJiPqYVJx+rH4MlzxQ6n9x2qIcne
|
||||
lfMr+VFDEc1YvHu1XLMg5b1ImD6ChutYW0RhFJ3CQVdQR2i4kJ8T1DSJYLISMODZ
|
||||
ux1cZaUoSL/EkrC5/8POWZmP8nJXO6A4wrZDHF30/qWpo+T5PvsA6cABfX1jkcTx
|
||||
PBXGK1qOZ8rToTxprJ2zc3zuZNxSgM32nzjcPUgn559Mgdl0HR4c4JeTZGsebWmx
|
||||
MWmkz//BV4eUaGHqCpzRQHf3YIxysvDC2Xf4z2Alk8AlLRXp7/ksatdxAtyc+y8+
|
||||
MWCc6N0YbI9zjv+ezCBqR+mu1P5Tb0HebPFz3dOdIpiC3kU8QyMEagw8u5xliZs4
|
||||
AxwdNwIDAQABo3IwcDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwHQYD
|
||||
VR0OBBYEFD6UrVN8uolWz6et79jVeZetjd4XMB8GA1UdIwQYMBaAFJ1ZVb5LWPs5
|
||||
S6RuN506RgFplJK2MA8GA1UdEQQIMAaHBH8AAAEwCwYJKoZIhvcNAQEFA4ICAQCo
|
||||
sKn1Rjx0tIVWAZAZB4lCWvkQDp/txnb5zzQUlKhIW2o98IklASmOYYyZbE2PXlda
|
||||
/n8TwKIzWgIoNh5AcgLWhtASrnZdGFXY88n5jGk6CVZ1+Dl+IX99h+r+YHQzf1jU
|
||||
BjGrZHGv3pPjwhFGDS99lM/TEBk/eLI2Kx5laL+nWMTwa8M1OwSIh6ZxYPVlWUqb
|
||||
rurk5l/YqW+UkYIXIQhe6LwtB7tBjr6nDIWBfHQ7uN8IdB8VIAF6lejr22VmERTW
|
||||
j+zJ5eTzuQN1f0s930mEm8pW7KgGxlEqrUlSJtxlMFCv6ZHZk1Y4yEiOCBKlPNme
|
||||
X3B+lhj//PH3gLNm3+ZRr5ena3k+wL9Dd3d3GDCIx0ERQyrGS/rJpqNPI+8ZQlG0
|
||||
nrFlm7aP6UznESQnJoSFbydiD0EZ4hXSdmDdXQkTklRpeXfMcrYBGN7JrGZOZ2T2
|
||||
WtXBMx2bgPeEH50KRrwUMFe122bchh0Fr+hGvNK2Q9/gRyQPiYHq6vSF4GzorzLb
|
||||
aDuWA9JRH8/c0z8tMvJ7KjmmmIxd39WWGZqiBrGQR7utOJjpQl+HCsDIQM6yZ/Bu
|
||||
RpwKj2yBz0OQg4tWbtqUuFkRMTkCR6vo3PadgO1VWokM7UFUXlScnYswcM5EwnzJ
|
||||
/IsYJ2s1V706QVUzAGIbi3+wYi3enk7JfYoGIqa2oA==
|
||||
-----END CERTIFICATE-----
|
@@ -1,51 +0,0 @@
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIJKAIBAAKCAgEAyNxL6iay1rJz24wE/BDYjEcgSDYYWn7m4uTW/oJRM5GwtpL9
|
||||
s15FZKZAbmw0cMod3qJkm3cCmJN8s/iKKU++d7XibnkaTD6vQMq//j2ZeGNbRtOC
|
||||
nI3zrzpbOsz7A3x85bkfExO9OSH+cMGbtwXcMc3bcfU9ETsyBIEbdAMbnHuapIPd
|
||||
yFjcTqyK/uCwsWH06b6U1zttJc9CLkDZtTqaPT1aFp+z13Tprgs0htoVtQ3Cqksk
|
||||
D+yJKZQSUtBIaKLyLF2r0pDyibLL0I+92RSAVYCoV7h5jzXa8qWkJArcbKm1XTjp
|
||||
aIyLamE0wwImncEUFpGIAzkkAhiYj6mFScfqx+DJc8UOp/cdqiHJ3pXzK/lRQxHN
|
||||
WLx7tVyzIOW9SJg+gobrWFtEYRSdwkFXUEdouJCfE9Q0iWCyEjDg2bsdXGWlKEi/
|
||||
xJKwuf/DzlmZj/JyVzugOMK2Qxxd9P6lqaPk+T77AOnAAX19Y5HE8TwVxitajmfK
|
||||
06E8aayds3N87mTcUoDN9p843D1IJ+efTIHZdB0eHOCXk2RrHm1psTFppM//wVeH
|
||||
lGhh6gqc0UB392CMcrLwwtl3+M9gJZPAJS0V6e/5LGrXcQLcnPsvPjFgnOjdGGyP
|
||||
c47/nswgakfprtT+U29B3mzxc93TnSKYgt5FPEMjBGoMPLucZYmbOAMcHTcCAwEA
|
||||
AQKCAgBS1vCESKOXgo/f61ae8v+skyUQQyc2I4Jr739wBiUhRKQCGIuDr4ylHyAR
|
||||
qpTSM7mv+X/O0n2CmcljnEy3Dwl568zQTSf4bB3xde1LGPKzwR6DDnaexLjM+x9n
|
||||
F+UqoewM/pV/U7PF3WxH6sGi8UrIS6OG02L1OVm+m9TLuwBnQF8eHLiaiXOLCwRk
|
||||
bBzTe5f70zslrX+tiVY9J0fiw6GbQjNmg0UzxicePcbTGxy6yEsR2t2rp51GRahs
|
||||
+TPz28hPXe6gcGFnQxNmF/JvllH7cY18aDvSQZ7kVkZlCwmv0ypWoUM6eESDgkW1
|
||||
a6yrgVccm7bhxW5BYw2AqqSrMkV0oMcCUjh2rYvex7w6dM374Ok3DD/dXjTHLNV5
|
||||
+0tHMxXUiCKwe7hVEg+iGD4E1jap5n5c4RzpEtAXsGEK5WUBksHi9qOBv+lubjZn
|
||||
Kcfbos+BbnmUCU3MmU48EZwyFQIu9djkLXfJV2Cbbg9HmkrIOYgi4tFjoBKeQLE4
|
||||
6GCucMWnNfMO7Kq/z7c+7sfWOAA55pu0Ojel8VH6US+Y/1mEuSUhQudrJn8GxAmc
|
||||
4t+C2Ie1Q1bK3iJbd0NUqtlwd9xI9wQgCbaxfQceUmBBjuTUu3YFctZ7Jia7h18I
|
||||
gZ3wsKfySDhW29XTFvnT3FUpc+AN9Pv4sB7uobm6qOBV8/AdKQKCAQEA1zwIuJki
|
||||
bSgXxsD4cfKgQsyIk0eMj8bDOlf/A8AFursXliH3rRASoixXNgzWrMhaEIE2BeeT
|
||||
InE13YCUjNCKoz8oZJqKYpjh3o/diZf1vCo6m/YUSR+4amynWE4FEAa58Og2WCJ3
|
||||
Nx8/IMpmch2VZ+hSQuNr5uvpH84+eZADQ1GB6ypzqxb5HjIEeryLJecDQGe4ophd
|
||||
JCo3loezq/K0XJQI8GTBe2GQPjXSmLMZKksyZoWEXAaC1Q+sdJWZvBpm3GfVQbXu
|
||||
q7wyqTMknVIlEOy0sHxstsbayysSFFQ/fcgKjyQb8f4efOkyQg8mH5vQOZghbHJ+
|
||||
7I8wVSSBt+bE2wKCAQEA7udRoo2NIoIpJH+2+SPqJJVq1gw/FHMM4oXNZp+AAjR1
|
||||
hTWcIzIXleMyDATl5ZFzZIY1U2JMifS5u2R7fDZEu9vfZk4e6BJUJn+5/ahjYFU8
|
||||
m8WV4rFWR6XN0SZxPb43Mn6OO7EoMqr8InRufiN4LwIqnPqDm2D9Fdijb9QFJ2UG
|
||||
QLKNnIkLTcUfx1RYP4T48CHkeZdxV8Cp49SzSSV8PbhIVBx32bm/yO6nLHoro7Wl
|
||||
YqXGW0wItf2BUA5a5eYNO0ezVkOkTp2aj/p9i+0rqbsYa480hzlnOzYI5F72Z8V2
|
||||
iPltUAeQn53Vg1azySa1x8/0Xp5nVsgQSh18CH3p1QKCAQBxZv4pVPXgkXlFjTLZ
|
||||
xr5Ns7pZ7x7OOiluuiJw9WGPazgYMDlxA8DtlXM11Tneu4lInOu73LGXOhLpa+/Y
|
||||
6Z/CN2qu5wX2wRpwy1gsQNaGl7FdryAtDvt5h1n8ms7sDL83gQHxGee6MUpvmnSz
|
||||
t4aawrtk5rJZbv7bdS1Rm2E8vNs47psXD/mdwTi++kxOYhNCgeO0N5cLkPrM4x71
|
||||
f+ErzguPrWaL/XGkdXNKZULjF8+sWLjOS9fvLlzs6E2h4D9F7addAeCIt5XxtDKc
|
||||
eUVyT2U8f7I/8zIgTccu0tzJBvcZSCs5K20g3zVNvPGXQd9KGS+zFfht51vN4HhA
|
||||
TuR1AoIBAGuQBKZeexP1bJa9VeF4dRxBldeHrgMEBeIbgi5ZU+YqPltaltEV5Z6b
|
||||
q1XUArpIsZ6p+mpvkKxwXgtsI1j6ihnW1g+Wzr2IOxEWYuQ9I3klB2PPIzvswj8B
|
||||
/NfVKhk1gl6esmVXzxR4/Yp5x6HNUHhBznPdKtITaf+jCXr5B9UD3DvW6IF5Bnje
|
||||
bv9tD0qSEQ71A4xnTiXHXfZxNsOROA4F4bLVGnUR97J9GRGic/GCgFMY9mT2p9lg
|
||||
qQ8lV3G5EW4GS01kqR6oQQXgLxSIFSeXUFhlIq5bfwoeuwQvaVuxgTwMqVXmAgyL
|
||||
oK1ApTPE1QWAsLLFORvOed8UxVqBbn0CggEBALfr/wheXCKLdzFzm03sO1i9qVz2
|
||||
vnpxzexXW3V/TtM6Dff2ojgkDC+CVximtAiLA/Wj60hXnQxw53g5VVT5rESx0J3c
|
||||
pq+azbi1eWzFeOrqJvKQhMfYc0nli7YuGnPkKzeepJJtWZHYkAjL4QZAn1jt0RqV
|
||||
DQmlGPGiOuGP8uh59c23pbjgh4eSJnvhOT2BFKhKZpBdTBYeiQiZBqIyme8rNTFr
|
||||
NmpBxtUr77tccVTrcWWhhViG36UNpetAP7b5QCHScIXZJXrEqyK5HaePqi5UMH8o
|
||||
alSz6s2REG/xP7x54574TvRG/3cIamv1AfZAOjin7BwhlSLhPl2eeh4Cgas=
|
||||
-----END RSA PRIVATE KEY-----
|
@@ -1,278 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package functional
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var binDir = ".versions"
|
||||
|
||||
type Proc struct {
|
||||
*exec.Cmd
|
||||
Name string
|
||||
DataDir string
|
||||
URL string
|
||||
PeerURL string
|
||||
|
||||
stderr io.ReadCloser
|
||||
}
|
||||
|
||||
func NewProcWithDefaultFlags(path string) *Proc {
|
||||
var args []string
|
||||
dir, err := ioutil.TempDir(os.TempDir(), "etcd")
|
||||
if err != nil {
|
||||
fmt.Printf("unexpected TempDir error: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
args = append(args, "--data-dir="+dir)
|
||||
args = append(args, "--name=default")
|
||||
p := &Proc{
|
||||
Cmd: exec.Command(path, args...),
|
||||
Name: "default",
|
||||
DataDir: dir,
|
||||
URL: "http://127.0.0.1:4001",
|
||||
PeerURL: "http://127.0.0.1:7001",
|
||||
}
|
||||
// always expect to use start_desired_verson mode
|
||||
p.Env = append(p.Env,
|
||||
"ETCD_BINARY_DIR="+binDir,
|
||||
)
|
||||
return p
|
||||
}
|
||||
|
||||
func NewProcWithV1Flags(path string) *Proc {
|
||||
p := NewProcWithDefaultFlags(path)
|
||||
p.SetV1PeerAddr("127.0.0.1:7001")
|
||||
return p
|
||||
}
|
||||
|
||||
func NewProcWithV2Flags(path string) *Proc {
|
||||
p := NewProcWithDefaultFlags(path)
|
||||
p.SetV2PeerURL("http://127.0.0.1:7001")
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *Proc) SetV2PeerURL(url string) {
|
||||
p.Args = append(p.Args,
|
||||
"-listen-peer-urls="+url,
|
||||
"-initial-advertise-peer-urls="+url,
|
||||
"-initial-cluster",
|
||||
p.Name+"="+url,
|
||||
)
|
||||
p.PeerURL = url
|
||||
}
|
||||
|
||||
func (p *Proc) SetV1PeerAddr(addr string) {
|
||||
p.Args = append(p.Args,
|
||||
"-peer-addr="+addr,
|
||||
)
|
||||
p.PeerURL = "http://" + addr
|
||||
}
|
||||
|
||||
func (p *Proc) SetV1Addr(addr string) {
|
||||
p.Args = append(p.Args,
|
||||
"-addr="+addr,
|
||||
)
|
||||
p.URL = "http://" + addr
|
||||
}
|
||||
|
||||
func (p *Proc) SetV1Peers(peers []string) {
|
||||
p.Args = append(p.Args,
|
||||
"-peers="+strings.Join(peers, ","),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *Proc) SetName(name string) {
|
||||
p.Args = append(p.Args,
|
||||
"-name="+name,
|
||||
)
|
||||
p.Name = name
|
||||
}
|
||||
|
||||
func (p *Proc) SetDataDir(dataDir string) {
|
||||
p.Args = append(p.Args,
|
||||
"-data-dir="+dataDir,
|
||||
)
|
||||
p.DataDir = dataDir
|
||||
}
|
||||
|
||||
func (p *Proc) SetSnapCount(cnt int) {
|
||||
p.Args = append(p.Args,
|
||||
"-snapshot-count="+strconv.Itoa(cnt),
|
||||
)
|
||||
}
|
||||
|
||||
func (p *Proc) SetDiscovery(url string) {
|
||||
p.Args = append(p.Args,
|
||||
"-discovery="+url,
|
||||
)
|
||||
}
|
||||
|
||||
func (p *Proc) SetPeerTLS(certFile, keyFile, caFile string) {
|
||||
p.Args = append(p.Args,
|
||||
"-peer-cert-file="+certFile,
|
||||
"-peer-key-file="+keyFile,
|
||||
"-peer-ca-file="+caFile,
|
||||
)
|
||||
u, err := url.Parse(p.PeerURL)
|
||||
if err != nil {
|
||||
log.Panicf("unexpected parse error: %v", err)
|
||||
}
|
||||
u.Scheme = "https"
|
||||
p.PeerURL = u.String()
|
||||
}
|
||||
|
||||
func (p *Proc) CleanUnsuppportedV1Flags() {
|
||||
var args []string
|
||||
for _, arg := range p.Args {
|
||||
if !strings.HasPrefix(arg, "-peers=") {
|
||||
args = append(args, arg)
|
||||
}
|
||||
}
|
||||
p.Args = args
|
||||
}
|
||||
|
||||
func (p *Proc) Start() error {
|
||||
if err := p.Cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
for k := 0; k < 50; k++ {
|
||||
_, err := http.Get(p.URL)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
return fmt.Errorf("instance %s failed to be available after a long time", p.Name)
|
||||
}
|
||||
|
||||
func (p *Proc) Stop() {
|
||||
if err := p.Cmd.Process.Kill(); err != nil {
|
||||
fmt.Printf("Process Kill error: %v", err)
|
||||
return
|
||||
}
|
||||
p.Cmd.Wait()
|
||||
}
|
||||
|
||||
func (p *Proc) Restart() error {
|
||||
p.Stop()
|
||||
return p.Start()
|
||||
}
|
||||
|
||||
func (p *Proc) Terminate() {
|
||||
p.Stop()
|
||||
os.RemoveAll(p.DataDir)
|
||||
}
|
||||
|
||||
type ProcGroup []*Proc
|
||||
|
||||
func NewProcInProcGroupWithV1Flags(path string, num int, idx int) *Proc {
|
||||
pg := NewProcGroupWithV1Flags(path, num)
|
||||
return pg[idx]
|
||||
}
|
||||
|
||||
func NewProcGroupWithV1Flags(path string, num int) ProcGroup {
|
||||
pg := make([]*Proc, num)
|
||||
for i := 0; i < num; i++ {
|
||||
pg[i] = NewProcWithDefaultFlags(path)
|
||||
pg[i].SetName(fmt.Sprintf("etcd%d", i))
|
||||
pg[i].SetV1PeerAddr(fmt.Sprintf("127.0.0.1:%d", 7001+i))
|
||||
pg[i].SetV1Addr(fmt.Sprintf("127.0.0.1:%d", 4001+i))
|
||||
if i > 0 {
|
||||
pg[i].SetV1Peers([]string{"127.0.0.1:7001"})
|
||||
}
|
||||
}
|
||||
return pg
|
||||
}
|
||||
|
||||
func NewProcGroupViaDiscoveryWithV1Flags(path string, num int, url string) ProcGroup {
|
||||
pg := make([]*Proc, num)
|
||||
for i := range pg {
|
||||
pg[i] = NewProcWithDefaultFlags(path)
|
||||
pg[i].SetName(fmt.Sprintf("etcd%d", i))
|
||||
pg[i].SetDiscovery(url)
|
||||
pg[i].SetV1PeerAddr(fmt.Sprintf("127.0.0.1:%d", 7001+i))
|
||||
pg[i].SetV1Addr(fmt.Sprintf("127.0.0.1:%d", 4001+i))
|
||||
}
|
||||
return pg
|
||||
}
|
||||
|
||||
func (pg ProcGroup) SetPeerTLS(certFile, keyFile, caFile string) {
|
||||
for i := range pg {
|
||||
pg[i].SetPeerTLS(certFile, keyFile, caFile)
|
||||
}
|
||||
}
|
||||
|
||||
func (pg ProcGroup) InheritDataDir(opg ProcGroup) {
|
||||
for i := range pg {
|
||||
pg[i].SetDataDir(opg[i].DataDir)
|
||||
}
|
||||
}
|
||||
|
||||
func (pg ProcGroup) SetSnapCount(count int) {
|
||||
for i := range pg {
|
||||
pg[i].SetSnapCount(count)
|
||||
}
|
||||
}
|
||||
|
||||
func (pg ProcGroup) CleanUnsuppportedV1Flags() {
|
||||
for _, p := range pg {
|
||||
p.CleanUnsuppportedV1Flags()
|
||||
}
|
||||
}
|
||||
|
||||
func (pg ProcGroup) Start() error {
|
||||
for _, p := range pg {
|
||||
if err := p.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// leave time for instances to sync and write some entries into disk
|
||||
// TODO: use more reliable method
|
||||
time.Sleep(time.Second)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pg ProcGroup) Wait() error {
|
||||
for _, p := range pg {
|
||||
if err := p.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pg ProcGroup) Stop() {
|
||||
for _, p := range pg {
|
||||
p.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (pg ProcGroup) Terminate() {
|
||||
for _, p := range pg {
|
||||
p.Terminate()
|
||||
}
|
||||
}
|
@@ -1,414 +0,0 @@
|
||||
package functional
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var (
|
||||
v1BinPath = path.Join(binDir, "1")
|
||||
v2BinPath = path.Join(binDir, "2")
|
||||
etcdctlBinPath string
|
||||
)
|
||||
|
||||
func init() {
|
||||
os.RemoveAll(binDir)
|
||||
if err := os.Mkdir(binDir, 0700); err != nil {
|
||||
fmt.Printf("unexpected Mkdir error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := os.Symlink(absPathFromEnv("ETCD_V1_BIN"), v1BinPath); err != nil {
|
||||
fmt.Printf("unexpected Symlink error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := os.Symlink(absPathFromEnv("ETCD_V2_BIN"), v2BinPath); err != nil {
|
||||
fmt.Printf("unexpected Symlink error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
etcdctlBinPath = os.Getenv("ETCDCTL_BIN")
|
||||
|
||||
mustExist(v1BinPath)
|
||||
mustExist(v2BinPath)
|
||||
mustExist(etcdctlBinPath)
|
||||
}
|
||||
|
||||
func TestStartNewMember(t *testing.T) {
|
||||
tests := []*Proc{
|
||||
NewProcWithDefaultFlags(v2BinPath),
|
||||
NewProcWithV1Flags(v2BinPath),
|
||||
NewProcWithV2Flags(v2BinPath),
|
||||
}
|
||||
for i, tt := range tests {
|
||||
if err := tt.Start(); err != nil {
|
||||
t.Fatalf("#%d: Start error: %v", i, err)
|
||||
}
|
||||
defer tt.Terminate()
|
||||
|
||||
ver, err := checkInternalVersion(tt.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: checkVersion error: %v", i, err)
|
||||
}
|
||||
if ver != "2" {
|
||||
t.Errorf("#%d: internal version = %s, want %s", i, ver, "2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartV2Member(t *testing.T) {
|
||||
tests := []*Proc{
|
||||
NewProcWithDefaultFlags(v2BinPath),
|
||||
NewProcWithV1Flags(v2BinPath),
|
||||
NewProcWithV2Flags(v2BinPath),
|
||||
}
|
||||
for i, tt := range tests {
|
||||
// get v2 data dir
|
||||
p := NewProcWithDefaultFlags(v2BinPath)
|
||||
if err := p.Start(); err != nil {
|
||||
t.Fatalf("#%d: Start error: %v", i, err)
|
||||
}
|
||||
p.Stop()
|
||||
tt.SetDataDir(p.DataDir)
|
||||
if err := tt.Start(); err != nil {
|
||||
t.Fatalf("#%d: Start error: %v", i, err)
|
||||
}
|
||||
defer tt.Terminate()
|
||||
|
||||
ver, err := checkInternalVersion(tt.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: checkVersion error: %v", i, err)
|
||||
}
|
||||
if ver != "2" {
|
||||
t.Errorf("#%d: internal version = %s, want %s", i, ver, "2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartV1Member(t *testing.T) {
|
||||
tests := []*Proc{
|
||||
NewProcWithDefaultFlags(v2BinPath),
|
||||
NewProcWithV1Flags(v2BinPath),
|
||||
NewProcWithV2Flags(v2BinPath),
|
||||
}
|
||||
for i, tt := range tests {
|
||||
// get v1 data dir
|
||||
p := NewProcWithDefaultFlags(v1BinPath)
|
||||
if err := p.Start(); err != nil {
|
||||
t.Fatalf("#%d: Start error: %v", i, err)
|
||||
}
|
||||
p.Stop()
|
||||
tt.SetDataDir(p.DataDir)
|
||||
if err := tt.Start(); err != nil {
|
||||
t.Fatalf("#%d: Start error: %v", i, err)
|
||||
}
|
||||
defer tt.Terminate()
|
||||
|
||||
ver, err := checkInternalVersion(tt.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: checkVersion error: %v", i, err)
|
||||
}
|
||||
if ver != "1" {
|
||||
t.Errorf("#%d: internal version = %s, want %s", i, ver, "1")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpgradeV1Cluster(t *testing.T) {
|
||||
// get v2-desired v1 data dir
|
||||
pg := NewProcGroupWithV1Flags(v1BinPath, 3)
|
||||
if err := pg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[1].PeerURL)
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
if err := cmd.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
t.Logf("wait until etcd exits...")
|
||||
if err := pg.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
|
||||
npg := NewProcGroupWithV1Flags(v2BinPath, 3)
|
||||
npg.InheritDataDir(pg)
|
||||
npg.CleanUnsuppportedV1Flags()
|
||||
if err := npg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer npg.Terminate()
|
||||
|
||||
for _, p := range npg {
|
||||
ver, err := checkInternalVersion(p.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("checkVersion error: %v", err)
|
||||
}
|
||||
if ver != "2" {
|
||||
t.Errorf("internal version = %s, want %s", ver, "2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpgradeV1SnapshotedCluster(t *testing.T) {
|
||||
// get v2-desired v1 data dir
|
||||
pg := NewProcGroupWithV1Flags(v1BinPath, 3)
|
||||
pg.SetSnapCount(10)
|
||||
if err := pg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[1].PeerURL)
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
if err := cmd.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
t.Logf("wait until etcd exits...")
|
||||
if err := pg.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
for _, p := range pg {
|
||||
// check it has taken snapshot
|
||||
fis, err := ioutil.ReadDir(path.Join(p.DataDir, "snapshot"))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected ReadDir error: %v", err)
|
||||
}
|
||||
if len(fis) == 0 {
|
||||
t.Fatalf("unexpected no-snapshot data dir")
|
||||
}
|
||||
}
|
||||
|
||||
npg := NewProcGroupWithV1Flags(v2BinPath, 3)
|
||||
npg.InheritDataDir(pg)
|
||||
npg.CleanUnsuppportedV1Flags()
|
||||
if err := npg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer npg.Terminate()
|
||||
|
||||
for _, p := range npg {
|
||||
ver, err := checkInternalVersion(p.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("checkVersion error: %v", err)
|
||||
}
|
||||
if ver != "2" {
|
||||
t.Errorf("internal version = %s, want %s", ver, "2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestJoinV1Cluster(t *testing.T) {
|
||||
pg := NewProcGroupWithV1Flags(v1BinPath, 1)
|
||||
if err := pg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
pg.Stop()
|
||||
npg := NewProcGroupWithV1Flags(v2BinPath, 3)
|
||||
npg[0].SetDataDir(pg[0].DataDir)
|
||||
if err := npg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer npg.Terminate()
|
||||
|
||||
for _, p := range npg {
|
||||
ver, err := checkInternalVersion(p.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("checkVersion error: %v", err)
|
||||
}
|
||||
if ver != "1" {
|
||||
t.Errorf("internal version = %s, want %s", ver, "1")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestJoinV1ClusterViaDiscovery(t *testing.T) {
|
||||
dp := NewProcWithDefaultFlags(v1BinPath)
|
||||
dp.SetV1Addr("127.0.0.1:5001")
|
||||
dp.SetV1PeerAddr("127.0.0.1:8001")
|
||||
if err := dp.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer dp.Terminate()
|
||||
|
||||
durl := "http://127.0.0.1:5001/v2/keys/cluster/"
|
||||
pg := NewProcGroupViaDiscoveryWithV1Flags(v1BinPath, 1, durl)
|
||||
if err := pg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
pg.Stop()
|
||||
npg := NewProcGroupViaDiscoveryWithV1Flags(v2BinPath, 3, durl)
|
||||
npg[0].SetDataDir(pg[0].DataDir)
|
||||
if err := npg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer npg.Terminate()
|
||||
|
||||
for _, p := range npg {
|
||||
ver, err := checkInternalVersion(p.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("checkVersion error: %v", err)
|
||||
}
|
||||
if ver != "1" {
|
||||
t.Errorf("internal version = %s, want %s", ver, "1")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpgradeV1Standby(t *testing.T) {
|
||||
// get v1 standby data dir
|
||||
pg := NewProcGroupWithV1Flags(v1BinPath, 3)
|
||||
if err := pg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
req, err := http.NewRequest("PUT", pg[0].PeerURL+"/v2/admin/config", bytes.NewBufferString(`{"activeSize":3,"removeDelay":1800,"syncInterval":5}`))
|
||||
if err != nil {
|
||||
t.Fatalf("NewRequest error: %v", err)
|
||||
}
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("http Do error: %v", err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK)
|
||||
}
|
||||
|
||||
p := NewProcInProcGroupWithV1Flags(v2BinPath, 4, 3)
|
||||
if err := p.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
fmt.Println("checking new member is in standby mode...")
|
||||
mustExist(path.Join(p.DataDir, "standby_info"))
|
||||
ver, err := checkInternalVersion(p.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("checkVersion error: %v", err)
|
||||
}
|
||||
if ver != "1" {
|
||||
t.Errorf("internal version = %s, want %s", ver, "1")
|
||||
}
|
||||
|
||||
fmt.Println("upgrading the whole cluster...")
|
||||
cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[0].PeerURL)
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
if err := cmd.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
fmt.Println("waiting until peer-mode etcd exits...")
|
||||
if err := pg.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
fmt.Println("restarting the peer-mode etcd...")
|
||||
npg := NewProcGroupWithV1Flags(v2BinPath, 3)
|
||||
npg.InheritDataDir(pg)
|
||||
npg.CleanUnsuppportedV1Flags()
|
||||
if err := npg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer npg.Terminate()
|
||||
fmt.Println("waiting until standby-mode etcd exits...")
|
||||
if err := p.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
fmt.Println("restarting the standby-mode etcd...")
|
||||
np := NewProcInProcGroupWithV1Flags(v2BinPath, 4, 3)
|
||||
np.SetDataDir(p.DataDir)
|
||||
np.CleanUnsuppportedV1Flags()
|
||||
if err := np.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer np.Terminate()
|
||||
|
||||
fmt.Println("checking the new member is in v2 proxy mode...")
|
||||
ver, err = checkInternalVersion(np.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("checkVersion error: %v", err)
|
||||
}
|
||||
if ver != "2" {
|
||||
t.Errorf("internal version = %s, want %s", ver, "1")
|
||||
}
|
||||
if _, err := os.Stat(path.Join(np.DataDir, "proxy")); err != nil {
|
||||
t.Errorf("stat proxy dir error = %v, want nil", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpgradeV1TLSCluster(t *testing.T) {
|
||||
// get v2-desired v1 data dir
|
||||
pg := NewProcGroupWithV1Flags(v1BinPath, 3)
|
||||
pg.SetPeerTLS("./fixtures/server.crt", "./fixtures/server.key.insecure", "./fixtures/ca.crt")
|
||||
if err := pg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
cmd := exec.Command(etcdctlBinPath,
|
||||
"upgrade", "--peer-url", pg[1].PeerURL,
|
||||
"--peer-cert-file", "./fixtures/server.crt",
|
||||
"--peer-key-file", "./fixtures/server.key.insecure",
|
||||
"--peer-ca-file", "./fixtures/ca.crt",
|
||||
)
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
if err := cmd.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
t.Logf("wait until etcd exits...")
|
||||
if err := pg.Wait(); err != nil {
|
||||
t.Fatalf("Wait error: %v", err)
|
||||
}
|
||||
|
||||
npg := NewProcGroupWithV1Flags(v2BinPath, 3)
|
||||
npg.SetPeerTLS("./fixtures/server.crt", "./fixtures/server.key.insecure", "./fixtures/ca.crt")
|
||||
npg.InheritDataDir(pg)
|
||||
npg.CleanUnsuppportedV1Flags()
|
||||
if err := npg.Start(); err != nil {
|
||||
t.Fatalf("Start error: %v", err)
|
||||
}
|
||||
defer npg.Terminate()
|
||||
|
||||
for _, p := range npg {
|
||||
ver, err := checkInternalVersion(p.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("checkVersion error: %v", err)
|
||||
}
|
||||
if ver != "2" {
|
||||
t.Errorf("internal version = %s, want %s", ver, "2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func absPathFromEnv(name string) string {
|
||||
path, err := filepath.Abs(os.Getenv(name))
|
||||
if err != nil {
|
||||
fmt.Printf("unexpected Abs error: %v\n", err)
|
||||
}
|
||||
return path
|
||||
}
|
||||
|
||||
func mustExist(path string) {
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
fmt.Printf("%v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func checkInternalVersion(url string) (string, error) {
|
||||
resp, err := http.Get(url + "/version")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var m map[string]string
|
||||
err = json.Unmarshal(b, &m)
|
||||
return m["internalVersion"], err
|
||||
}
|
@@ -1,397 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package starter
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdmain"
|
||||
"github.com/coreos/etcd/migrate"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/osutil"
|
||||
etcdversion "github.com/coreos/etcd/version"
|
||||
"github.com/coreos/etcd/wal"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type version string
|
||||
|
||||
const (
|
||||
internalV1 version = "1"
|
||||
internalV2 version = "2"
|
||||
internalV2Proxy version = "2.proxy"
|
||||
internalUnknown version = "unknown"
|
||||
|
||||
v0_4 version = "v0.4"
|
||||
v2_0 version = "v2.0"
|
||||
v2_0Proxy version = "v2.0 proxy"
|
||||
empty version = "empty"
|
||||
unknown version = "unknown"
|
||||
|
||||
defaultInternalV1etcdBinaryDir = "/usr/libexec/etcd/internal_versions/"
|
||||
)
|
||||
|
||||
var (
|
||||
v2SpecialFlags = []string{
|
||||
"initial-cluster",
|
||||
"listen-peer-urls",
|
||||
"listen-client-urls",
|
||||
"proxy",
|
||||
}
|
||||
)
|
||||
|
||||
func StartDesiredVersion(args []string) {
|
||||
fs, err := parseConfig(args)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if fs.Lookup("version").Value.String() == "true" {
|
||||
fmt.Println("etcd version", etcdversion.Version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
ver := checkInternalVersion(fs)
|
||||
log.Printf("starter: start etcd version %s", ver)
|
||||
switch ver {
|
||||
case internalV1:
|
||||
startInternalV1()
|
||||
case internalV2:
|
||||
case internalV2Proxy:
|
||||
if _, err := os.Stat(standbyInfo4(fs.Lookup("data-dir").Value.String())); err != nil {
|
||||
log.Printf("starter: Detect standby_info file exists, and add --proxy=on flag to ensure it runs in v2.0 proxy mode.")
|
||||
log.Printf("starter: Before removing v0.4 data, --proxy=on flag MUST be added.")
|
||||
}
|
||||
// append proxy flag to args to trigger proxy mode
|
||||
os.Args = append(os.Args, "-proxy=on")
|
||||
default:
|
||||
log.Panicf("starter: unhandled start version")
|
||||
}
|
||||
}
|
||||
|
||||
func checkInternalVersion(fs *flag.FlagSet) version {
|
||||
// If it uses 2.0 env var explicitly, start 2.0
|
||||
for _, name := range v2SpecialFlags {
|
||||
if fs.Lookup(name).Value.String() != "" {
|
||||
return internalV2
|
||||
}
|
||||
}
|
||||
|
||||
dataDir := fs.Lookup("data-dir").Value.String()
|
||||
if dataDir == "" {
|
||||
log.Fatalf("starter: please set --data-dir or ETCD_DATA_DIR for etcd")
|
||||
}
|
||||
// check the data directory
|
||||
dataver, err := wal.DetectVersion(dataDir)
|
||||
if err != nil {
|
||||
log.Fatalf("starter: failed to detect etcd version in %v: %v", dataDir, err)
|
||||
}
|
||||
log.Printf("starter: detect etcd version %s in %s", dataver, dataDir)
|
||||
switch dataver {
|
||||
case wal.WALv2_0:
|
||||
return internalV2
|
||||
case wal.WALv2_0Proxy:
|
||||
return internalV2Proxy
|
||||
case wal.WALv0_4:
|
||||
standbyInfo, err := migrate.DecodeStandbyInfo4FromFile(standbyInfo4(dataDir))
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
log.Fatalf("starter: failed to decode standbyInfo in %v: %v", dataDir, err)
|
||||
}
|
||||
inStandbyMode := standbyInfo != nil && standbyInfo.Running
|
||||
if inStandbyMode {
|
||||
ver, err := checkInternalVersionByClientURLs(standbyInfo.ClientURLs(), clientTLSInfo(fs))
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to check start version through peers: %v", err)
|
||||
return internalV1
|
||||
}
|
||||
if ver == internalV2 {
|
||||
osutil.Unsetenv("ETCD_DISCOVERY")
|
||||
os.Args = append(os.Args, "-initial-cluster", standbyInfo.InitialCluster())
|
||||
return internalV2Proxy
|
||||
}
|
||||
return ver
|
||||
}
|
||||
ver, err := checkInternalVersionByDataDir4(dataDir)
|
||||
if err != nil {
|
||||
log.Fatalf("starter: failed to check start version in %v: %v", dataDir, err)
|
||||
}
|
||||
return ver
|
||||
case wal.WALNotExist:
|
||||
discovery := fs.Lookup("discovery").Value.String()
|
||||
dpeers, err := getPeersFromDiscoveryURL(discovery)
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to get peers from discovery %s: %v", discovery, err)
|
||||
}
|
||||
peerStr := fs.Lookup("peers").Value.String()
|
||||
ppeers := getPeersFromPeersFlag(peerStr, peerTLSInfo(fs))
|
||||
|
||||
urls := getClientURLsByPeerURLs(append(dpeers, ppeers...), peerTLSInfo(fs))
|
||||
ver, err := checkInternalVersionByClientURLs(urls, clientTLSInfo(fs))
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to check start version through peers: %v", err)
|
||||
return internalV2
|
||||
}
|
||||
return ver
|
||||
}
|
||||
// never reach here
|
||||
log.Panicf("starter: unhandled etcd version in %v", dataDir)
|
||||
return internalUnknown
|
||||
}
|
||||
|
||||
func checkInternalVersionByDataDir4(dataDir string) (version, error) {
|
||||
// check v0.4 snapshot
|
||||
snap4, err := migrate.DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
|
||||
if err != nil {
|
||||
return internalUnknown, err
|
||||
}
|
||||
if snap4 != nil {
|
||||
st := &migrate.Store4{}
|
||||
if err := json.Unmarshal(snap4.State, st); err != nil {
|
||||
return internalUnknown, err
|
||||
}
|
||||
dir := st.Root.Children["_etcd"]
|
||||
n, ok := dir.Children["next-internal-version"]
|
||||
if ok && n.Value == "2" {
|
||||
return internalV2, nil
|
||||
}
|
||||
}
|
||||
|
||||
// check v0.4 log
|
||||
ents4, err := migrate.DecodeLog4FromFile(logFile4(dataDir))
|
||||
if err != nil {
|
||||
return internalUnknown, err
|
||||
}
|
||||
for _, e := range ents4 {
|
||||
cmd, err := migrate.NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
|
||||
if err != nil {
|
||||
return internalUnknown, err
|
||||
}
|
||||
setcmd, ok := cmd.(*migrate.SetCommand)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if setcmd.Key == "/_etcd/next-internal-version" && setcmd.Value == "2" {
|
||||
return internalV2, nil
|
||||
}
|
||||
}
|
||||
return internalV1, nil
|
||||
}
|
||||
|
||||
func getClientURLsByPeerURLs(peers []string, tls *TLSInfo) []string {
|
||||
c, err := newDefaultClient(tls)
|
||||
if err != nil {
|
||||
log.Printf("starter: new client error: %v", err)
|
||||
return nil
|
||||
}
|
||||
var urls []string
|
||||
for _, u := range peers {
|
||||
resp, err := c.Get(u + "/etcdURL")
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to get /etcdURL from %s", u)
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to read body from %s", u)
|
||||
continue
|
||||
}
|
||||
urls = append(urls, string(b))
|
||||
}
|
||||
return urls
|
||||
}
|
||||
|
||||
func checkInternalVersionByClientURLs(urls []string, tls *TLSInfo) (version, error) {
|
||||
c, err := newDefaultClient(tls)
|
||||
if err != nil {
|
||||
return internalUnknown, err
|
||||
}
|
||||
for _, u := range urls {
|
||||
resp, err := c.Get(u + "/version")
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to get /version from %s", u)
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to read body from %s", u)
|
||||
continue
|
||||
}
|
||||
|
||||
var m map[string]string
|
||||
err = json.Unmarshal(b, &m)
|
||||
if err != nil {
|
||||
log.Printf("starter: failed to unmarshal body %s from %s", b, u)
|
||||
continue
|
||||
}
|
||||
switch m["internalVersion"] {
|
||||
case "1":
|
||||
return internalV1, nil
|
||||
case "2":
|
||||
return internalV2, nil
|
||||
default:
|
||||
log.Printf("starter: unrecognized internal version %s from %s", m["internalVersion"], u)
|
||||
}
|
||||
}
|
||||
return internalUnknown, fmt.Errorf("failed to get version from urls %v", urls)
|
||||
}
|
||||
|
||||
func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) {
|
||||
if discoverURL == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
u, err := url.Parse(discoverURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
token := u.Path
|
||||
u.Path = ""
|
||||
c, err := client.NewHTTPClient(&http.Transport{}, []string{u.String()})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dc := client.NewDiscoveryKeysAPI(c)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
|
||||
resp, err := dc.Get(ctx, token)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
peers := make([]string, 0)
|
||||
// append non-config keys to peers
|
||||
for _, n := range resp.Node.Nodes {
|
||||
if g := path.Base(n.Key); g == "_config" || g == "_state" {
|
||||
continue
|
||||
}
|
||||
peers = append(peers, n.Value)
|
||||
}
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
func getPeersFromPeersFlag(str string, tls *TLSInfo) []string {
|
||||
peers := trimSplit(str, ",")
|
||||
for i, p := range peers {
|
||||
peers[i] = tls.Scheme() + "://" + p
|
||||
}
|
||||
return peers
|
||||
}
|
||||
|
||||
func startInternalV1() {
|
||||
p := os.Getenv("ETCD_BINARY_DIR")
|
||||
if p == "" {
|
||||
p = defaultInternalV1etcdBinaryDir
|
||||
}
|
||||
p = path.Join(p, "1")
|
||||
err := syscall.Exec(p, os.Args, syscall.Environ())
|
||||
if err != nil {
|
||||
log.Fatalf("starter: failed to execute internal v1 etcd: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func newDefaultClient(tls *TLSInfo) (*http.Client, error) {
|
||||
tr := &http.Transport{}
|
||||
if tls.Scheme() == "https" {
|
||||
tlsConfig, err := tls.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tr.TLSClientConfig = tlsConfig
|
||||
}
|
||||
return &http.Client{Transport: tr}, nil
|
||||
}
|
||||
|
||||
type value struct {
|
||||
isBoolFlag bool
|
||||
s string
|
||||
}
|
||||
|
||||
func (v *value) String() string { return v.s }
|
||||
|
||||
func (v *value) Set(s string) error {
|
||||
v.s = s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *value) IsBoolFlag() bool { return v.isBoolFlag }
|
||||
|
||||
type boolFlag interface {
|
||||
flag.Value
|
||||
IsBoolFlag() bool
|
||||
}
|
||||
|
||||
// parseConfig parses out the input config from cmdline arguments and
|
||||
// environment variables.
|
||||
func parseConfig(args []string) (*flag.FlagSet, error) {
|
||||
fs := flag.NewFlagSet("full flagset", flag.ContinueOnError)
|
||||
etcdmain.NewConfig().VisitAll(func(f *flag.Flag) {
|
||||
_, isBoolFlag := f.Value.(boolFlag)
|
||||
fs.Var(&value{isBoolFlag: isBoolFlag}, f.Name, "")
|
||||
})
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := flags.SetFlagsFromEnv(fs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
func clientTLSInfo(fs *flag.FlagSet) *TLSInfo {
|
||||
return &TLSInfo{
|
||||
CAFile: fs.Lookup("ca-file").Value.String(),
|
||||
CertFile: fs.Lookup("cert-file").Value.String(),
|
||||
KeyFile: fs.Lookup("key-file").Value.String(),
|
||||
}
|
||||
}
|
||||
|
||||
func peerTLSInfo(fs *flag.FlagSet) *TLSInfo {
|
||||
return &TLSInfo{
|
||||
CAFile: fs.Lookup("peer-ca-file").Value.String(),
|
||||
CertFile: fs.Lookup("peer-cert-file").Value.String(),
|
||||
KeyFile: fs.Lookup("peer-key-file").Value.String(),
|
||||
}
|
||||
}
|
||||
|
||||
func snapDir4(dataDir string) string {
|
||||
return path.Join(dataDir, "snapshot")
|
||||
}
|
||||
|
||||
func logFile4(dataDir string) string {
|
||||
return path.Join(dataDir, "log")
|
||||
}
|
||||
|
||||
func standbyInfo4(dataDir string) string {
|
||||
return path.Join(dataDir, "standby_info")
|
||||
}
|
||||
|
||||
func trimSplit(s, sep string) []string {
|
||||
trimmed := strings.Split(s, sep)
|
||||
for i := range trimmed {
|
||||
trimmed[i] = strings.TrimSpace(trimmed[i])
|
||||
}
|
||||
return trimmed
|
||||
}
|
@@ -1,70 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package starter
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
args []string
|
||||
wvals map[string]string
|
||||
}{
|
||||
{
|
||||
[]string{"--name", "etcd", "--data-dir", "dir"},
|
||||
map[string]string{
|
||||
"name": "etcd",
|
||||
"data-dir": "dir",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"--name=etcd", "--data-dir=dir"},
|
||||
map[string]string{
|
||||
"name": "etcd",
|
||||
"data-dir": "dir",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"--version", "--name", "etcd"},
|
||||
map[string]string{
|
||||
"version": "true",
|
||||
"name": "etcd",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"--version=true", "--name", "etcd"},
|
||||
map[string]string{
|
||||
"version": "true",
|
||||
"name": "etcd",
|
||||
},
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
fs, err := parseConfig(tt.args)
|
||||
if err != nil {
|
||||
t.Fatalf("#%d: unexpected parseConfig error: %v", i, err)
|
||||
}
|
||||
vals := make(map[string]string)
|
||||
fs.Visit(func(f *flag.Flag) {
|
||||
vals[f.Name] = f.Value.String()
|
||||
})
|
||||
if !reflect.DeepEqual(vals, tt.wvals) {
|
||||
t.Errorf("#%d: vals = %+v, want %+v", i, vals, tt.wvals)
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,120 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package starter
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
// TLSInfo holds the SSL certificates paths.
|
||||
type TLSInfo struct {
|
||||
CertFile string `json:"CertFile"`
|
||||
KeyFile string `json:"KeyFile"`
|
||||
CAFile string `json:"CAFile"`
|
||||
}
|
||||
|
||||
func (info TLSInfo) Scheme() string {
|
||||
if info.KeyFile != "" && info.CertFile != "" {
|
||||
return "https"
|
||||
} else {
|
||||
return "http"
|
||||
}
|
||||
}
|
||||
|
||||
// Generates a tls.Config object for a server from the given files.
|
||||
func (info TLSInfo) ServerConfig() (*tls.Config, error) {
|
||||
// Both the key and cert must be present.
|
||||
if info.KeyFile == "" || info.CertFile == "" {
|
||||
return nil, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile)
|
||||
}
|
||||
|
||||
var cfg tls.Config
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg.Certificates = []tls.Certificate{tlsCert}
|
||||
|
||||
if info.CAFile != "" {
|
||||
cfg.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
cp, err := newCertPool(info.CAFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg.RootCAs = cp
|
||||
cfg.ClientCAs = cp
|
||||
} else {
|
||||
cfg.ClientAuth = tls.NoClientCert
|
||||
}
|
||||
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// Generates a tls.Config object for a client from the given files.
|
||||
func (info TLSInfo) ClientConfig() (*tls.Config, error) {
|
||||
var cfg tls.Config
|
||||
|
||||
if info.KeyFile == "" || info.CertFile == "" {
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg.Certificates = []tls.Certificate{tlsCert}
|
||||
|
||||
if info.CAFile != "" {
|
||||
cp, err := newCertPool(info.CAFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg.RootCAs = cp
|
||||
}
|
||||
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// newCertPool creates x509 certPool with provided CA file
|
||||
func newCertPool(CAFile string) (*x509.CertPool, error) {
|
||||
certPool := x509.NewCertPool()
|
||||
pemByte, err := ioutil.ReadFile(CAFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for {
|
||||
var block *pem.Block
|
||||
block, pemByte = pem.Decode(pemByte)
|
||||
if block == nil {
|
||||
return certPool, nil
|
||||
}
|
||||
cert, err := x509.ParseCertificate(block.Bytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
certPool.AddCert(cert)
|
||||
}
|
||||
|
||||
}
|
pkg/fileutil/lock_plan9.go (new file, 90 lines)
@ -0,0 +1,90 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package fileutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrLocked = errors.New("file already locked")
|
||||
)
|
||||
|
||||
type Lock interface {
|
||||
Name() string
|
||||
TryLock() error
|
||||
Lock() error
|
||||
Unlock() error
|
||||
Destroy() error
|
||||
}
|
||||
|
||||
type lock struct {
|
||||
fname string
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func (l *lock) Name() string {
|
||||
return l.fname
|
||||
}
|
||||
|
||||
// TryLock acquires exclusivity on the lock without blocking
|
||||
func (l *lock) TryLock() error {
|
||||
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.Open(l.fname)
|
||||
if err != nil {
|
||||
return ErrLocked
|
||||
}
|
||||
|
||||
l.file = f
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lock acquires exclusivity on the lock with blocking
|
||||
func (l *lock) Lock() error {
|
||||
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
f, err := os.Open(l.fname)
|
||||
if err == nil {
|
||||
l.file = f
|
||||
return nil
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock unlocks the lock
|
||||
func (l *lock) Unlock() error {
|
||||
return l.file.Close()
|
||||
}
|
||||
|
||||
func (l *lock) Destroy() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewLock(file string) (Lock, error) {
|
||||
l := &lock{fname: file}
|
||||
return l, nil
|
||||
}
|
pkg/fileutil/lock_solaris.go (new file, 98 lines)
@ -0,0 +1,98 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build solaris
|
||||
|
||||
package fileutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrLocked = errors.New("file already locked")
|
||||
)
|
||||
|
||||
type Lock interface {
|
||||
Name() string
|
||||
TryLock() error
|
||||
Lock() error
|
||||
Unlock() error
|
||||
Destroy() error
|
||||
}
|
||||
|
||||
type lock struct {
|
||||
fd int
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func (l *lock) Name() string {
|
||||
return l.file.Name()
|
||||
}
|
||||
|
||||
// TryLock acquires exclusivity on the lock without blocking
|
||||
func (l *lock) TryLock() error {
|
||||
var lock syscall.Flock_t
|
||||
lock.Start = 0
|
||||
lock.Len = 0
|
||||
lock.Pid = 0
|
||||
lock.Type = syscall.F_WRLCK
|
||||
lock.Whence = 0
|
||||
lock.Pid = 0
|
||||
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||
if err != nil && err == syscall.EAGAIN {
|
||||
return ErrLocked
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Lock acquires exclusivity on the lock without blocking
|
||||
func (l *lock) Lock() error {
|
||||
var lock syscall.Flock_t
|
||||
lock.Start = 0
|
||||
lock.Len = 0
|
||||
lock.Type = syscall.F_WRLCK
|
||||
lock.Whence = 0
|
||||
lock.Pid = 0
|
||||
return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||
}
|
||||
|
||||
// Unlock unlocks the lock
|
||||
func (l *lock) Unlock() error {
|
||||
var lock syscall.Flock_t
|
||||
lock.Start = 0
|
||||
lock.Len = 0
|
||||
lock.Type = syscall.F_UNLCK
|
||||
lock.Whence = 0
|
||||
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||
if err != nil && err == syscall.EAGAIN {
|
||||
return ErrLocked
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *lock) Destroy() error {
|
||||
return l.file.Close()
|
||||
}
|
||||
|
||||
func NewLock(file string) (Lock, error) {
|
||||
f, err := os.OpenFile(file, os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l := &lock{int(f.Fd()), f}
|
||||
return l, nil
|
||||
}
|
@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-// +build !windows,!plan9
+// +build !windows,!plan9,!solaris

 package fileutil
@ -88,13 +88,13 @@ func SetFlagsFromEnv(fs *flag.FlagSet) error {
 // SetBindAddrFromAddr sets the value of bindAddr flag from the value
 // of addr flag. Both flags' Value must be of type IPAddressPort. If the
 // bindAddr flag is set and the addr flag is unset, it will set bindAddr to
-// 0.0.0.0:port of addr. Otherwise, it keeps the original values.
+// [::]:port of addr. Otherwise, it keeps the original values.
 func SetBindAddrFromAddr(fs *flag.FlagSet, bindAddrFlagName, addrFlagName string) {
 	if IsSet(fs, bindAddrFlagName) || !IsSet(fs, addrFlagName) {
 		return
 	}
 	addr := *fs.Lookup(addrFlagName).Value.(*IPAddressPort)
-	addr.IP = "0.0.0.0"
+	addr.IP = "::"
 	if err := fs.Set(bindAddrFlagName, addr.String()); err != nil {
 		log.Panicf("etcdmain: unexpected flags set error: %v", err)
 	}
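For background on the new default: on most dual-stack hosts a listener bound to the IPv6 unspecified address `[::]` also accepts IPv4 connections via v4-mapped addresses, which is why it can replace `0.0.0.0`; whether that mapping is enabled is OS-dependent (for example the `bindv6only` setting on Linux). A minimal, hedged sketch of such a listener (port 0 just asks the OS for a free port):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Bind to the IPv6 unspecified address. On typical dual-stack systems this
	// single listener also receives IPv4 connections as v4-mapped addresses.
	l, err := net.Listen("tcp", "[::]:0")
	if err != nil {
		fmt.Println("listen error:", err)
		return
	}
	defer l.Close()
	fmt.Println("listening on", l.Addr())
}
```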
@ -94,7 +94,7 @@ func TestSetBindAddrFromAddr(t *testing.T) {
 		// addr flag set
 		{
 			args:  []string{"-addr=192.0.3.17:4001"},
-			waddr: &IPAddressPort{IP: "0.0.0.0", Port: 4001},
+			waddr: &IPAddressPort{IP: "::", Port: 4001},
 		},
 		// bindAddr flag set
 		{
@ -106,6 +106,11 @@ func TestSetBindAddrFromAddr(t *testing.T) {
 			args:  []string{"-bind-addr=127.0.0.1:4001", "-addr=192.0.3.17:4001"},
 			waddr: &IPAddressPort{IP: "127.0.0.1", Port: 4001},
 		},
+		// both addr flags set, IPv6
+		{
+			args:  []string{"-bind-addr=[2001:db8::4:9]:4001", "-addr=[2001:db8::4:f0]:4001"},
+			waddr: &IPAddressPort{IP: "2001:db8::4:9", Port: 4001},
+		},
 	}
 	for i, tt := range tests {
 		fs := flag.NewFlagSet("test", flag.PanicOnError)
@ -16,7 +16,6 @@ package flags

 import (
 	"errors"
-	"fmt"
 	"net"
 	"strconv"
 	"strings"
@ -32,26 +31,26 @@ type IPAddressPort struct {
 func (a *IPAddressPort) Set(arg string) error {
 	arg = strings.TrimSpace(arg)

-	parts := strings.SplitN(arg, ":", 2)
-	if len(parts) != 2 {
-		return errors.New("bad format in address specification")
+	host, portStr, err := net.SplitHostPort(arg)
+	if err != nil {
+		return err
 	}

-	if net.ParseIP(parts[0]) == nil {
+	if net.ParseIP(host) == nil {
 		return errors.New("bad IP in address specification")
 	}

-	port, err := strconv.Atoi(parts[1])
+	port, err := strconv.Atoi(portStr)
 	if err != nil {
 		return errors.New("bad port in address specification")
 	}

-	a.IP = parts[0]
+	a.IP = host
 	a.Port = port

 	return nil
 }

 func (a *IPAddressPort) String() string {
-	return fmt.Sprintf("%s:%d", a.IP, a.Port)
+	return net.JoinHostPort(a.IP, strconv.Itoa(a.Port))
 }
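The rewritten `Set` and `String` above rely on `net.SplitHostPort` and `net.JoinHostPort`, which understand bracketed IPv6 literals that a plain split on `:` cannot. A small standalone sketch of that standard-library behavior (the address is an arbitrary documentation-range example):

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

func main() {
	// SplitHostPort strips the brackets from an IPv6 literal and splits on the
	// final colon, so "2001:db8::1" is not chopped at its first colon.
	host, portStr, err := net.SplitHostPort("[2001:db8::1]:4001")
	if err != nil {
		panic(err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		panic(err)
	}
	fmt.Println(host, port) // 2001:db8::1 4001

	// JoinHostPort re-adds the brackets whenever the host contains a colon.
	fmt.Println(net.JoinHostPort(host, strconv.Itoa(port))) // [2001:db8::1]:4001
}
```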
@ -22,6 +22,7 @@ func TestIPAddressPortSet(t *testing.T) {
|
||||
pass := []string{
|
||||
"1.2.3.4:8080",
|
||||
"10.1.1.1:80",
|
||||
"[2001:db8::1]:8080",
|
||||
}
|
||||
|
||||
fail := []string{
|
||||
@ -40,6 +41,8 @@ func TestIPAddressPortSet(t *testing.T) {
|
||||
"234#$",
|
||||
"file://foo/bar",
|
||||
"http://hello",
|
||||
"2001:db8::1",
|
||||
"2001:db8::1:1",
|
||||
}
|
||||
|
||||
for i, tt := range pass {
|
||||
@ -58,14 +61,20 @@ func TestIPAddressPortSet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestIPAddressPortString(t *testing.T) {
|
||||
f := &IPAddressPort{}
|
||||
if err := f.Set("127.0.0.1:4001"); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
addresses := []string{
|
||||
"[2001:db8::1:1234]:4001",
|
||||
"127.0.0.1:4001",
|
||||
}
|
||||
for i, tt := range addresses {
|
||||
f := &IPAddressPort{}
|
||||
if err := f.Set(tt); err != nil {
|
||||
t.Errorf("#%d: unexpected error: %v", i, err)
|
||||
}
|
||||
|
||||
want := "127.0.0.1:4001"
|
||||
got := f.String()
|
||||
if want != got {
|
||||
t.Fatalf("IPAddressPort.String() value should be %q, got %q", want, got)
|
||||
want := tt
|
||||
got := f.String()
|
||||
if want != got {
|
||||
t.Errorf("#%d: IPAddressPort.String() value should be %q, got %q", i, want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,10 @@ func NewListener(addr string, scheme string, info TLSInfo) (net.Listener, error)
 		return nil, err
 	}

-	if !info.Empty() && scheme == "https" {
+	if scheme == "https" {
+		if info.Empty() {
+			return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", scheme+"://"+addr)
+		}
 		cfg, err := info.ServerConfig()
 		if err != nil {
 			return nil, err
@ -70,6 +70,13 @@ func TestNewListenerTLSInfo(t *testing.T) {
 	}
 }

+func TestNewListenerTLSEmptyInfo(t *testing.T) {
+	_, err := NewListener("127.0.0.1:0", "https", TLSInfo{})
+	if err == nil {
+		t.Errorf("err = nil, want not presented error")
+	}
+}
+
 func TestNewListenerTLSInfoNonexist(t *testing.T) {
 	tlsInfo := TLSInfo{CertFile: "@badname", KeyFile: "@badname"}
 	_, err := NewListener("127.0.0.1:0", "https", tlsInfo)
@ -23,14 +23,17 @@ import (
 // NewTimeoutTransport returns a transport created using the given TLS info.
 // If read/write on the created connection blocks longer than its time limit,
 // it will return timeout error.
-func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
+func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
 	tr, err := NewTransport(info)
 	if err != nil {
 		return nil, err
 	}
+	// the timeout connection will time out soon after it becomes idle, so it
+	// should not be put back into the http transport as an idle connection for future use.
+	tr.MaxIdleConnsPerHost = -1
 	tr.Dial = (&rwTimeoutDialer{
 		Dialer: net.Dialer{
-			Timeout:   30 * time.Second,
+			Timeout:   dialtimeoutd,
 			KeepAlive: 30 * time.Second,
 		},
 		rdtimeoutd: rdtimeoutd,
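A hedged usage sketch of the new signature, with the dial timeout now passed explicitly; the import path matches the repository layout of the time and the timeout values are illustrative only:

```go
package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/pkg/transport"
)

func main() {
	// dial, read, and write timeouts are separate knobs now.
	tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, time.Second, 5*time.Second, 5*time.Second)
	if err != nil {
		fmt.Println("transport error:", err)
		return
	}
	// Idle reuse is disabled (MaxIdleConnsPerHost = -1), so every request dials
	// a fresh connection whose reads and writes carry the configured deadlines.
	fmt.Println(tr.MaxIdleConnsPerHost)
}
```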
@ -15,6 +15,8 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
@ -24,11 +26,16 @@ import (
|
||||
// TestNewTimeoutTransport tests that NewTimeoutTransport returns a transport
|
||||
// that can dial out timeout connections.
|
||||
func TestNewTimeoutTransport(t *testing.T) {
|
||||
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour)
|
||||
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour, time.Hour)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected NewTimeoutTransport error: %v", err)
|
||||
}
|
||||
srv := httptest.NewServer(http.NotFoundHandler())
|
||||
|
||||
remoteAddr := func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte(r.RemoteAddr))
|
||||
}
|
||||
srv := httptest.NewServer(http.HandlerFunc(remoteAddr))
|
||||
|
||||
defer srv.Close()
|
||||
conn, err := tr.Dial("tcp", srv.Listener.Addr().String())
|
||||
if err != nil {
|
||||
@ -46,4 +53,33 @@ func TestNewTimeoutTransport(t *testing.T) {
|
||||
if tconn.wtimeoutd != time.Hour {
|
||||
t.Errorf("write timeout = %s, want %s", tconn.wtimeoutd, time.Hour)
|
||||
}
|
||||
|
||||
// ensure the timeout connection is not reused
|
||||
req, err := http.NewRequest("GET", srv.URL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %v", err)
|
||||
}
|
||||
resp, err := tr.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %v", err)
|
||||
}
|
||||
addr0, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %v", err)
|
||||
}
|
||||
|
||||
resp, err = tr.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %v", err)
|
||||
}
|
||||
addr1, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %v", err)
|
||||
}
|
||||
|
||||
if bytes.Equal(addr0, addr1) {
|
||||
t.Errorf("addr0 = %s addr1= %s, want not equal", string(addr0), string(addr1))
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package proxy

 import (
 	"log"
+	"math/rand"
 	"net/url"
 	"sync"
 	"time"
@ -65,6 +66,13 @@ func (d *director) refresh() {
 		}
 		endpoints = append(endpoints, newEndpoint(*uu))
 	}
+
+	// shuffle array to avoid connections being "stuck" to a single endpoint
+	for i := range endpoints {
+		j := rand.Intn(i + 1)
+		endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
+	}
+
 	d.ep = endpoints
 }
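The added loop is an in-place Fisher–Yates shuffle over the freshly parsed endpoints. A self-contained sketch of the same idea (the endpoint URLs are made up; on Go 1.10+ `rand.Shuffle` expresses the same thing directly, which is an aside rather than part of this change):

```go
package main

import (
	"fmt"
	"math/rand"
)

// shuffle randomizes the slice in place so the proxy does not always pin its
// connections to whichever endpoint happens to be listed first.
func shuffle(eps []string) {
	for i := range eps {
		j := rand.Intn(i + 1)
		eps[i], eps[j] = eps[j], eps[i]
	}
}

func main() {
	eps := []string{"http://10.0.1.10:2379", "http://10.0.1.11:2379", "http://10.0.1.12:2379"}
	shuffle(eps)
	fmt.Println(eps)
}
```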
@ -18,6 +18,17 @@ import (
 	"net/http"
 )

+const (
+	// DefaultMaxIdleConnsPerHost indicates the default maximum number of idle
+	// connections maintained between the proxy and each member. We set it to 128
+	// so the proxy can handle 128 concurrent requests smoothly over the long term.
+	// If the number of concurrent requests is bigger than this value, the proxy
+	// needs to create one new connection for each request in the delta, which is
+	// bad because the creation consumes resources and may eat up ephemeral ports.
+	DefaultMaxIdleConnsPerHost = 128
+)
+
 // GetProxyURLs is a function which should return the current set of URLs to
 // which client requests should be proxied. This function will be queried
 // periodically by the proxy Handler to refresh the set of available
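For context, this is roughly how such a limit is applied to a standard `net/http` transport; the wiring below is a hedged sketch, not the proxy's actual construction code:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

const defaultMaxIdleConnsPerHost = 128

func main() {
	tr := &http.Transport{
		// Keep up to 128 idle connections per backend so steady traffic reuses
		// connections instead of redialing and burning ephemeral ports.
		MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
	}
	client := &http.Client{Transport: tr, Timeout: 5 * time.Second}
	fmt.Println(client.Timeout, tr.MaxIdleConnsPerHost)
}
```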
@ -15,8 +15,10 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -55,6 +57,21 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
proxyreq := new(http.Request)
|
||||
*proxyreq = *clientreq
|
||||
|
||||
var (
|
||||
proxybody []byte
|
||||
err error
|
||||
)
|
||||
|
||||
if clientreq.Body != nil {
|
||||
proxybody, err = ioutil.ReadAll(clientreq.Body)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("proxy: failed to read request body: %v", err)
|
||||
e := httptypes.NewHTTPError(http.StatusInternalServerError, msg)
|
||||
e.WriteTo(rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// deep-copy the headers, as these will be modified below
|
||||
proxyreq.Header = make(http.Header)
|
||||
copyHeader(proxyreq.Header, clientreq.Header)
|
||||
@ -73,10 +90,31 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
completeCh := make(chan bool, 1)
|
||||
closeNotifier, ok := rw.(http.CloseNotifier)
|
||||
if ok {
|
||||
go func() {
|
||||
select {
|
||||
case <-closeNotifier.CloseNotify():
|
||||
tp, ok := p.transport.(*http.Transport)
|
||||
if ok {
|
||||
tp.CancelRequest(proxyreq)
|
||||
}
|
||||
case <-completeCh:
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
completeCh <- true
|
||||
}()
|
||||
}
|
||||
|
||||
var res *http.Response
|
||||
var err error
|
||||
|
||||
for _, ep := range endpoints {
|
||||
if proxybody != nil {
|
||||
proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
|
||||
}
|
||||
redirectRequest(proxyreq, ep.URL)
|
||||
|
||||
res, err = p.transport.RoundTrip(proxyreq)
|
||||
|
@ -18,7 +18,7 @@ Package raft provides an implementation of the raft consensus algorithm.
 The primary object in raft is a Node. You either start a Node from scratch
 using raft.StartNode or start a Node from some initial state using raft.RestartNode.

 	storage := raft.NewMemoryStorage()
-	n := raft.StartNode(0x01, []int64{0x02, 0x03}, 3, 1, storage)
+	n := raft.StartNode(0x01, []raft.Peer{{ID: 0x02}, {ID: 0x03}}, 3, 1, storage)

 Now that you are holding onto a Node you have a few responsibilities:
@ -65,7 +65,7 @@ func newLog(storage Storage) *raftLog {
 }

 func (l *raftLog) String() string {
-	return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
+	return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
 }

 // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
@ -233,7 +233,7 @@ func (n *node) run(r *raft) {

 	lead := None
 	prevSoftSt := r.softState()
-	prevHardSt := r.HardState
+	prevHardSt := emptyState

 	for {
 		if advancec != nil {
@ -304,7 +304,7 @@ func TestNodeStart(t *testing.T) {
|
||||
wants := []Ready{
|
||||
{
|
||||
SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
|
||||
HardState: raftpb.HardState{Term: 2, Commit: 2},
|
||||
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
|
||||
Entries: []raftpb.Entry{
|
||||
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
|
||||
{Term: 2, Index: 2},
|
||||
@ -315,7 +315,7 @@ func TestNodeStart(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
HardState: raftpb.HardState{Term: 2, Commit: 3},
|
||||
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
|
||||
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
|
||||
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
|
||||
},
|
||||
@ -354,7 +354,7 @@ func TestNodeRestart(t *testing.T) {
|
||||
st := raftpb.HardState{Term: 1, Commit: 1}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
HardState: st,
|
||||
// commit up to index commit index in st
|
||||
CommittedEntries: entries[:st.Commit],
|
||||
}
|
||||
@ -389,7 +389,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
|
||||
st := raftpb.HardState{Term: 1, Commit: 3}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
HardState: st,
|
||||
// commit up to index commit index in st
|
||||
CommittedEntries: entries,
|
||||
}
|
||||
|
@ -306,9 +306,11 @@ func (r *raft) maybeCommit() bool {
 }

 func (r *raft) reset(term uint64) {
-	r.Term = term
+	if r.Term != term {
+		r.Term = term
+		r.Vote = None
+	}
 	r.lead = None
-	r.Vote = None
 	r.elapsed = 0
 	r.votes = make(map[uint64]bool)
 	for i := range r.prs {
@ -121,7 +121,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
|
||||
from, err := types.IDFromString(fromStr)
|
||||
if err != nil {
|
||||
log.Printf("rafthttp: path %s cannot be parsed", fromStr)
|
||||
http.Error(w, "invalid path", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ const (
|
||||
appRespBatchMs = 50
|
||||
propBatchMs = 10
|
||||
|
||||
DialTimeout = time.Second
|
||||
ConnReadTimeout = 5 * time.Second
|
||||
ConnWriteTimeout = 5 * time.Second
|
||||
)
|
||||
@ -199,7 +200,7 @@ func (p *peer) handle() {
|
||||
log.Printf("sender: the connection with %s became inactive", p.id)
|
||||
p.active = false
|
||||
}
|
||||
if m.Type == raftpb.MsgApp {
|
||||
if m.Type == raftpb.MsgApp && p.fs != nil {
|
||||
p.fs.Fail()
|
||||
}
|
||||
} else {
|
||||
@ -208,7 +209,7 @@ func (p *peer) handle() {
|
||||
p.active = true
|
||||
p.errored = nil
|
||||
}
|
||||
if m.Type == raftpb.MsgApp {
|
||||
if m.Type == raftpb.MsgApp && p.fs != nil {
|
||||
p.fs.Succ(end.Sub(start))
|
||||
}
|
||||
}
|
||||
|
rafthttp/remote.go (new file, 42 lines)
@ -0,0 +1,42 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package rafthttp
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
)
|
||||
|
||||
type remote struct {
|
||||
id types.ID
|
||||
peer *peer
|
||||
}
|
||||
|
||||
func startRemote(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, errorc chan error) *remote {
|
||||
return &remote{
|
||||
id: to,
|
||||
peer: NewPeer(tr, u, to, cid, r, nil, errorc),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *remote) Send(m raftpb.Message) {
|
||||
g.peer.send(m)
|
||||
}
|
||||
|
||||
func (g *remote) Stop() {
|
||||
g.peer.Stop()
|
||||
}
|
@ -76,8 +76,11 @@ func (s *stream) attach(sw *streamWriter) error {
|
||||
// ignore lower-term streaming request
|
||||
if sw.term < s.w.term {
|
||||
return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
|
||||
} else if sw.term == s.w.term {
|
||||
s.w.stopWithoutLog()
|
||||
} else {
|
||||
s.w.stop()
|
||||
}
|
||||
s.w.stop()
|
||||
}
|
||||
s.w = sw
|
||||
return nil
|
||||
@ -151,21 +154,23 @@ type WriteFlusher interface {
|
||||
|
||||
// TODO: replace fs with stream stats
|
||||
type streamWriter struct {
|
||||
to types.ID
|
||||
term uint64
|
||||
fs *stats.FollowerStats
|
||||
q chan []raftpb.Entry
|
||||
done chan struct{}
|
||||
to types.ID
|
||||
term uint64
|
||||
fs *stats.FollowerStats
|
||||
q chan []raftpb.Entry
|
||||
done chan struct{}
|
||||
printLog bool
|
||||
}
|
||||
|
||||
// newStreamWriter starts and returns a new unstarted stream writer.
|
||||
// The caller should call stop when finished, to shut it down.
|
||||
func newStreamWriter(to types.ID, term uint64) *streamWriter {
|
||||
s := &streamWriter{
|
||||
to: to,
|
||||
term: term,
|
||||
q: make(chan []raftpb.Entry, streamBufSize),
|
||||
done: make(chan struct{}),
|
||||
to: to,
|
||||
term: term,
|
||||
q: make(chan []raftpb.Entry, streamBufSize),
|
||||
done: make(chan struct{}),
|
||||
printLog: true,
|
||||
}
|
||||
return s
|
||||
}
|
||||
@ -188,7 +193,9 @@ func (s *streamWriter) send(ents []raftpb.Entry) error {
|
||||
func (s *streamWriter) handle(w WriteFlusher) {
|
||||
defer func() {
|
||||
close(s.done)
|
||||
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
|
||||
if s.printLog {
|
||||
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
|
||||
}
|
||||
}()
|
||||
|
||||
ew := newEntryWriter(w, s.to)
|
||||
@ -215,6 +222,11 @@ func (s *streamWriter) stop() {
|
||||
<-s.done
|
||||
}
|
||||
|
||||
func (s *streamWriter) stopWithoutLog() {
|
||||
s.printLog = false
|
||||
s.stop()
|
||||
}
|
||||
|
||||
func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
|
||||
|
||||
// TODO: move the raft interface out of the reader.
|
||||
|
@ -35,6 +35,12 @@ type Raft interface {
|
||||
type Transporter interface {
|
||||
Handler() http.Handler
|
||||
Send(m []raftpb.Message)
|
||||
// AddRemote adds a remote with given peer urls into the transport.
|
||||
// A remote helps a newly joined member catch up with the progress of the cluster,
|
||||
// and will not be used after that.
|
||||
// It is the caller's responsibility to ensure the urls are all valid,
|
||||
// or it panics.
|
||||
AddRemote(id types.ID, urls []string)
|
||||
AddPeer(id types.ID, urls []string)
|
||||
RemovePeer(id types.ID)
|
||||
RemoveAllPeers()
|
||||
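The doc comment above explains when `AddRemote` applies. The toy sketch below only illustrates the calling pattern; the interface is trimmed to two methods, `types.ID` is replaced by `uint64`, and the member ID and URL are invented:

```go
package main

import "fmt"

// transporter is a trimmed stand-in for the Transporter interface shown above.
type transporter interface {
	AddRemote(id uint64, urls []string)
	AddPeer(id uint64, urls []string)
}

type loggingTransport struct{}

func (loggingTransport) AddRemote(id uint64, urls []string) { fmt.Println("remote", id, urls) }
func (loggingTransport) AddPeer(id uint64, urls []string)   { fmt.Println("peer", id, urls) }

func main() {
	var tr transporter = loggingTransport{}
	// While a newly joined member is still catching up, register it as a remote
	// so messages can reach it even though it is not yet a full peer...
	tr.AddRemote(0x2, []string{"http://10.0.1.11:2380"})
	// ...and add it as a peer once it participates in consensus normally.
	tr.AddPeer(0x2, []string{"http://10.0.1.11:2380"})
}
```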
@ -50,9 +56,10 @@ type transport struct {
|
||||
serverStats *stats.ServerStats
|
||||
leaderStats *stats.LeaderStats
|
||||
|
||||
mu sync.RWMutex // protect the peer map
|
||||
peers map[types.ID]*peer // remote peers
|
||||
errorc chan error
|
||||
mu sync.RWMutex // protect the remote and peer map
|
||||
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
|
||||
peers map[types.ID]*peer // peers map
|
||||
errorc chan error
|
||||
}
|
||||
|
||||
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
|
||||
@ -63,6 +70,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
|
||||
raft: r,
|
||||
serverStats: ss,
|
||||
leaderStats: ls,
|
||||
remotes: make(map[types.ID]*remote),
|
||||
peers: make(map[types.ID]*peer),
|
||||
errorc: errorc,
|
||||
}
|
||||
@ -90,21 +98,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
|
||||
continue
|
||||
}
|
||||
to := types.ID(m.To)
|
||||
|
||||
p, ok := t.peers[to]
|
||||
if !ok {
|
||||
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
||||
if ok {
|
||||
if m.Type == raftpb.MsgApp {
|
||||
t.serverStats.SendAppendReq(m.Size())
|
||||
}
|
||||
p.Send(m)
|
||||
continue
|
||||
}
|
||||
|
||||
if m.Type == raftpb.MsgApp {
|
||||
t.serverStats.SendAppendReq(m.Size())
|
||||
g, ok := t.remotes[to]
|
||||
if ok {
|
||||
g.Send(m)
|
||||
continue
|
||||
}
|
||||
|
||||
p.Send(m)
|
||||
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transport) Stop() {
|
||||
for _, r := range t.remotes {
|
||||
r.Stop()
|
||||
}
|
||||
for _, p := range t.peers {
|
||||
p.Stop()
|
||||
}
|
||||
@ -113,6 +130,21 @@ func (t *transport) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transport) AddRemote(id types.ID, us []string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if _, ok := t.remotes[id]; ok {
|
||||
return
|
||||
}
|
||||
peerURL := us[0]
|
||||
u, err := url.Parse(peerURL)
|
||||
if err != nil {
|
||||
log.Panicf("unexpect peer url %s", peerURL)
|
||||
}
|
||||
u.Path = path.Join(u.Path, RaftPrefix)
|
||||
t.remotes[id] = startRemote(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, t.errorc)
|
||||
}
|
||||
|
||||
func (t *transport) AddPeer(id types.ID, urls []string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
@ -82,7 +82,10 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
 			break
 		}
 	}
-	return snap, err
+	if err != nil {
+		return nil, ErrNoSnapshot
+	}
+	return snap, nil
 }

 func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
@ -76,7 +76,7 @@ func TestBadCRC(t *testing.T) {
|
||||
// fake a crc mismatch
|
||||
crcTable = crc32.MakeTable(crc32.Koopman)
|
||||
|
||||
_, err = ss.Load()
|
||||
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
|
||||
if err == nil || err != ErrCRCMismatch {
|
||||
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
|
||||
}
|
||||
@ -182,7 +182,7 @@ func TestNoSnapshot(t *testing.T) {
|
||||
defer os.RemoveAll(dir)
|
||||
ss := New(dir)
|
||||
_, err = ss.Load()
|
||||
if err == nil || err != ErrNoSnapshot {
|
||||
if err != ErrNoSnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||
}
|
||||
}
|
||||
@ -195,14 +195,35 @@ func TestEmptySnapshot(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("shit"), 0x700)
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = Read(path.Join(dir, "1.snap"))
|
||||
if err != ErrEmptySnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAllSnapshotBroken ensures snapshotter returns
|
||||
// ErrNoSnapshot if all the snapshots are broken.
|
||||
func TestAllSnapshotBroken(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ss := New(dir)
|
||||
_, err = ss.Load()
|
||||
if err == nil || err != ErrEmptySnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
|
||||
if err != ErrNoSnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||
}
|
||||
}
|
||||
|
@ -78,10 +78,24 @@ func newStats() *Stats {
 }

 func (s *Stats) clone() *Stats {
-	return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
-		s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess,
-		s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail,
-		s.CompareAndDeleteSuccess, s.CompareAndDeleteFail, s.Watchers, s.ExpireCount}
+	return &Stats{
+		GetSuccess:              s.GetSuccess,
+		GetFail:                 s.GetFail,
+		SetSuccess:              s.SetSuccess,
+		SetFail:                 s.SetFail,
+		DeleteSuccess:           s.DeleteSuccess,
+		DeleteFail:              s.DeleteFail,
+		UpdateSuccess:           s.UpdateSuccess,
+		UpdateFail:              s.UpdateFail,
+		CreateSuccess:           s.CreateSuccess,
+		CreateFail:              s.CreateFail,
+		CompareAndSwapSuccess:   s.CompareAndSwapSuccess,
+		CompareAndSwapFail:      s.CompareAndSwapFail,
+		CompareAndDeleteSuccess: s.CompareAndDeleteSuccess,
+		CompareAndDeleteFail:    s.CompareAndDeleteFail,
+		ExpireCount:             s.ExpireCount,
+		Watchers:                s.Watchers,
+	}
 }

 func (s *Stats) toJson() []byte {
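The keyed literal above no longer depends on the order of the struct fields, which was the bug risk in the positional form. A toy illustration with a hypothetical two-field struct:

```go
package main

import "fmt"

type counters struct {
	GetSuccess uint64
	GetFail    uint64
}

func main() {
	// A positional literal silently pairs values with the wrong fields if the
	// struct is ever reordered; a keyed literal keeps each value attached to its name.
	positional := counters{1, 2}
	keyed := counters{GetFail: 2, GetSuccess: 1}
	fmt.Println(positional == keyed) // true
}
```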
@ -84,7 +84,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde

 	if ok { // add the new watcher to the back of the list
 		elem = l.PushBack(w)
-
 	} else { // create a new list and add the new watcher
 		l = list.New()
 		elem = l.PushBack(w)
@ -146,6 +145,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
 				// if we successfully notify a watcher
 				// we need to remove the watcher from the list
 				// and decrease the counter
+				w.removed = true
 				l.Remove(curr)
 				atomic.AddInt64(&wh.count, -1)
 			}
test (2 lines changed)
@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
|
||||
source ./build
|
||||
|
||||
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
|
||||
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes migrate pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
|
||||
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes migrate pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store version wal"
|
||||
FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/ integration"
|
||||
|
||||
# user has not provided PKG override
|
||||
|
@ -15,7 +15,8 @@ etcd will detect 0.4.x data dir and update the data automatically (while leaving

 The tool can be run via:
 ```sh
-./bin/etcd-migrate --data-dir=<PATH TO YOUR DATA>
+./go build
+./etcd-migrate --data-dir=<PATH TO YOUR DATA>
 ```

 It should autodetect everything and convert the data-dir to be 2.0 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 2.0 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
@ -44,4 +45,4 @@ If the conversion has completed, the entire cluster is running on something 2.0-
 rm -ri snapshot conf log
 ```

-It will ask before every deletion, but these are the 0.4.x files and will not affect the working 2.0 data.
+It will ask before every deletion, but these are the 0.4.x files and will not affect the working 2.0 data.
@ -14,7 +14,68 @@
|
||||
|
||||
package version
|
||||
|
||||
var (
|
||||
Version = "2.0.4+git"
|
||||
InternalVersion = "2"
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
|
||||
var (
|
||||
Version = "2.0.12"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
)
|
||||
|
||||
// DataDirVersion is an enum for the detected layout versions of an etcd data directory.
|
||||
type DataDirVersion string
|
||||
|
||||
const (
|
||||
DataDirUnknown DataDirVersion = "Unknown WAL"
|
||||
DataDir0_4 DataDirVersion = "0.4.x"
|
||||
DataDir2_0 DataDirVersion = "2.0.0"
|
||||
DataDir2_0Proxy DataDirVersion = "2.0 proxy"
|
||||
DataDir2_0_1 DataDirVersion = "2.0.1"
|
||||
)
|
||||
|
||||
func DetectDataDir(dirpath string) (DataDirVersion, error) {
|
||||
names, err := fileutil.ReadDir(dirpath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
// Error reading the directory
|
||||
return DataDirUnknown, err
|
||||
}
|
||||
nameSet := types.NewUnsafeSet(names...)
|
||||
if nameSet.Contains("member") {
|
||||
ver, err := DetectDataDir(path.Join(dirpath, "member"))
|
||||
if ver == DataDir2_0 {
|
||||
return DataDir2_0_1, nil
|
||||
} else if ver == DataDir0_4 {
|
||||
// How in the blazes did it get there?
|
||||
return DataDirUnknown, nil
|
||||
}
|
||||
return ver, err
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snap", "wal"}) {
|
||||
// .../wal cannot be empty to exist.
|
||||
walnames, err := fileutil.ReadDir(path.Join(dirpath, "wal"))
|
||||
if err == nil && len(walnames) > 0 {
|
||||
return DataDir2_0, nil
|
||||
}
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"proxy"}) {
|
||||
return DataDir2_0Proxy, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
|
||||
return DataDir0_4, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"standby_info"}) {
|
||||
return DataDir0_4, nil
|
||||
}
|
||||
|
||||
return DataDirUnknown, nil
|
||||
}
|
||||
|
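A hedged usage sketch of `DetectDataDir`; it assumes the package is importable as `github.com/coreos/etcd/version` (matching the `version` directory added to the test list) and the data directory path is illustrative:

```go
package main

import (
	"fmt"

	"github.com/coreos/etcd/version"
)

func main() {
	dir := "/var/lib/etcd" // illustrative path, not taken from this change
	ver, err := version.DetectDataDir(dir)
	if err != nil {
		fmt.Println("detect error:", err)
		return
	}
	// ver is one of DataDirUnknown, DataDir0_4, DataDir2_0, DataDir2_0Proxy, DataDir2_0_1.
	fmt.Println("data dir layout:", ver)
}
```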
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
package version
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
@ -22,21 +22,20 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDetectVersion(t *testing.T) {
|
||||
func TestDetectDataDir(t *testing.T) {
|
||||
tests := []struct {
|
||||
names []string
|
||||
wver WalVersion
|
||||
wver DataDirVersion
|
||||
}{
|
||||
{[]string{}, WALNotExist},
|
||||
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, WALv2_0_1},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, WALv2_0},
|
||||
{[]string{"snapshot/", "conf", "log"}, WALv0_4},
|
||||
{[]string{"weird"}, WALUnknown},
|
||||
{[]string{"snap/", "wal/"}, WALUnknown},
|
||||
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, DataDir2_0_1},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, DataDir2_0},
|
||||
{[]string{"snapshot/", "conf", "log"}, DataDir0_4},
|
||||
{[]string{"weird"}, DataDirUnknown},
|
||||
{[]string{"snap/", "wal/"}, DataDirUnknown},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
p := mustMakeDir(t, tt.names...)
|
||||
ver, err := DetectVersion(p)
|
||||
ver, err := DetectDataDir(p)
|
||||
if ver != tt.wver {
|
||||
t.Errorf("#%d: version = %s, want %s", i, ver, tt.wver)
|
||||
}
|
||||
@ -45,15 +44,6 @@ func TestDetectVersion(t *testing.T) {
|
||||
}
|
||||
os.RemoveAll(p)
|
||||
}
|
||||
|
||||
// detect on non-exist directory
|
||||
v, err := DetectVersion(path.Join(os.TempDir(), "waltest", "not-exist"))
|
||||
if v != WALNotExist {
|
||||
t.Errorf("#non-exist: version = %s, want %s", v, WALNotExist)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("#non-exist: err = %s, want %s", v, WALNotExist)
|
||||
}
|
||||
}
|
||||
|
||||
// mustMakeDir builds the directory that contains files with the given
|
wal/util.go (60 lines changed)
@ -17,68 +17,10 @@ package wal
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
|
||||
// WalVersion is an enum for versions of etcd logs.
|
||||
type WalVersion string
|
||||
|
||||
const (
|
||||
WALUnknown WalVersion = "Unknown WAL"
|
||||
WALNotExist WalVersion = "No WAL"
|
||||
WALv0_4 WalVersion = "0.4.x"
|
||||
WALv2_0 WalVersion = "2.0.0"
|
||||
WALv2_0Proxy WalVersion = "2.0 proxy"
|
||||
WALv2_0_1 WalVersion = "2.0.1"
|
||||
)
|
||||
|
||||
func DetectVersion(dirpath string) (WalVersion, error) {
|
||||
names, err := fileutil.ReadDir(dirpath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
// Error reading the directory
|
||||
return WALNotExist, err
|
||||
}
|
||||
if len(names) == 0 {
|
||||
// Empty WAL directory
|
||||
return WALNotExist, nil
|
||||
}
|
||||
nameSet := types.NewUnsafeSet(names...)
|
||||
if nameSet.Contains("member") {
|
||||
ver, err := DetectVersion(path.Join(dirpath, "member"))
|
||||
if ver == WALv2_0 {
|
||||
return WALv2_0_1, nil
|
||||
} else if ver == WALv0_4 {
|
||||
// How in the blazes did it get there?
|
||||
return WALUnknown, nil
|
||||
}
|
||||
return ver, err
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snap", "wal"}) {
|
||||
// .../wal cannot be empty to exist.
|
||||
if Exist(path.Join(dirpath, "wal")) {
|
||||
return WALv2_0, nil
|
||||
}
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"proxy"}) {
|
||||
return WALv2_0Proxy, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
|
||||
return WALv0_4, nil
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"standby_info"}) {
|
||||
return WALv0_4, nil
|
||||
}
|
||||
|
||||
return WALUnknown, nil
|
||||
}
|
||||
|
||||
func Exist(dirpath string) bool {
|
||||
names, err := fileutil.ReadDir(dirpath)
|
||||
if err != nil {
|
||||
@ -125,7 +67,7 @@ func checkWalNames(names []string) []string {
|
||||
wnames := make([]string, 0)
|
||||
for _, name := range names {
|
||||
if _, _, err := parseWalName(name); err != nil {
|
||||
log.Printf("wal: parse %s error: %v", name, err)
|
||||
log.Printf("wal: ignored file %v in wal", name)
|
||||
continue
|
||||
}
|
||||
wnames = append(wnames, name)
|
||||
|
wal/wal.go (117 lines changed)
@ -273,30 +273,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
}
|
||||
|
||||
// Cut closes current file written and creates a new one ready to append.
|
||||
// cut first creates a temp wal file and writes necessary headers into it.
|
||||
// Then cut atomically renames the temp wal file to a wal file.
|
||||
func (w *WAL) Cut() error {
|
||||
// create a new wal file with name sequence + 1
|
||||
// close old wal file
|
||||
if err := w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
||||
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
||||
ftpath := fpath + ".tmp"
|
||||
|
||||
// create a temp wal file with name sequence + 1, or truncate the existing one
|
||||
ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l, err := fileutil.NewLock(f.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = l.Lock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.locks = append(w.locks, l)
|
||||
if err = w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.f.Close()
|
||||
|
||||
// update writer and save the previous crc
|
||||
w.f = f
|
||||
w.seq++
|
||||
w.f = ft
|
||||
prevCrc := w.encoder.crc.Sum32()
|
||||
w.encoder = newEncoder(w.f, prevCrc)
|
||||
if err := w.saveCrc(prevCrc); err != nil {
|
||||
@ -308,7 +306,45 @@ func (w *WAL) Cut() error {
|
||||
if err := w.saveState(&w.state); err != nil {
|
||||
return err
|
||||
}
|
||||
return w.sync()
|
||||
// close temp wal file
|
||||
if err := w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// atomically move temp wal file to wal file
|
||||
if err := os.Rename(ftpath, fpath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// open the wal file and update writer again
|
||||
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.f = f
|
||||
prevCrc = w.encoder.crc.Sum32()
|
||||
w.encoder = newEncoder(w.f, prevCrc)
|
||||
|
||||
// lock the new wal file
|
||||
l, err := fileutil.NewLock(f.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = l.Lock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.locks = append(w.locks, l)
|
||||
|
||||
// increase the wal seq
|
||||
w.seq++
|
||||
|
||||
log.Printf("wal: segmented wal file %v is created", fpath)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WAL) sync() error {
|
||||
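The rewritten `Cut` above writes the new segment's headers into a `.tmp` file and then renames it into place, so a crash mid-cut never leaves a half-initialized WAL segment behind. A generic, self-contained sketch of that write-temp-then-rename pattern (file names are hypothetical; this is not the WAL code itself):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeAtomic writes data to path by writing a temp file in the same directory,
// syncing it, and renaming it into place. On POSIX filesystems the rename is
// atomic, so readers see either the old file or the complete new one.
func writeAtomic(path string, data []byte) error {
	tmp := path + ".tmp"
	f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

func main() {
	dir, err := ioutil.TempDir("", "atomic-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	p := filepath.Join(dir, "0000000000000001-0000000000000001.wal")
	fmt.Println(writeAtomic(p, []byte("header")))
}
```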
@ -320,27 +356,42 @@ func (w *WAL) sync() error {
|
||||
return w.f.Sync()
|
||||
}
|
||||
|
||||
-// ReleaseLockTo releases the locks w is holding, which
-// have index smaller or equal to the given index.
+// ReleaseLockTo releases the locks whose index is smaller than the given
+// index, except the largest one among them.
+// For example, if the WAL is holding locks 1,2,3,4,5,6, ReleaseLockTo(4) will
+// release locks 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
func (w *WAL) ReleaseLockTo(index uint64) error {
|
||||
for _, l := range w.locks {
|
||||
_, i, err := parseWalName(path.Base(l.Name()))
|
||||
var smaller int
|
||||
found := false
|
||||
|
||||
for i, l := range w.locks {
|
||||
_, lockIndex, err := parseWalName(path.Base(l.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if i > index {
|
||||
return nil
|
||||
if lockIndex >= index {
|
||||
smaller = i - 1
|
||||
found = true
|
||||
break
|
||||
}
|
||||
err = l.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = l.Destroy()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.locks = w.locks[1:]
|
||||
}
|
||||
|
||||
// if no lock index is greater than the release index, we can
// release locks up to the last one (exclusive).
|
||||
if !found && len(w.locks) != 0 {
|
||||
smaller = len(w.locks) - 1
|
||||
}
|
||||
|
||||
if smaller <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < smaller; i++ {
|
||||
w.locks[i].Unlock()
|
||||
w.locks[i].Destroy()
|
||||
}
|
||||
w.locks = w.locks[smaller:]
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -435,6 +435,7 @@ func TestOpenNotInUse(t *testing.T) {
|
||||
unlockIndex := uint64(5)
|
||||
w.ReleaseLockTo(unlockIndex)
|
||||
|
||||
// 1,2,3 are available.
|
||||
w2, err := OpenNotInUse(p, walpb.Snapshot{})
|
||||
defer w2.Close()
|
||||
if err != nil {
|
||||
@ -444,8 +445,8 @@ func TestOpenNotInUse(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("err = %v, want nil", err)
|
||||
}
|
||||
if g := ents[len(ents)-1].Index; g != unlockIndex {
|
||||
t.Errorf("last index read = %d, want %d", g, unlockIndex)
|
||||
if g := ents[len(ents)-1].Index; g != unlockIndex-2 {
|
||||
t.Errorf("last index read = %d, want %d", g, unlockIndex-2)
|
||||
}
|
||||
}
|
||||
|
||||
@ -462,3 +463,62 @@ func TestSaveEmpty(t *testing.T) {
|
||||
t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes()))
|
||||
}
|
||||
}
|
||||
|
||||
func TestReleaseLockTo(t *testing.T) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
// create WAL
|
||||
w, err := Create(p, nil)
|
||||
defer w.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// make 10 separate files
|
||||
for i := 0; i < 10; i++ {
|
||||
es := []raftpb.Entry{{Index: uint64(i)}}
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Cut(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
// release the lock to 5
|
||||
unlockIndex := uint64(5)
|
||||
w.ReleaseLockTo(unlockIndex)
|
||||
|
||||
// expected remaining are 4,5,6,7,8,9,10
|
||||
if len(w.locks) != 7 {
|
||||
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7)
|
||||
}
|
||||
for i, l := range w.locks {
|
||||
_, lockIndex, err := parseWalName(path.Base(l.Name()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if lockIndex != uint64(i+4) {
|
||||
t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4))
|
||||
}
|
||||
}
|
||||
|
||||
// release the lock to 15
|
||||
unlockIndex = uint64(15)
|
||||
w.ReleaseLockTo(unlockIndex)
|
||||
|
||||
// expected remaining is 10
|
||||
if len(w.locks) != 1 {
|
||||
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
|
||||
}
|
||||
_, lockIndex, err := parseWalName(path.Base(w.locks[0].Name()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if lockIndex != uint64(10) {
|
||||
t.Errorf("lockindex = %d, want %d", lockIndex, 10)
|
||||
}
|
||||
}
|
||||
|