Compare commits
51 Commits
Author | SHA1 | Date | |
---|---|---|---|
92e3895214 | |||
b12a52b0fd | |||
9fa4002787 | |||
5686c33e4b | |||
6fd2dfdebc | |||
896ce1668c | |||
0520b4cd24 | |||
6ee6f72c48 | |||
b4dd519a63 | |||
a98fff84e7 | |||
973cfbebda | |||
00d1d34cf8 | |||
fcf81fd6bf | |||
0678329cd6 | |||
9a0e0c2eae | |||
3e4d57c37d | |||
d30e764b2d | |||
b5b7c78f1b | |||
ee1c07c3d4 | |||
67c5d4dfd2 | |||
3afcbd6f83 | |||
8fed61b2eb | |||
c8d386e18c | |||
2b6a44b7b0 | |||
8069d08b96 | |||
5074235254 | |||
f59bddd74b | |||
58f035844c | |||
f83774b4cd | |||
12c32137a8 | |||
fce4cf4dc8 | |||
06a72b2702 | |||
fbaef05885 | |||
31a94d28e3 | |||
88660a303f | |||
53c74dbd0b | |||
8a8af60fad | |||
7de19fefe8 | |||
7750f387b0 | |||
e33ab24442 | |||
fce2c1eeaf | |||
6a3bb93305 | |||
21455d2f3b | |||
51bb4220c5 | |||
d8c506923f | |||
5d778f85ca | |||
02697ca725 | |||
bd693c7069 | |||
52c90cdcfb | |||
a88b22ac0a | |||
e93f8b8a12 |
@ -61,7 +61,7 @@ After your cluster is up and running, adding or removing members is done via [ru
|
||||
|
||||
### Member Migration
|
||||
|
||||
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
|
||||
When there is a scheduled machine maintenance or retirement, you might want to migrate an etcd member to another machine without losing the data and changing the member ID.
|
||||
|
||||
The data directory contains all the data to recover a member to its point-in-time state. To migrate a member:
|
||||
|
||||
@ -102,7 +102,7 @@ $ sudo systemctl stop etcd
|
||||
#### Copy the data directory of the now-idle member to the new machine
|
||||
|
||||
```
|
||||
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
|
||||
$ tar -cvzf node1.etcd.tar.gz /var/lib/etcd/node1.etcd
|
||||
```
|
||||
|
||||
```
|
||||
@ -181,7 +181,9 @@ Once you have verified that etcd has started successfully, shut it down and move
|
||||
|
||||
#### Restoring the cluster
|
||||
|
||||
Now that the node is running successfully, you can add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
|
||||
Now that the node is running successfully, you should [change its advertised peer URLs](other_apis.md#change-the-peer-urls-of-a-member), as the `--force-new-cluster` has set the peer URL to the default (listening on localhost).
|
||||
|
||||
You can then add more nodes to the cluster and restore resiliency. See the [runtime configuration](runtime-configuration.md) guide for more details.
|
||||
|
||||
### Client Request Timeout
|
||||
|
||||
|
@ -4,6 +4,10 @@ etcd can now run as a transparent proxy. Running etcd as a proxy allows for easi
|
||||
|
||||
etcd currently supports two proxy modes: `readwrite` and `readonly`. The default mode is `readwrite`, which forwards both read and write requests to the etcd cluster. A `readonly` etcd proxy only forwards read requests to the etcd cluster, and returns `HTTP 501` to all write requests.
|
||||
|
||||
The proxy will shuffle the list of cluster members periodically to avoid sending all connections to a single member.
|
||||
|
||||
The member list used by proxy consists of all client URLs advertised within the cluster, as specified in each members' `-advertise-client-urls` flag. If this flag is set incorrectly, requests sent to the proxy are forwarded to wrong addresses and then fail. The fix for this problem is to restart etcd member with correct `-advertise-client-urls` flag. After client URLs list in proxy is recalculated, which happens every 30 seconds, requests will be forwarded correctly.
|
||||
|
||||
### Using an etcd proxy
|
||||
To start etcd in proxy mode, you need to provide three flags: `proxy`, `listen-client-urls`, and `initial-cluster` (or `discovery`).
|
||||
|
||||
|
6
build
6
build
@ -11,6 +11,8 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
|
||||
|
||||
eval $(go env)
|
||||
|
||||
GIT_SHA=`git rev-parse --short HEAD || echo "GitNotFound"`
|
||||
|
||||
# Static compilation is useful when etcd is run in a container
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcd ${REPO_PATH}
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags '-s' -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcdctl ${REPO_PATH}/etcdctl
|
||||
|
@ -105,7 +105,7 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return assertStatusCode(resp.StatusCode, http.StatusNoContent)
|
||||
return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone)
|
||||
}
|
||||
|
||||
type membersAPIActionList struct{}
|
||||
|
@ -25,7 +25,8 @@ import (
|
||||
|
||||
var (
|
||||
// indirection for testing
|
||||
lookupSRV = net.LookupSRV
|
||||
lookupSRV = net.LookupSRV
|
||||
resolveTCPAddr = net.ResolveTCPAddr
|
||||
)
|
||||
|
||||
// TODO(barakmich): Currently ignores priority and weight (as they don't make as much sense for a bootstrap)
|
||||
@ -38,7 +39,7 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
|
||||
// First, resolve the apurls
|
||||
for _, url := range apurls {
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", url.Host)
|
||||
tcpAddr, err := resolveTCPAddr("tcp", url.Host)
|
||||
if err != nil {
|
||||
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", url.Host)
|
||||
return "", "", err
|
||||
@ -52,8 +53,9 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
return err
|
||||
}
|
||||
for _, srv := range addrs {
|
||||
host := net.JoinHostPort(srv.Target, fmt.Sprintf("%d", srv.Port))
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", host)
|
||||
target := strings.TrimSuffix(srv.Target, ".")
|
||||
host := net.JoinHostPort(target, fmt.Sprintf("%d", srv.Port))
|
||||
tcpAddr, err := resolveTCPAddr("tcp", host)
|
||||
if err != nil {
|
||||
log.Printf("discovery: Couldn't resolve host %s during SRV discovery", host)
|
||||
continue
|
||||
@ -68,8 +70,8 @@ func SRVGetCluster(name, dns string, defaultToken string, apurls types.URLs) (st
|
||||
n = fmt.Sprintf("%d", tempName)
|
||||
tempName += 1
|
||||
}
|
||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, tcpAddr.String()))
|
||||
log.Printf("discovery: Got bootstrap from DNS for %s at host %s to %s%s", service, host, prefix, tcpAddr.String())
|
||||
stringParts = append(stringParts, fmt.Sprintf("%s=%s%s", n, prefix, host))
|
||||
log.Printf("discovery: Got bootstrap from DNS for %s at %s%s", service, prefix, host)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -23,19 +23,26 @@ import (
|
||||
)
|
||||
|
||||
func TestSRVGetCluster(t *testing.T) {
|
||||
defer func() { lookupSRV = net.LookupSRV }()
|
||||
defer func() {
|
||||
lookupSRV = net.LookupSRV
|
||||
resolveTCPAddr = net.ResolveTCPAddr
|
||||
}()
|
||||
|
||||
name := "dnsClusterTest"
|
||||
tests := []struct {
|
||||
withSSL []*net.SRV
|
||||
withoutSSL []*net.SRV
|
||||
urls []string
|
||||
expected string
|
||||
dns map[string]string
|
||||
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
[]*net.SRV{},
|
||||
[]*net.SRV{},
|
||||
nil,
|
||||
nil,
|
||||
|
||||
"",
|
||||
},
|
||||
{
|
||||
@ -46,6 +53,8 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
},
|
||||
[]*net.SRV{},
|
||||
nil,
|
||||
nil,
|
||||
|
||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480",
|
||||
},
|
||||
{
|
||||
@ -58,6 +67,7 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
||||
},
|
||||
nil,
|
||||
nil,
|
||||
"0=https://10.0.0.1:2480,1=https://10.0.0.2:2480,2=https://10.0.0.3:2480,3=http://10.0.0.1:7001",
|
||||
},
|
||||
{
|
||||
@ -70,8 +80,22 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
&net.SRV{Target: "10.0.0.1", Port: 7001},
|
||||
},
|
||||
[]string{"https://10.0.0.1:2480"},
|
||||
nil,
|
||||
"dnsClusterTest=https://10.0.0.1:2480,0=https://10.0.0.2:2480,1=https://10.0.0.3:2480,2=http://10.0.0.1:7001",
|
||||
},
|
||||
// matching local member with resolved addr and return unresolved hostnames
|
||||
{
|
||||
[]*net.SRV{
|
||||
&net.SRV{Target: "1.example.com.", Port: 2480},
|
||||
&net.SRV{Target: "2.example.com.", Port: 2480},
|
||||
&net.SRV{Target: "3.example.com.", Port: 2480},
|
||||
},
|
||||
nil,
|
||||
[]string{"https://10.0.0.1:2480"},
|
||||
map[string]string{"1.example.com:2480": "10.0.0.1:2480", "2.example.com:2480": "10.0.0.2:2480", "3.example.com:2480": "10.0.0.3:2480"},
|
||||
|
||||
"dnsClusterTest=https://1.example.com:2480,0=https://2.example.com:2480,1=https://3.example.com:2480",
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
@ -84,6 +108,12 @@ func TestSRVGetCluster(t *testing.T) {
|
||||
}
|
||||
return "", nil, errors.New("Unkown service in mock")
|
||||
}
|
||||
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
||||
if tt.dns == nil || tt.dns[addr] == "" {
|
||||
return net.ResolveTCPAddr(network, addr)
|
||||
}
|
||||
return net.ResolveTCPAddr(network, tt.dns[addr])
|
||||
}
|
||||
urls := testutil.MustNewURLs(t, tt.urls)
|
||||
str, token, err := SRVGetCluster(name, "example.com", "token", urls)
|
||||
if err != nil {
|
||||
|
@ -44,10 +44,10 @@ func NewBackupCommand() cli.Command {
|
||||
|
||||
// handleBackup handles a request that intends to do a backup.
|
||||
func handleBackup(c *cli.Context) {
|
||||
srcSnap := path.Join(c.String("data-dir"), "snap")
|
||||
destSnap := path.Join(c.String("backup-dir"), "snap")
|
||||
srcWAL := path.Join(c.String("data-dir"), "wal")
|
||||
destWAL := path.Join(c.String("backup-dir"), "wal")
|
||||
srcSnap := path.Join(c.String("data-dir"), "member", "snap")
|
||||
destSnap := path.Join(c.String("backup-dir"), "member", "snap")
|
||||
srcWAL := path.Join(c.String("data-dir"), "member", "wal")
|
||||
destWAL := path.Join(c.String("backup-dir"), "member", "wal")
|
||||
|
||||
if err := os.MkdirAll(destSnap, 0700); err != nil {
|
||||
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
|
||||
|
@ -46,9 +46,10 @@ func handleClusterHealth(c *cli.Context) {
|
||||
}
|
||||
|
||||
// do we have a leader?
|
||||
ep, ls0, err := getLeaderStats(tr, client.GetCluster())
|
||||
cl := client.GetCluster()
|
||||
ep, ls0, err := getLeaderStats(tr, cl)
|
||||
if err != nil {
|
||||
fmt.Println("cluster is unhealthy")
|
||||
fmt.Println("cluster may be unhealthy: failed to connect", cl)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@ func NewImportSnapCommand() cli.Command {
|
||||
Usage: "import a snapshot to a cluster",
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{Name: "snap", Value: "", Usage: "Path to the vaild etcd 0.4.x snapshot."},
|
||||
cli.StringSliceFlag{Name: "hidden", Value: new(cli.StringSlice), Usage: "Hidden key spaces to import from snapshot"},
|
||||
cli.IntFlag{Name: "c", Value: 10, Usage: "Number of concurrent clients to import the data"},
|
||||
},
|
||||
Action: handleImportSnap,
|
||||
@ -36,7 +37,7 @@ func handleImportSnap(c *cli.Context) {
|
||||
d, err := ioutil.ReadFile(c.String("snap"))
|
||||
if err != nil {
|
||||
if c.String("snap") == "" {
|
||||
fmt.Printf("no snapshot file provided (use --snap)")
|
||||
fmt.Printf("no snapshot file provided (use --snap)\n")
|
||||
} else {
|
||||
fmt.Printf("cannot read snapshot file %s\n", c.String("snap"))
|
||||
}
|
||||
@ -83,6 +84,15 @@ func handleImportSnap(c *cli.Context) {
|
||||
handleError(ErrorFromEtcd, err)
|
||||
}
|
||||
n := copyKeys(all.Node, setc)
|
||||
|
||||
hiddens := c.StringSlice("hidden")
|
||||
for _, h := range hiddens {
|
||||
allh, err := st.Get(h, true, true)
|
||||
if err != nil {
|
||||
handleError(ErrorFromEtcd, err)
|
||||
}
|
||||
n += copyKeys(allh.Node, setc)
|
||||
}
|
||||
close(setc)
|
||||
wg.Wait()
|
||||
fmt.Printf("finished importing %d keys\n", n)
|
||||
|
@ -31,7 +31,7 @@ func main() {
|
||||
app.Flags = []cli.Flag{
|
||||
cli.BoolFlag{Name: "debug", Usage: "output cURL commands which can be used to reproduce the request"},
|
||||
cli.BoolFlag{Name: "no-sync", Usage: "don't synchronize cluster information before sending request"},
|
||||
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple` or `json`)"},
|
||||
cli.StringFlag{Name: "output, o", Value: "simple", Usage: "output response in the given format (`simple`, `extended` or `json`)"},
|
||||
cli.StringFlag{Name: "peers, C", Value: "", Usage: "a comma-delimited list of machine addresses in the cluster (default: \"127.0.0.1:4001\")"},
|
||||
cli.StringFlag{Name: "cert-file", Value: "", Usage: "identify HTTPS client using this SSL certificate file"},
|
||||
cli.StringFlag{Name: "key-file", Value: "", Usage: "identify HTTPS client using this SSL key file"},
|
||||
|
@ -15,18 +15,17 @@
|
||||
package etcdmain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/netutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/version"
|
||||
)
|
||||
@ -64,6 +63,7 @@ var (
|
||||
|
||||
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set" +
|
||||
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
|
||||
errUnsetAdvertiseClientURLsFlag = fmt.Errorf("-advertise-client-urls is required when -listen-client-urls is set explicitly")
|
||||
)
|
||||
|
||||
type config struct {
|
||||
@ -213,7 +213,10 @@ func (cfg *config) Parse(arguments []string) error {
|
||||
}
|
||||
|
||||
if cfg.printVersion {
|
||||
fmt.Println("etcd version", version.Version)
|
||||
fmt.Printf("etcd Version: %s\n", version.Version)
|
||||
fmt.Printf("Git SHA: %s\n", version.GitSHA)
|
||||
fmt.Printf("Go Version: %s\n", runtime.Version())
|
||||
fmt.Printf("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
@ -256,8 +259,14 @@ func (cfg *config) Parse(arguments []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cfg.resolveUrls(); err != nil {
|
||||
return errors.New("cannot resolve DNS hostnames.")
|
||||
// when etcd runs in member mode user needs to set -advertise-client-urls if -listen-client-urls is set.
|
||||
// TODO(yichengq): check this for joining through discovery service case
|
||||
mayFallbackToProxy := flags.IsSet(cfg.FlagSet, "discovery") && cfg.fallback.String() == fallbackFlagProxy
|
||||
mayBeProxy := cfg.proxy.String() != proxyFlagOff || mayFallbackToProxy
|
||||
if !mayBeProxy {
|
||||
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
|
||||
return errUnsetAdvertiseClientURLsFlag
|
||||
}
|
||||
}
|
||||
|
||||
if 5*cfg.TickMs > cfg.ElectionMs {
|
||||
@ -275,10 +284,6 @@ func initialClusterFromName(name string) string {
|
||||
return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
|
||||
}
|
||||
|
||||
func (cfg *config) resolveUrls() error {
|
||||
return netutil.ResolveTCPAddrs(cfg.lpurls, cfg.apurls, cfg.lcurls, cfg.acurls)
|
||||
}
|
||||
|
||||
func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew }
|
||||
func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff }
|
||||
func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly }
|
||||
|
@ -29,6 +29,8 @@ func TestConfigParsingMemberFlags(t *testing.T) {
|
||||
"-snapshot-count=10",
|
||||
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
|
||||
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
|
||||
// it should be set if -listen-client-urls is set
|
||||
"-advertise-client-urls=http://localhost:7000,https://localhost:7001",
|
||||
}
|
||||
wcfg := &config{
|
||||
dir: "testdir",
|
||||
@ -210,6 +212,71 @@ func TestConfigParsingConflictClusteringFlags(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
|
||||
tests := []struct {
|
||||
args []string
|
||||
werr error
|
||||
}{
|
||||
{
|
||||
[]string{
|
||||
"-initial-cluster=infra1=http://127.0.0.1:2380",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-discovery-srv=example.com",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-discovery=http://example.com/abc",
|
||||
"-discovery-fallback=exit",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-discovery=http://example.com/abc",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-proxy=on",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-proxy=readonly",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
cfg := NewConfig()
|
||||
err := cfg.Parse(tt.args)
|
||||
if err != tt.werr {
|
||||
t.Errorf("%d: err = %v, want %v", i, err, tt.werr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigIsNewCluster(t *testing.T) {
|
||||
tests := []struct {
|
||||
state string
|
||||
|
@ -56,8 +56,12 @@ func Main() {
|
||||
cfg := NewConfig()
|
||||
err := cfg.Parse(os.Args[1:])
|
||||
if err != nil {
|
||||
log.Printf("etcd: error verifying flags, %v. See 'etcd -help'.", err)
|
||||
os.Exit(2)
|
||||
log.Printf("error verifying flags, %v. See 'etcd -help'.", err)
|
||||
switch err {
|
||||
case errUnsetAdvertiseClientURLsFlag:
|
||||
log.Printf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
var stopped <-chan struct{}
|
||||
@ -90,8 +94,10 @@ func Main() {
|
||||
if err != nil {
|
||||
switch err {
|
||||
case discovery.ErrDuplicateID:
|
||||
log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
|
||||
cfg.name, cfg.durl, cfg.dir)
|
||||
log.Printf("member %q has previously registered with discovery service token (%s).", cfg.name, cfg.durl)
|
||||
log.Printf("But etcd could not find vaild cluster configuration in the given data dir (%s).", cfg.dir)
|
||||
log.Printf("Please check the given data dir path if the previous bootstrap succeeded")
|
||||
log.Printf("or use a new discovery token if the previous bootstrap failed.")
|
||||
default:
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
@ -110,7 +116,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
|
||||
}
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -191,7 +197,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
Handler: etcdhttp.NewClientHandler(s),
|
||||
Info: cfg.corsInfo,
|
||||
}
|
||||
ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
|
||||
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
|
||||
// Start the peer server in a goroutine
|
||||
for _, l := range plns {
|
||||
go func(l net.Listener) {
|
||||
@ -227,6 +233,7 @@ func startProxy(cfg *config) error {
|
||||
}
|
||||
|
||||
pt, err := transport.NewTransport(cfg.clientTLSInfo)
|
||||
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -56,7 +56,8 @@ clustering flags:
|
||||
--initial-cluster-token 'etcd-cluster'
|
||||
initial cluster token for the etcd cluster during bootstrap.
|
||||
--advertise-client-urls 'http://localhost:2379,http://localhost:4001'
|
||||
list of this member's client URLs to advertise to the rest of the cluster.
|
||||
list of this member's client URLs to advertise to the public.
|
||||
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
|
||||
--discovery ''
|
||||
discovery URL used to bootstrap the cluster.
|
||||
--discovery-fallback 'proxy'
|
||||
|
@ -59,12 +59,6 @@ type Cluster struct {
|
||||
id types.ID
|
||||
token string
|
||||
store store.Store
|
||||
// index is the raft index that cluster is updated at bootstrap
|
||||
// from remote cluster info.
|
||||
// It may have a higher value than local raft index, because it
|
||||
// displays a further view of the cluster.
|
||||
// TODO: upgrade it as last modified index
|
||||
index uint64
|
||||
|
||||
sync.Mutex // guards members and removed map
|
||||
members map[types.ID]*Member
|
||||
@ -236,8 +230,6 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
|
||||
|
||||
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
||||
|
||||
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
|
||||
|
||||
func (c *Cluster) Recover() {
|
||||
c.members, c.removed = membersFromStore(c.store)
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
@ -89,21 +88,7 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
|
||||
}
|
||||
continue
|
||||
}
|
||||
var index uint64
|
||||
// The header at or before v2.0.3 doesn't have this field. For backward
|
||||
// compatibility, it checks whether the field exists.
|
||||
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
|
||||
index, err = strconv.ParseUint(indexStr, 10, 64)
|
||||
if err != nil {
|
||||
if logerr {
|
||||
log.Printf("etcdserver: could not parse raft index: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
cl := NewClusterFromMembers("", id, membs)
|
||||
cl.UpdateIndex(index)
|
||||
return cl, nil
|
||||
return NewClusterFromMembers("", id, membs), nil
|
||||
}
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
@ -119,7 +119,6 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
switch {
|
||||
case resp.Event != nil:
|
||||
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
@ -29,10 +28,9 @@ const (
|
||||
)
|
||||
|
||||
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
|
||||
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
|
||||
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
|
||||
mh := &peerMembersHandler{
|
||||
clusterInfo: clusterInfo,
|
||||
timer: timer,
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@ -45,7 +43,6 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTim
|
||||
|
||||
type peerMembersHandler struct {
|
||||
clusterInfo etcdserver.ClusterInfo
|
||||
timer etcdserver.RaftTimer
|
||||
}
|
||||
|
||||
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
@ -53,7 +50,6 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
|
||||
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
|
||||
|
||||
if r.URL.Path != peerMembersPrefix {
|
||||
http.Error(w, "bad path", http.StatusBadRequest)
|
||||
|
@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
|
||||
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("test data"))
|
||||
})
|
||||
ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
|
||||
ph := NewPeerHandler(&fakeCluster{}, h)
|
||||
srv := httptest.NewServer(ph)
|
||||
defer srv.Close()
|
||||
|
||||
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
|
||||
id: 1,
|
||||
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
|
||||
}
|
||||
h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
|
||||
h := &peerMembersHandler{clusterInfo: cluster}
|
||||
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -158,6 +158,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
haveWAL := wal.Exist(cfg.WALDir())
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
|
||||
var remotes []*Member
|
||||
switch {
|
||||
case !haveWAL && !cfg.NewCluster:
|
||||
if err := cfg.VerifyJoinExisting(); err != nil {
|
||||
@ -170,7 +171,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
|
||||
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
||||
}
|
||||
cfg.Cluster.UpdateIndex(existingCluster.index)
|
||||
remotes = existingCluster.Members()
|
||||
cfg.Cluster.SetID(existingCluster.id)
|
||||
cfg.Cluster.SetStore(st)
|
||||
cfg.Print()
|
||||
@ -238,6 +239,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
Name: cfg.Name,
|
||||
ID: id.String(),
|
||||
}
|
||||
sstats.Initialize()
|
||||
lstats := stats.NewLeaderStats(id.String())
|
||||
|
||||
srv := &EtcdServer{
|
||||
@ -260,8 +262,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
|
||||
}
|
||||
|
||||
// TODO: move transport initialization near the definition of remote
|
||||
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
|
||||
// add all the remote members into sendhub
|
||||
// add all remotes into transport
|
||||
for _, m := range remotes {
|
||||
if m.ID != id {
|
||||
tr.AddRemote(m.ID, m.PeerURLs)
|
||||
}
|
||||
}
|
||||
for _, m := range cfg.Cluster.Members() {
|
||||
if m.ID != id {
|
||||
tr.AddPeer(m.ID, m.PeerURLs)
|
||||
@ -292,7 +300,6 @@ func (s *EtcdServer) start() {
|
||||
s.w = wait.New()
|
||||
s.done = make(chan struct{})
|
||||
s.stop = make(chan struct{})
|
||||
s.stats.Initialize()
|
||||
// TODO: if this is an empty log, writes all peer infos
|
||||
// into the first entry
|
||||
go s.run()
|
||||
@ -395,19 +402,15 @@ func (s *EtcdServer) run() {
|
||||
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
|
||||
log.Panicf("recovery store error: %v", err)
|
||||
}
|
||||
s.Cluster.Recover()
|
||||
|
||||
// It avoids snapshot recovery overwriting newer cluster and
|
||||
// transport setting, which may block the communication.
|
||||
if s.Cluster.index < rd.Snapshot.Metadata.Index {
|
||||
s.Cluster.Recover()
|
||||
// recover raft transport
|
||||
s.r.transport.RemoveAllPeers()
|
||||
for _, m := range s.Cluster.Members() {
|
||||
if m.ID == s.ID() {
|
||||
continue
|
||||
}
|
||||
s.r.transport.AddPeer(m.ID, m.PeerURLs)
|
||||
// recover raft transport
|
||||
s.r.transport.RemoveAllPeers()
|
||||
for _, m := range s.Cluster.Members() {
|
||||
if m.ID == s.ID() {
|
||||
continue
|
||||
}
|
||||
s.r.transport.AddPeer(m.ID, m.PeerURLs)
|
||||
}
|
||||
|
||||
appliedi = rd.Snapshot.Metadata.Index
|
||||
@ -671,9 +674,9 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
|
||||
}
|
||||
|
||||
func (s *EtcdServer) send(ms []raftpb.Message) {
|
||||
for _, m := range ms {
|
||||
if !s.Cluster.IsIDRemoved(types.ID(m.To)) {
|
||||
m.To = 0
|
||||
for i, _ := range ms {
|
||||
if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) {
|
||||
ms[i].To = 0
|
||||
}
|
||||
}
|
||||
s.r.transport.Send(ms)
|
||||
@ -723,7 +726,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
||||
switch {
|
||||
case existsSet:
|
||||
if exists {
|
||||
return f(s.store.Update(r.Path, r.Val, expr))
|
||||
if r.PrevIndex == 0 && r.PrevValue == "" {
|
||||
return f(s.store.Update(r.Path, r.Val, expr))
|
||||
} else {
|
||||
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
|
||||
}
|
||||
}
|
||||
return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
|
||||
case r.PrevIndex > 0 || r.PrevValue != "":
|
||||
|
@ -235,20 +235,18 @@ func TestApplyRequest(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
// PUT with PrevExist=true *and* PrevIndex set ==> Update
|
||||
// TODO(jonboulle): is this expected?!
|
||||
// PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
|
||||
{
|
||||
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
|
||||
Response{Event: &store.Event{}},
|
||||
[]testutil.Action{
|
||||
{
|
||||
Name: "Update",
|
||||
Params: []interface{}{"", "", time.Time{}},
|
||||
Name: "CompareAndSwap",
|
||||
Params: []interface{}{"", "", uint64(1), "", time.Time{}},
|
||||
},
|
||||
},
|
||||
},
|
||||
// PUT with PrevExist=false *and* PrevIndex set ==> Create
|
||||
// TODO(jonboulle): is this expected?!
|
||||
{
|
||||
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
|
||||
Response{Event: &store.Event{}},
|
||||
@ -1391,6 +1389,7 @@ type nopTransporter struct{}
|
||||
|
||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
|
@ -170,6 +170,46 @@ func TestForceNewCluster(t *testing.T) {
|
||||
clusterMustProgress(t, c.Members[:1])
|
||||
}
|
||||
|
||||
// Ensure we can remove a member then add a new one back immediately.
|
||||
func TestIssue2681(t *testing.T) {
|
||||
defer afterTest(t)
|
||||
c := NewCluster(t, 5)
|
||||
c.Launch(t)
|
||||
defer c.Terminate(t)
|
||||
|
||||
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
|
||||
c.waitLeader(t, c.Members)
|
||||
|
||||
c.AddMember(t)
|
||||
c.waitLeader(t, c.Members)
|
||||
clusterMustProgress(t, c.Members)
|
||||
}
|
||||
|
||||
// Ensure we can remove a member after a snapshot then add a new one back.
|
||||
func TestIssue2746(t *testing.T) {
|
||||
defer afterTest(t)
|
||||
c := NewCluster(t, 5)
|
||||
|
||||
for _, m := range c.Members {
|
||||
m.SnapCount = 10
|
||||
}
|
||||
|
||||
c.Launch(t)
|
||||
defer c.Terminate(t)
|
||||
|
||||
// force a snapshot
|
||||
for i := 0; i < 20; i++ {
|
||||
clusterMustProgress(t, c.Members)
|
||||
}
|
||||
|
||||
c.RemoveMember(t, uint64(c.Members[4].s.ID()))
|
||||
c.waitLeader(t, c.Members)
|
||||
|
||||
c.AddMember(t)
|
||||
c.waitLeader(t, c.Members)
|
||||
clusterMustProgress(t, c.Members)
|
||||
}
|
||||
|
||||
// clusterMustProgress ensures that cluster can make progress. It creates
|
||||
// a random key first, and check the new key could be got from all client urls
|
||||
// of the cluster.
|
||||
@ -526,7 +566,7 @@ func (m *member) Launch() error {
|
||||
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
|
||||
m.s.Start()
|
||||
|
||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
|
||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
|
||||
|
||||
for _, ln := range m.PeerListeners {
|
||||
hs := &httptest.Server{
|
||||
@ -623,7 +663,7 @@ func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
|
||||
}
|
||||
|
||||
func mustNewTransport(t *testing.T) *http.Transport {
|
||||
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
90
pkg/fileutil/lock_plan9.go
Normal file
90
pkg/fileutil/lock_plan9.go
Normal file
@ -0,0 +1,90 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package fileutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrLocked = errors.New("file already locked")
|
||||
)
|
||||
|
||||
type Lock interface {
|
||||
Name() string
|
||||
TryLock() error
|
||||
Lock() error
|
||||
Unlock() error
|
||||
Destroy() error
|
||||
}
|
||||
|
||||
type lock struct {
|
||||
fname string
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func (l *lock) Name() string {
|
||||
return l.fname
|
||||
}
|
||||
|
||||
// TryLock acquires exclusivity on the lock without blocking
|
||||
func (l *lock) TryLock() error {
|
||||
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.Open(l.fname)
|
||||
if err != nil {
|
||||
return ErrLocked
|
||||
}
|
||||
|
||||
l.file = f
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lock acquires exclusivity on the lock with blocking
|
||||
func (l *lock) Lock() error {
|
||||
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
f, err := os.Open(l.fname)
|
||||
if err == nil {
|
||||
l.file = f
|
||||
return nil
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock unlocks the lock
|
||||
func (l *lock) Unlock() error {
|
||||
return l.file.Close()
|
||||
}
|
||||
|
||||
func (l *lock) Destroy() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewLock(file string) (Lock, error) {
|
||||
l := &lock{fname: file}
|
||||
return l, nil
|
||||
}
|
98
pkg/fileutil/lock_solaris.go
Normal file
98
pkg/fileutil/lock_solaris.go
Normal file
@ -0,0 +1,98 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build solaris
|
||||
|
||||
package fileutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrLocked = errors.New("file already locked")
|
||||
)
|
||||
|
||||
type Lock interface {
|
||||
Name() string
|
||||
TryLock() error
|
||||
Lock() error
|
||||
Unlock() error
|
||||
Destroy() error
|
||||
}
|
||||
|
||||
type lock struct {
|
||||
fd int
|
||||
file *os.File
|
||||
}
|
||||
|
||||
func (l *lock) Name() string {
|
||||
return l.file.Name()
|
||||
}
|
||||
|
||||
// TryLock acquires exclusivity on the lock without blocking
|
||||
func (l *lock) TryLock() error {
|
||||
var lock syscall.Flock_t
|
||||
lock.Start = 0
|
||||
lock.Len = 0
|
||||
lock.Pid = 0
|
||||
lock.Type = syscall.F_WRLCK
|
||||
lock.Whence = 0
|
||||
lock.Pid = 0
|
||||
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||
if err != nil && err == syscall.EAGAIN {
|
||||
return ErrLocked
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Lock acquires exclusivity on the lock without blocking
|
||||
func (l *lock) Lock() error {
|
||||
var lock syscall.Flock_t
|
||||
lock.Start = 0
|
||||
lock.Len = 0
|
||||
lock.Type = syscall.F_WRLCK
|
||||
lock.Whence = 0
|
||||
lock.Pid = 0
|
||||
return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||
}
|
||||
|
||||
// Unlock unlocks the lock
|
||||
func (l *lock) Unlock() error {
|
||||
var lock syscall.Flock_t
|
||||
lock.Start = 0
|
||||
lock.Len = 0
|
||||
lock.Type = syscall.F_UNLCK
|
||||
lock.Whence = 0
|
||||
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
||||
if err != nil && err == syscall.EAGAIN {
|
||||
return ErrLocked
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *lock) Destroy() error {
|
||||
return l.file.Close()
|
||||
}
|
||||
|
||||
func NewLock(file string) (Lock, error) {
|
||||
f, err := os.OpenFile(file, os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l := &lock{int(f.Fd()), f}
|
||||
return l, nil
|
||||
}
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !windows,!plan9
|
||||
// +build !windows,!plan9,!solaris
|
||||
|
||||
package fileutil
|
||||
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
// NewTimeoutTransport returns a transport created using the given TLS info.
|
||||
// If read/write on the created connection blocks longer than its time limit,
|
||||
// it will return timeout error.
|
||||
func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
|
||||
func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
|
||||
tr, err := NewTransport(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -33,7 +33,7 @@ func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*ht
|
||||
tr.MaxIdleConnsPerHost = -1
|
||||
tr.Dial = (&rwTimeoutDialer{
|
||||
Dialer: net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
Timeout: dialtimeoutd,
|
||||
KeepAlive: 30 * time.Second,
|
||||
},
|
||||
rdtimeoutd: rdtimeoutd,
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
// TestNewTimeoutTransport tests that NewTimeoutTransport returns a transport
|
||||
// that can dial out timeout connections.
|
||||
func TestNewTimeoutTransport(t *testing.T) {
|
||||
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour)
|
||||
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour, time.Hour)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected NewTimeoutTransport error: %v", err)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package proxy
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
@ -65,6 +66,13 @@ func (d *director) refresh() {
|
||||
}
|
||||
endpoints = append(endpoints, newEndpoint(*uu))
|
||||
}
|
||||
|
||||
// shuffle array to avoid connections being "stuck" to a single endpoint
|
||||
for i := range endpoints {
|
||||
j := rand.Intn(i + 1)
|
||||
endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
|
||||
}
|
||||
|
||||
d.ep = endpoints
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,17 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultMaxIdleConnsPerHost indicates the default maximal idle connections
|
||||
// maintained between proxy and each member. We set it to 128 to
|
||||
// let proxy handle 128 concurrent requests in long term smoothly.
|
||||
// If the number of concurrent requests is bigger than this value,
|
||||
// proxy needs to create one new connection when handling each request in
|
||||
// the delta, which is bad because the creation consumes resource and
|
||||
// may eat up ephemeral ports.
|
||||
DefaultMaxIdleConnsPerHost = 128
|
||||
)
|
||||
|
||||
// GetProxyURLs is a function which should return the current set of URLs to
|
||||
// which client requests should be proxied. This function will be queried
|
||||
// periodically by the proxy Handler to refresh the set of available
|
||||
|
@ -15,8 +15,10 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -55,6 +57,21 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
proxyreq := new(http.Request)
|
||||
*proxyreq = *clientreq
|
||||
|
||||
var (
|
||||
proxybody []byte
|
||||
err error
|
||||
)
|
||||
|
||||
if clientreq.Body != nil {
|
||||
proxybody, err = ioutil.ReadAll(clientreq.Body)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("proxy: failed to read request body: %v", err)
|
||||
e := httptypes.NewHTTPError(http.StatusInternalServerError, msg)
|
||||
e.WriteTo(rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// deep-copy the headers, as these will be modified below
|
||||
proxyreq.Header = make(http.Header)
|
||||
copyHeader(proxyreq.Header, clientreq.Header)
|
||||
@ -73,10 +90,31 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
completeCh := make(chan bool, 1)
|
||||
closeNotifier, ok := rw.(http.CloseNotifier)
|
||||
if ok {
|
||||
go func() {
|
||||
select {
|
||||
case <-closeNotifier.CloseNotify():
|
||||
tp, ok := p.transport.(*http.Transport)
|
||||
if ok {
|
||||
tp.CancelRequest(proxyreq)
|
||||
}
|
||||
case <-completeCh:
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
completeCh <- true
|
||||
}()
|
||||
}
|
||||
|
||||
var res *http.Response
|
||||
var err error
|
||||
|
||||
for _, ep := range endpoints {
|
||||
if proxybody != nil {
|
||||
proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
|
||||
}
|
||||
redirectRequest(proxyreq, ep.URL)
|
||||
|
||||
res, err = p.transport.RoundTrip(proxyreq)
|
||||
|
@ -65,7 +65,7 @@ func newLog(storage Storage) *raftLog {
|
||||
}
|
||||
|
||||
func (l *raftLog) String() string {
|
||||
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
|
||||
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
|
||||
}
|
||||
|
||||
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
|
||||
|
@ -233,7 +233,7 @@ func (n *node) run(r *raft) {
|
||||
|
||||
lead := None
|
||||
prevSoftSt := r.softState()
|
||||
prevHardSt := r.HardState
|
||||
prevHardSt := emptyState
|
||||
|
||||
for {
|
||||
if advancec != nil {
|
||||
|
@ -354,7 +354,7 @@ func TestNodeRestart(t *testing.T) {
|
||||
st := raftpb.HardState{Term: 1, Commit: 1}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
HardState: st,
|
||||
// commit up to index commit index in st
|
||||
CommittedEntries: entries[:st.Commit],
|
||||
}
|
||||
@ -389,7 +389,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
|
||||
st := raftpb.HardState{Term: 1, Commit: 3}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
HardState: st,
|
||||
// commit up to index commit index in st
|
||||
CommittedEntries: entries,
|
||||
}
|
||||
|
@ -121,7 +121,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
|
||||
from, err := types.IDFromString(fromStr)
|
||||
if err != nil {
|
||||
log.Printf("rafthttp: path %s cannot be parsed", fromStr)
|
||||
http.Error(w, "invalid path", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
@ -40,6 +40,7 @@ const (
|
||||
appRespBatchMs = 50
|
||||
propBatchMs = 10
|
||||
|
||||
DialTimeout = time.Second
|
||||
ConnReadTimeout = 5 * time.Second
|
||||
ConnWriteTimeout = 5 * time.Second
|
||||
)
|
||||
@ -199,7 +200,7 @@ func (p *peer) handle() {
|
||||
log.Printf("sender: the connection with %s became inactive", p.id)
|
||||
p.active = false
|
||||
}
|
||||
if m.Type == raftpb.MsgApp {
|
||||
if m.Type == raftpb.MsgApp && p.fs != nil {
|
||||
p.fs.Fail()
|
||||
}
|
||||
} else {
|
||||
@ -208,7 +209,7 @@ func (p *peer) handle() {
|
||||
p.active = true
|
||||
p.errored = nil
|
||||
}
|
||||
if m.Type == raftpb.MsgApp {
|
||||
if m.Type == raftpb.MsgApp && p.fs != nil {
|
||||
p.fs.Succ(end.Sub(start))
|
||||
}
|
||||
}
|
||||
|
42
rafthttp/remote.go
Normal file
42
rafthttp/remote.go
Normal file
@ -0,0 +1,42 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package rafthttp
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
)
|
||||
|
||||
type remote struct {
|
||||
id types.ID
|
||||
peer *peer
|
||||
}
|
||||
|
||||
func startRemote(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, errorc chan error) *remote {
|
||||
return &remote{
|
||||
id: to,
|
||||
peer: NewPeer(tr, u, to, cid, r, nil, errorc),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *remote) Send(m raftpb.Message) {
|
||||
g.peer.send(m)
|
||||
}
|
||||
|
||||
func (g *remote) Stop() {
|
||||
g.peer.Stop()
|
||||
}
|
@ -76,8 +76,11 @@ func (s *stream) attach(sw *streamWriter) error {
|
||||
// ignore lower-term streaming request
|
||||
if sw.term < s.w.term {
|
||||
return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
|
||||
} else if sw.term == s.w.term {
|
||||
s.w.stopWithoutLog()
|
||||
} else {
|
||||
s.w.stop()
|
||||
}
|
||||
s.w.stop()
|
||||
}
|
||||
s.w = sw
|
||||
return nil
|
||||
@ -151,21 +154,23 @@ type WriteFlusher interface {
|
||||
|
||||
// TODO: replace fs with stream stats
|
||||
type streamWriter struct {
|
||||
to types.ID
|
||||
term uint64
|
||||
fs *stats.FollowerStats
|
||||
q chan []raftpb.Entry
|
||||
done chan struct{}
|
||||
to types.ID
|
||||
term uint64
|
||||
fs *stats.FollowerStats
|
||||
q chan []raftpb.Entry
|
||||
done chan struct{}
|
||||
printLog bool
|
||||
}
|
||||
|
||||
// newStreamWriter starts and returns a new unstarted stream writer.
|
||||
// The caller should call stop when finished, to shut it down.
|
||||
func newStreamWriter(to types.ID, term uint64) *streamWriter {
|
||||
s := &streamWriter{
|
||||
to: to,
|
||||
term: term,
|
||||
q: make(chan []raftpb.Entry, streamBufSize),
|
||||
done: make(chan struct{}),
|
||||
to: to,
|
||||
term: term,
|
||||
q: make(chan []raftpb.Entry, streamBufSize),
|
||||
done: make(chan struct{}),
|
||||
printLog: true,
|
||||
}
|
||||
return s
|
||||
}
|
||||
@ -188,7 +193,9 @@ func (s *streamWriter) send(ents []raftpb.Entry) error {
|
||||
func (s *streamWriter) handle(w WriteFlusher) {
|
||||
defer func() {
|
||||
close(s.done)
|
||||
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
|
||||
if s.printLog {
|
||||
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
|
||||
}
|
||||
}()
|
||||
|
||||
ew := newEntryWriter(w, s.to)
|
||||
@ -215,6 +222,11 @@ func (s *streamWriter) stop() {
|
||||
<-s.done
|
||||
}
|
||||
|
||||
func (s *streamWriter) stopWithoutLog() {
|
||||
s.printLog = false
|
||||
s.stop()
|
||||
}
|
||||
|
||||
func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
|
||||
|
||||
// TODO: move the raft interface out of the reader.
|
||||
|
@ -35,6 +35,12 @@ type Raft interface {
|
||||
type Transporter interface {
|
||||
Handler() http.Handler
|
||||
Send(m []raftpb.Message)
|
||||
// AddRemote adds a remote with given peer urls into the transport.
|
||||
// A remote helps newly joined member to catch up the progress of cluster,
|
||||
// and will not be used after that.
|
||||
// It is the caller's responsibility to ensure the urls are all vaild,
|
||||
// or it panics.
|
||||
AddRemote(id types.ID, urls []string)
|
||||
AddPeer(id types.ID, urls []string)
|
||||
RemovePeer(id types.ID)
|
||||
RemoveAllPeers()
|
||||
@ -50,9 +56,10 @@ type transport struct {
|
||||
serverStats *stats.ServerStats
|
||||
leaderStats *stats.LeaderStats
|
||||
|
||||
mu sync.RWMutex // protect the peer map
|
||||
peers map[types.ID]*peer // remote peers
|
||||
errorc chan error
|
||||
mu sync.RWMutex // protect the remote and peer map
|
||||
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
|
||||
peers map[types.ID]*peer // peers map
|
||||
errorc chan error
|
||||
}
|
||||
|
||||
func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
|
||||
@ -63,6 +70,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
|
||||
raft: r,
|
||||
serverStats: ss,
|
||||
leaderStats: ls,
|
||||
remotes: make(map[types.ID]*remote),
|
||||
peers: make(map[types.ID]*peer),
|
||||
errorc: errorc,
|
||||
}
|
||||
@ -90,21 +98,30 @@ func (t *transport) Send(msgs []raftpb.Message) {
|
||||
continue
|
||||
}
|
||||
to := types.ID(m.To)
|
||||
|
||||
p, ok := t.peers[to]
|
||||
if !ok {
|
||||
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
||||
if ok {
|
||||
if m.Type == raftpb.MsgApp {
|
||||
t.serverStats.SendAppendReq(m.Size())
|
||||
}
|
||||
p.Send(m)
|
||||
continue
|
||||
}
|
||||
|
||||
if m.Type == raftpb.MsgApp {
|
||||
t.serverStats.SendAppendReq(m.Size())
|
||||
g, ok := t.remotes[to]
|
||||
if ok {
|
||||
g.Send(m)
|
||||
continue
|
||||
}
|
||||
|
||||
p.Send(m)
|
||||
log.Printf("etcdserver: send message to unknown receiver %s", to)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transport) Stop() {
|
||||
for _, r := range t.remotes {
|
||||
r.Stop()
|
||||
}
|
||||
for _, p := range t.peers {
|
||||
p.Stop()
|
||||
}
|
||||
@ -113,6 +130,21 @@ func (t *transport) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transport) AddRemote(id types.ID, us []string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if _, ok := t.remotes[id]; ok {
|
||||
return
|
||||
}
|
||||
peerURL := us[0]
|
||||
u, err := url.Parse(peerURL)
|
||||
if err != nil {
|
||||
log.Panicf("unexpect peer url %s", peerURL)
|
||||
}
|
||||
u.Path = path.Join(u.Path, RaftPrefix)
|
||||
t.remotes[id] = startRemote(t.roundTripper, u.String(), t.id, id, t.clusterID, t.raft, t.errorc)
|
||||
}
|
||||
|
||||
func (t *transport) AddPeer(id types.ID, urls []string) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
@ -82,7 +82,10 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return snap, err
|
||||
if err != nil {
|
||||
return nil, ErrNoSnapshot
|
||||
}
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
|
||||
|
@ -76,7 +76,7 @@ func TestBadCRC(t *testing.T) {
|
||||
// fake a crc mismatch
|
||||
crcTable = crc32.MakeTable(crc32.Koopman)
|
||||
|
||||
_, err = ss.Load()
|
||||
_, err = Read(path.Join(dir, fmt.Sprintf("%016x-%016x.snap", 1, 1)))
|
||||
if err == nil || err != ErrCRCMismatch {
|
||||
t.Errorf("err = %v, want %v", err, ErrCRCMismatch)
|
||||
}
|
||||
@ -182,7 +182,7 @@ func TestNoSnapshot(t *testing.T) {
|
||||
defer os.RemoveAll(dir)
|
||||
ss := New(dir)
|
||||
_, err = ss.Load()
|
||||
if err == nil || err != ErrNoSnapshot {
|
||||
if err != ErrNoSnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||
}
|
||||
}
|
||||
@ -195,14 +195,35 @@ func TestEmptySnapshot(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("shit"), 0x700)
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte(""), 0x700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = Read(path.Join(dir, "1.snap"))
|
||||
if err != ErrEmptySnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAllSnapshotBroken ensures snapshotter returens
|
||||
// ErrNoSnapshot if all the snapshots are broken.
|
||||
func TestAllSnapshotBroken(t *testing.T) {
|
||||
dir := path.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
err = ioutil.WriteFile(path.Join(dir, "1.snap"), []byte("bad"), 0x700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ss := New(dir)
|
||||
_, err = ss.Load()
|
||||
if err == nil || err != ErrEmptySnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrEmptySnapshot)
|
||||
if err != ErrNoSnapshot {
|
||||
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||
}
|
||||
}
|
||||
|
@ -84,7 +84,6 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
|
||||
|
||||
if ok { // add the new watcher to the back of the list
|
||||
elem = l.PushBack(w)
|
||||
|
||||
} else { // create a new list and add the new watcher
|
||||
l = list.New()
|
||||
elem = l.PushBack(w)
|
||||
@ -146,6 +145,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
|
||||
// if we successfully notify a watcher
|
||||
// we need to remove the watcher from the list
|
||||
// and decrease the counter
|
||||
w.removed = true
|
||||
l.Remove(curr)
|
||||
atomic.AddInt64(&wh.count, -1)
|
||||
}
|
||||
|
@ -23,7 +23,10 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
Version = "2.0.8"
|
||||
Version = "2.0.13"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
)
|
||||
|
||||
// WalVersion is an enum for versions of etcd logs.
|
||||
|
@ -67,7 +67,7 @@ func checkWalNames(names []string) []string {
|
||||
wnames := make([]string, 0)
|
||||
for _, name := range names {
|
||||
if _, _, err := parseWalName(name); err != nil {
|
||||
log.Printf("wal: parse %s error: %v", name, err)
|
||||
log.Printf("wal: ignored file %v in wal", name)
|
||||
continue
|
||||
}
|
||||
wnames = append(wnames, name)
|
||||
|
72
wal/wal.go
72
wal/wal.go
@ -273,30 +273,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
}
|
||||
|
||||
// Cut closes current file written and creates a new one ready to append.
|
||||
// cut first creates a temp wal file and writes necessary headers into it.
|
||||
// Then cut atomtically rename temp wal file to a wal file.
|
||||
func (w *WAL) Cut() error {
|
||||
// create a new wal file with name sequence + 1
|
||||
// close old wal file
|
||||
if err := w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
||||
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
||||
ftpath := fpath + ".tmp"
|
||||
|
||||
// create a temp wal file with name sequence + 1, or tuncate the existing one
|
||||
ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l, err := fileutil.NewLock(f.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = l.Lock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.locks = append(w.locks, l)
|
||||
if err = w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.f.Close()
|
||||
|
||||
// update writer and save the previous crc
|
||||
w.f = f
|
||||
w.seq++
|
||||
w.f = ft
|
||||
prevCrc := w.encoder.crc.Sum32()
|
||||
w.encoder = newEncoder(w.f, prevCrc)
|
||||
if err := w.saveCrc(prevCrc); err != nil {
|
||||
@ -308,7 +306,45 @@ func (w *WAL) Cut() error {
|
||||
if err := w.saveState(&w.state); err != nil {
|
||||
return err
|
||||
}
|
||||
return w.sync()
|
||||
// close temp wal file
|
||||
if err := w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// atomically move temp wal file to wal file
|
||||
if err := os.Rename(ftpath, fpath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// open the wal file and update writer again
|
||||
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.f = f
|
||||
prevCrc = w.encoder.crc.Sum32()
|
||||
w.encoder = newEncoder(w.f, prevCrc)
|
||||
|
||||
// lock the new wal file
|
||||
l, err := fileutil.NewLock(f.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = l.Lock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.locks = append(w.locks, l)
|
||||
|
||||
// increase the wal seq
|
||||
w.seq++
|
||||
|
||||
log.Printf("wal: segmented wal file %v is created", fpath)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WAL) sync() error {
|
||||
|
Reference in New Issue
Block a user