Compare commits

..

97 Commits

Author SHA1 Message Date
7b289043c7 Merge pull request #130 from philips/add-version-to-join2
add versioning to cluster join
2013-08-19 09:45:49 -07:00
b430a07e1b chore(name_url_map): rename version to raftVersion
make it more clear that we are referring to the raftVersion.
2013-08-19 09:37:34 -07:00
52cbc89607 Merge pull request #132 from philips/add-new-projects2
feat(README): add etcdenv project
2013-08-19 09:22:53 -07:00
e848659db6 feat(README): add etcdenv project 2013-08-19 09:21:36 -07:00
9683bd37a7 Merge pull request #131 from philips/add-new-projects
feat(README): add some new projects
2013-08-19 09:20:06 -07:00
2991bf58e1 feat(README): add etcd-vim 2013-08-19 09:17:06 -07:00
e0731233c2 feat(README): add some new projects 2013-08-19 09:12:14 -07:00
bfc68e8e37 fix(raft_server): rename getLeaderVersion to getVersion 2013-08-19 08:53:15 -07:00
3fff0a3c2b fix(version): add raftVersion to the version file 2013-08-19 08:45:58 -07:00
fc776f2ad6 fix(raft_server): add comment on version field
explain what the version field is for and why it is set to
releaseVersion
2013-08-18 21:54:07 -07:00
e79f6842bb fix(command): change Version to RaftVersion
clear up confusion on what this field is used for: it is for the
internal raft protocol version only.
2013-08-18 21:54:07 -07:00
2c9e90d6ad feat(raft_server): do not allow mixed versions
fail to join if there is an internal version mismatch.
2013-08-18 21:54:07 -07:00
53b2038d2e feat(command): add version to join command
Add a version to the join command. Add a versioning document to discuss
some of the design decisions.
2013-08-18 21:54:07 -07:00
e091923311 Merge pull request #128 from xiangli-cmu/bump
Bump
2013-08-18 21:35:55 -07:00
f813017f1b fix raft api 2013-08-18 21:12:36 -07:00
111888adea bump(code.google.com/p/goprotobuf): 1141ccae4b85 2013-08-18 19:43:26 -07:00
ea28b1cdf3 bump(code.google.com/p/go.net): bc411e2ac33f 2013-08-18 19:43:24 -07:00
2662b3c559 bump(github.com/ccding/go-config-reader): 8b6c2b50197f20da3b1c5944c274c173634dc056 2013-08-18 19:43:20 -07:00
7ec0ee2a19 bump(github.com/ccding/go-logging): 4f3650d51969cc425c1940efa31fcb7c0bba86b3 2013-08-18 19:43:19 -07:00
13afdb0825 bump(github.com/coreos/go-etcd): 460022c1238ee0913013936e3486f41a3c7c1d7a 2013-08-18 19:43:14 -07:00
449cad4658 bump(github.com/coreos/go-raft): bb7f7ec92e4cb6d98241cea83f55d0e85e624189 2013-08-18 19:43:13 -07:00
393ed439b1 Merge pull request #127 from xiangli-cmu/master
clean error handling
2013-08-18 19:03:51 -07:00
1527b7008c fix test 2013-08-17 21:21:18 -07:00
5357fb431e Pull deeply nested logic into functions 2013-08-17 20:55:52 -07:00
cf2d6888c2 add error package 2013-08-17 20:41:15 -07:00
8ed67bedbb clean error handling 2013-08-17 15:06:21 -07:00
ef4aef950e Merge pull request #126 from philips/fix-missing-info-file
fix(config): use IsNotExist to test missing file
2013-08-17 14:07:50 -07:00
bcc77db8a9 fix(config): use IsNotExist to test missing file
Fixes #125 where a config file never gets created
2013-08-17 13:41:01 -07:00
f1786b8083 Merge pull request #122 from philips/fix-nip-in-transporter
chore(trasnsporter): delete unused variables
2013-08-17 09:06:16 -07:00
ec6a7be63a Merge pull request #124 from philips/xiangli-cleanup
Cleanups from #112
2013-08-17 08:36:46 -07:00
e50871cc36 remove unused struct 2013-08-17 08:30:32 -07:00
5bd24d8271 wait for exit and release resource 2013-08-17 08:30:32 -07:00
c459b4bda7 go flavour 2013-08-17 08:30:31 -07:00
981351c9d9 use type inheritance 2013-08-17 08:30:31 -07:00
012e747f18 make ttl test not so strict. testing server is not fast 2013-08-17 08:30:31 -07:00
e0ca8f20d2 add newJoinCommand func(). 2013-08-17 08:30:31 -07:00
ca4b5815f7 make raft and etcd server 2013-08-17 08:30:31 -07:00
f490fba698 also return API version 2013-08-17 08:30:31 -07:00
6bdb9af7f6 handler version at /version 2013-08-17 08:30:31 -07:00
7004a6bcc1 fix travis 2013-08-17 08:30:31 -07:00
177854c3e1 add test package. do not compile test codes with etcd 2013-08-17 08:30:31 -07:00
ee66f231b6 clean getMachines 2013-08-17 08:30:31 -07:00
c7e7e13aa4 handle readInfo error 2013-08-17 08:30:31 -07:00
9240258dc9 use var() 2013-08-17 08:30:31 -07:00
fb00d335c0 cleanup print 2013-08-17 08:30:31 -07:00
c3533d6ac2 fix test 2013-08-17 08:30:30 -07:00
cb33641f5f clean up 2013-08-17 08:30:30 -07:00
2c09cd7d1a chore(trasnsporter): delete unused variables
just return directly instead of creating intermediate variables
2013-08-17 07:57:09 -07:00
f8764df6ad Merge pull request #121 from philips/alternative-go-version-check
feat(go_version): check go version at build time
2013-08-16 15:35:37 -07:00
70f2590127 feat(go_version): check go version at build time
```
$ ./build
can't load package: package github.com/coreos/etcd:
src/github.com/coreos/etcd/go_version.go:3:1: expected 'package', found
'STRING' "etcd requires go 1.1 or greater to build"
```
2013-08-16 15:14:19 -07:00
0cb5eef40a Merge pull request #118 from philips/fix-test-and-set
fix(command): be consistent will all CommandNames
2013-08-15 21:40:34 -07:00
3e59badf1a fix(command): be consistent will all CommandNames
testAndSet was missing the etcd: prefix. Make it consistent and have a
helper function.
2013-08-15 21:38:15 -07:00
22b943e35c Merge pull request #117 from mairbek/locking
Concurrent GET requests should not block.
2013-08-15 13:27:55 -07:00
ac9801f570 Concurrent GET requests should not block. 2013-08-15 23:06:08 +03:00
b17a2e2bf1 Merge pull request #114 from zefhemel/dockerfile
Added dockerfile
2013-08-15 07:06:50 -07:00
9ede78d75f Added dockerfile 2013-08-15 10:00:37 +02:00
fe2d1c1b0e Merge pull request #109 from xiangli-cmu/fmilo
split raft server logic into separate module
2013-08-13 12:37:46 -07:00
915266d5f5 move tslconf to conf.go 2013-08-13 12:28:50 -07:00
3940196de0 move trans related func to trans.go 2013-08-13 12:23:35 -07:00
f7dc48ad00 gofmt 2013-08-13 12:17:33 -07:00
b71811375b fix race 2013-08-13 12:17:19 -07:00
82fe001c65 move etcdMux to etcd_handlers.go (better proximity code) 2013-08-13 11:52:55 -07:00
0aebf3757d use check utl 2013-08-13 11:52:55 -07:00
6299f316f1 use check util 2013-08-13 11:52:55 -07:00
3102420542 use infof instead of fmt.Printf 2013-08-13 11:52:55 -07:00
e7d15b6488 split config 2013-08-13 11:52:55 -07:00
339d8b435d move to util 2013-08-13 11:52:55 -07:00
e6d8d4046d split raft server logic into separate module 2013-08-13 11:52:55 -07:00
ad55b4236b Update README.md 2013-08-13 10:36:25 -07:00
7afbbb294d Merge pull request #102 from Mistobaan/master
minor go idiomatic fixes
2013-08-12 17:23:05 -07:00
d88bfc084b fix doc 2013-08-12 17:18:05 -07:00
ddc53c6584 use filepath 2013-08-12 17:18:05 -07:00
21c658b151 Merge pull request #86 from xiangli-cmu/master
Change snapshot to clientside
2013-08-12 10:46:34 -07:00
58e9e0c557 add comments in snapshot.go 2013-08-12 10:41:44 -07:00
969c8ba8ca log remoteAddr in etcdHttpHandler 2013-08-12 10:29:50 -07:00
32d1681b6b change name to url in http handlers 2013-08-12 10:16:30 -07:00
6ac6dfcc52 Merge pull request #100 from xiangli-cmu/fix95
fixes #95
2013-08-12 09:30:48 -07:00
928781aaa3 fix #95 2013-08-12 09:24:33 -07:00
1107f1d7ab Merge pull request #98 from philips/node-library
feat(README): add node library
2013-08-12 09:06:51 -07:00
1bf4e656a8 feat(README): add node library
Thanks for @stianeikeland for starting a node-etcd library.
2013-08-12 08:59:40 -07:00
aad1626dc9 typo(README): thanks asbjorn 2013-08-12 00:03:06 -07:00
2b14fbebde Merge pull request #94 from philips/fixup-readme
feat(README): point people at etcdctl
2013-08-11 19:36:56 -07:00
e8a284d295 feat(README): point people at etcdctl
etcdctl is included in the releases so point people at it.
2013-08-11 19:10:35 -07:00
0e26d96791 Merge pull request #93 from philips/fixup-readme
README fixups
2013-08-11 19:05:54 -07:00
b3654e68d9 fix(README): fixup a grammar bug 2013-08-11 19:05:06 -07:00
9d85c741d9 fix(README): use -n everywhere 2013-08-11 19:03:43 -07:00
47babce767 feat(README): add the active-proxy project 2013-08-11 17:40:19 -07:00
408d0caafc fix(README): remove url highlight
```url isn't a thing. delete it.
2013-08-11 17:19:57 -07:00
8e48a20c85 clean up trans.go 2013-08-11 11:56:18 -07:00
8f3e6f340f remove duplicate codes 2013-08-11 11:42:38 -07:00
6120fa634e remove duplicate codes 2013-08-11 11:40:45 -07:00
fa6c8f4f18 fix naming in long_test.go 2013-08-11 11:04:15 -07:00
1124fe21a0 cleaning up 2013-08-11 10:18:40 -07:00
e3dae8fcf9 do not print out debug info when testing 2013-08-11 09:53:02 -07:00
d3649d3254 gofmt 2013-08-11 09:48:12 -07:00
434b0045db add snapshot 2013-08-11 09:47:23 -07:00
64eeca3941 add snpshot 2013-08-11 09:47:10 -07:00
61 changed files with 2012 additions and 1289 deletions

View File

@ -5,4 +5,4 @@ install:
- echo "Skip install"
script:
- ./test
- ./test.sh

10
Dockerfile Normal file
View File

@ -0,0 +1,10 @@
FROM ubuntu:12.04
RUN apt-get update
RUN apt-get install -y python-software-properties git
RUN add-apt-repository -y ppa:duh/golang
RUN apt-get update
RUN apt-get install -y golang
ADD . /opt/etcd
RUN cd /opt/etcd && ./build
EXPOSE 4001 7001
ENTRYPOINT ["/opt/etcd/etcd", "-c", "0.0.0.0:4001", "-s", "0.0.0.0:7001"]

View File

@ -0,0 +1,61 @@
# Versioning
Goal: We want to be able to upgrade an individual machine in an etcd cluster to a newer version of etcd.
The process will take the form of individual followers upgrading to the latest version until the entire cluster is on the new version.
Immediate need: etcd is moving too fast to version the internal API right now.
But, we need to keep mixed version clusters from being started by a rolling upgrade process (e.g. the CoreOS developer alpha).
Longer term need: Having a mixed version cluster where all machines are not running the exact same version of etcd itself but are able to speak one version of the internal protocol.
Solution: The internal protocol needs to be versioned just as the client protocol is.
Initially during the 0.\*.\* series of etcd releases we won't allow mixed versions at all.
## Join Control
We will add a version field to the join command.
But, who decides whether a newly upgraded follower should be able to join a cluster?
### Leader Controlled
If the leader controls the version of followers joining the cluster then it compares its version to the version number presented by the follower in the JoinCommand and rejects the join if the number is less than the leader's version number.
Advantages
- Leader controls all cluster decisions still
Disadvantages
- Follower knows better what versions of the internal protocol it can talk than the leader
### Follower Controlled
A newly upgraded follower should be able to figure out the leader's internal version from a defined internal backwards compatible API endpoint and figure out if it can join the cluster.
If it cannot join the cluster then it simply exits.
Advantages
- The follower is running newer code and knows better if it can talk older protocols
Disadvantages
- This cluster decision isn't made by the leader
## Recommendation
To solve the immediate need and to plan for the future, let's do the following:
- Add Version field to JoinCommand
- Have a joining follower read the Version field of the leader and if its own version doesn't match the leader then sleep for some random interval and retry later to see if the leader has upgraded.
# Research
## Zookeeper versioning
Zookeeper very recently added versioning into the protocol and it doesn't seem to have seen any use yet.
https://issues.apache.org/jira/browse/ZOOKEEPER-1633
## doozerd
doozerd stores the version number of the machine in the datastore for other clients to check, no decisions are made off of this number currently.

View File

@ -9,12 +9,12 @@ A highly-available key value store for shared configuration and service discover
* Fast: benchmarked 1000s of writes/s per instance
* Reliable: Properly distributed using Raft
Etcd is written in Go and uses the [raft][raft] consensus algorithm to manage a highly availably replicated log.
Etcd is written in Go and uses the [raft][raft] consensus algorithm to manage a highly-available replicated log.
See [go-etcd][go-etcd] for a native Go client. Or feel free to just use curl, as in the examples below.
See [etcdctl][etcdctl] for a simple command line client. Or feel free to just use curl, as in the examples below.
[raft]: https://github.com/coreos/go-raft
[go-etcd]: https://github.com/coreos/go-etcd
[etcdctl]: http://coreos.com/docs/etcdctl/
## Getting Started
@ -24,22 +24,34 @@ The latest release is available as a binary at [Github][github-release].
[github-release]: https://github.com/coreos/etcd/releases/
You can also buildi etcd from source:
### Building
You can build etcd from source:
```sh
git clone https://github.com/coreos/etcd
./build
```
This will generate a binary in the base directory called `./etcd`.
_NOTE_: you need go 1.1+. Please check your installation with
```
go version
```
### Running a single node
These examples will use a single node cluster to show you the basics of the etcd REST API. Lets start etcd:
```sh
./etcd -d node0
./etcd -d node0 -n node0
```
This will bring up an etcd node listening on port 4001 for client communication and on port 7001 for server-to-server communication. The `-d node0` argument tells etcd to write node configuration, logs and snapshots to the `./node0/` directory.
This will bring up an etcd node listening on port 4001 for client communication and on port 7001 for server-to-server communication.
The `-d node0` argument tells etcd to write node configuration, logs and snapshots to the `./node0/` directory.
The `-n node0` tells the rest of the cluster that this node is named node0.
## Usage
@ -243,10 +255,7 @@ which meas `foo=barbar` is a key-value pair under `/foo` and `foo_dir` is a dire
Etcd supports SSL/TLS and client cert authentication for clients to server, as well as server to server communication
First, you need to have a CA cert `clientCA.crt` and signed key pair `client.crt`, `client.key`. This site has a good reference for how to generate self-signed key pairs:
```url
http://www.g-loaded.eu/2005/11/10/be-your-own-ca/
```
For testing you can use the certificates in the `fixtures/ca` directory.
@ -334,14 +343,14 @@ Let start by creating 3 new etcd instances.
We use -s to specify server port and -c to specify client port and -d to specify the directory to store the log and info of the node in the cluster
```sh
./etcd -s 7001 -c 4001 -d nodes/node1
./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d nodes/node1 -n node1
```
Let the join two more nodes to this cluster using the -C argument:
```sh
./etcd -c 4002 -s 7002 -C 127.0.0.1:7001 -d nodes/node2
./etcd -c 4003 -s 7003 -C 127.0.0.1:7001 -d nodes/node3
./etcd -c 127.0.0.1:4002 -s 127.0.0.1:7002 -C 127.0.0.1:7001 -d nodes/node2 -n node2
./etcd -c 127.0.0.1:4003 -s 127.0.0.1:7003 -C 127.0.0.1:7001 -d nodes/node3 -n node3
```
Get the machines in the cluster:
@ -445,6 +454,10 @@ If you are using SSL for server to server communication, you must use it on all
- [go-etcd](https://github.com/coreos/go-etcd)
**Node libraries**
- [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd)
**Ruby libraries**
- [iconara/etcd-rb](https://github.com/iconara/etcd-rb)
@ -455,6 +468,14 @@ If you are using SSL for server to server communication, you must use it on all
- [spheromak/etcd-cookbook](https://github.com/spheromak/etcd-cookbook)
**Projects using etcd**
- [calavera/active-proxy](https://github.com/calavera/active-proxy) - HTTP Proxy configured with etcd
- [gleicon/goreman](https://github.com/gleicon/goreman/tree/etcd) - Branch of the Go Foreman clone with etcd support
- [garethr/hiera-etcd](https://github.com/garethr/hiera-etcd) - Puppet hiera backend using etcd
- [mattn/etcd-vim](https://github.com/mattn/etcd-vim) - SET and GET keys from inside vim
- [mattn/etcdenv](https://github.com/mattn/etcdenv) - "env" shebang with etcd integration
## Project Details
### Versioning

View File

@ -3,12 +3,19 @@ package main
import (
"encoding/json"
"fmt"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"path"
"time"
)
const commandPrefix = "etcd:"
func commandName(name string) string {
return commandPrefix + name
}
// A command represents an action to be taken on the replicated state machine.
type Command interface {
CommandName() string
@ -24,7 +31,7 @@ type SetCommand struct {
// The name of the set command in the log
func (c *SetCommand) CommandName() string {
return "etcd:set"
return commandName("set")
}
// Set the key-value pair
@ -42,7 +49,7 @@ type TestAndSetCommand struct {
// The name of the testAndSet command in the log
func (c *TestAndSetCommand) CommandName() string {
return "testAndSet"
return commandName("testAndSet")
}
// Set the key-value pair if the current value of the key equals to the given prevValue
@ -57,7 +64,7 @@ type GetCommand struct {
// The name of the get command in the log
func (c *GetCommand) CommandName() string {
return "etcd:get"
return commandName("get")
}
// Get the value of key
@ -72,7 +79,7 @@ type DeleteCommand struct {
// The name of the delete command in the log
func (c *DeleteCommand) CommandName() string {
return "etcd:delete"
return commandName("delete")
}
// Delete the key
@ -88,7 +95,7 @@ type WatchCommand struct {
// The name of the watch command in the log
func (c *WatchCommand) CommandName() string {
return "etcd:watch"
return commandName("watch")
}
func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
@ -110,14 +117,24 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
// JoinCommand
type JoinCommand struct {
RaftVersion string `json:"raftVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
}
func newJoinCommand() *JoinCommand {
return &JoinCommand{
RaftVersion: r.version,
Name: r.name,
RaftURL: r.url,
EtcdURL: e.url,
}
}
// The name of the join command in the log
func (c *JoinCommand) CommandName() string {
return "etcd:join"
return commandName("join")
}
// Join a server to the cluster
@ -133,17 +150,18 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
// check machine number in the cluster
num := machineNum()
if num == maxClusterSize {
return []byte("join fail"), fmt.Errorf(errors[103])
debug("Reject join request from ", c.Name)
return []byte("join fail"), etcdErr.NewError(103, "")
}
addNameToURL(c.Name, c.RaftURL, c.EtcdURL)
addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
// add peer in raft
err := raftServer.AddPeer(c.Name)
err := raftServer.AddPeer(c.Name, "")
// add machine in etcd storage
key := path.Join("_etcd/machines", c.Name)
value := fmt.Sprintf("raft=%s&etcd=%s", c.RaftURL, c.EtcdURL)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
return []byte("join success"), err

140
config.go Normal file
View File

@ -0,0 +1,140 @@
package main
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"io/ioutil"
"os"
"path/filepath"
)
//--------------------------------------
// Config
//--------------------------------------
// Get the server info from previous conf file
// or from the user
func getInfo(path string) *Info {
infoPath := filepath.Join(path, "info")
if force {
// Delete the old configuration if exist
logPath := filepath.Join(path, "log")
confPath := filepath.Join(path, "conf")
snapshotPath := filepath.Join(path, "snapshot")
os.Remove(infoPath)
os.Remove(logPath)
os.Remove(confPath)
os.RemoveAll(snapshotPath)
} else if info := readInfo(infoPath); info != nil {
infof("Found node configuration in '%s'. Ignoring flags", infoPath)
return info
}
// Read info from command line
info := &argInfo
// Write to file.
content, _ := json.MarshalIndent(info, "", " ")
content = []byte(string(content) + "\n")
if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
fatalf("Unable to write info to file: %v", err)
}
infof("Wrote node configuration to '%s'", infoPath)
return info
}
// readInfo reads from info file and decode to Info struct
func readInfo(path string) *Info {
file, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil
}
fatal(err)
}
defer file.Close()
info := &Info{}
content, err := ioutil.ReadAll(file)
if err != nil {
fatalf("Unable to read info: %v", err)
return nil
}
if err = json.Unmarshal(content, &info); err != nil {
fatalf("Unable to parse info: %v", err)
return nil
}
return info
}
func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
var keyFile, certFile, CAFile string
var tlsCert tls.Certificate
var err error
t.Scheme = "http"
keyFile = info.KeyFile
certFile = info.CertFile
CAFile = info.CAFile
// If the user do not specify key file, cert file and
// CA file, the type will be HTTP
if keyFile == "" && certFile == "" && CAFile == "" {
return t, true
}
// both the key and cert must be present
if keyFile == "" || certFile == "" {
return t, false
}
tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
fatal(err)
}
t.Scheme = "https"
t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
// The client should trust the RootCA that the Server uses since
// everyone is a peer in the network.
t.Client.Certificates = []tls.Certificate{tlsCert}
t.Client.RootCAs = t.Server.ClientCAs
return t, true
}
// newCertPool creates x509 certPool and corresponding Auth Type.
// If the given CAfile is valid, add the cert into the pool and verify the clients'
// certs against the cert in the pool.
// If the given CAfile is empty, do not verify the clients' cert.
// If the given CAfile is not valid, fatal.
func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
if CAFile == "" {
return tls.NoClientCert, nil
}
pemByte, err := ioutil.ReadFile(CAFile)
check(err)
block, pemByte := pem.Decode(pemByte)
cert, err := x509.ParseCertificate(block.Bytes)
check(err)
certPool := x509.NewCertPool()
certPool.AddCert(cert)
return tls.RequireAndVerifyClientCert, certPool
}

View File

@ -1,11 +1,14 @@
package main
package error
import (
"encoding/json"
"net/http"
)
var errors map[int]string
const ()
func init() {
errors = make(map[int]string)
@ -33,17 +36,39 @@ func init() {
}
type jsonError struct {
type Error struct {
ErrorCode int `json:"errorCode"`
Message string `json:"message"`
Cause string `json:"cause,omitempty"`
}
func newJsonError(errorCode int, cause string) []byte {
b, _ := json.Marshal(jsonError{
func NewError(errorCode int, cause string) Error {
return Error{
ErrorCode: errorCode,
Message: errors[errorCode],
Cause: cause,
})
return b
}
}
func Message(code int) string {
return errors[code]
}
// Only for error interface
func (e Error) Error() string {
return e.Message
}
func (e Error) toJsonString() string {
b, _ := json.Marshal(e)
return string(b)
}
func (e Error) Write(w http.ResponseWriter) {
// 3xx is reft internal error
if e.ErrorCode/100 == 3 {
http.Error(w, e.toJsonString(), http.StatusInternalServerError)
} else {
http.Error(w, e.toJsonString(), http.StatusBadRequest)
}
}

511
etcd.go
View File

@ -1,23 +1,12 @@
package main
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"flag"
"fmt"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/web"
"github.com/coreos/go-raft"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"runtime/pprof"
"strings"
"time"
)
@ -28,28 +17,30 @@ import (
//
//------------------------------------------------------------------------------
var verbose bool
var veryVerbose bool
var (
verbose bool
veryVerbose bool
var machines string
var machinesFile string
machines string
machinesFile string
var cluster []string
cluster []string
var argInfo Info
var dirPath string
argInfo Info
dirPath string
var force bool
force bool
var maxSize int
maxSize int
var snapshot bool
snapshot bool
var retryTimes int
retryTimes int
var maxClusterSize int
maxClusterSize int
var cpuprofile string
cpuprofile string
)
func init() {
flag.BoolVar(&verbose, "v", false, "verbose logging")
@ -87,14 +78,14 @@ func init() {
}
const (
ELECTIONTIMEOUT = 200 * time.Millisecond
HEARTBEATTIMEOUT = 50 * time.Millisecond
ElectionTimeout = 200 * time.Millisecond
HeartbeatTimeout = 50 * time.Millisecond
// Timeout for internal raft http connection
// The original timeout for http is 45 seconds
// which is too long for our usage.
HTTPTIMEOUT = 10 * time.Second
RETRYINTERVAL = 10
HTTPTimeout = 10 * time.Second
RetryInterval = 10
)
//------------------------------------------------------------------------------
@ -120,16 +111,19 @@ type Info struct {
EtcdTLS TLSInfo `json:"etcdTLS"`
}
type TLSConfig struct {
Scheme string
Server tls.Config
Client tls.Config
}
//------------------------------------------------------------------------------
//
// Variables
//
//------------------------------------------------------------------------------
var raftServer *raft.Server
var raftTransporter transporter
var etcdStore *store.Store
var info *Info
//------------------------------------------------------------------------------
//
@ -137,30 +131,6 @@ var info *Info
//
//------------------------------------------------------------------------------
// sanitizeURL will cleanup a host string in the format hostname:port and
// attach a schema.
func sanitizeURL(host string, defaultScheme string) string {
// Blank URLs are fine input, just return it
if len(host) == 0 {
return host
}
p, err := url.Parse(host)
if err != nil {
fatal(err)
}
// Make sure the host is in Host:Port format
_, _, err = net.SplitHostPort(host)
if err != nil {
fatal(err)
}
p = &url.URL{Host: host, Scheme: defaultScheme}
return p.String()
}
//--------------------------------------
// Main
//--------------------------------------
@ -169,23 +139,7 @@ func main() {
flag.Parse()
if cpuprofile != "" {
f, err := os.Create(cpuprofile)
if err != nil {
fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for sig := range c {
fmt.Printf("captured %v, stopping profiler and exiting..", sig)
pprof.StopCPUProfile()
os.Exit(1)
}
}()
runCPUProfile()
}
if veryVerbose {
@ -203,6 +157,7 @@ func main() {
cluster = strings.Split(string(b), ",")
}
// Check TLS arguments
raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
if !ok {
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
@ -218,422 +173,28 @@ func main() {
fatal("ERROR: server name required. e.g. '-n=server_name'")
}
// Check host name arguments
argInfo.RaftURL = sanitizeURL(argInfo.RaftURL, raftTLSConfig.Scheme)
argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
// Setup commands.
registerCommands()
// Read server info from file or grab it from user.
if err := os.MkdirAll(dirPath, 0744); err != nil {
fatalf("Unable to create path: %s", err)
}
info = getInfo(dirPath)
info := getInfo(dirPath)
// Create etcd key-value store
etcdStore = store.CreateStore(maxSize)
snapConf = newSnapshotConf()
startRaft(raftTLSConfig)
// Create etcd and raft server
e = newEtcdServer(info.Name, info.EtcdURL, &etcdTLSConfig, &info.EtcdTLS)
r = newRaftServer(info.Name, info.RaftURL, &raftTLSConfig, &info.RaftTLS)
if argInfo.WebURL != "" {
// start web
argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
go webHelper()
go web.Start(raftServer, argInfo.WebURL)
}
startEtcdTransport(*info, etcdTLSConfig.Scheme, etcdTLSConfig.Server)
startWebInterface()
r.ListenAndServe()
e.ListenAndServe()
}
// Start the raft server
func startRaft(tlsConfig TLSConfig) {
var err error
raftName := info.Name
// Create transporter for raft
raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
// Create raft server
raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
if err != nil {
fatal(err)
}
// LoadSnapshot
if snapshot {
err = raftServer.LoadSnapshot()
if err == nil {
debugf("%s finished load snapshot", raftServer.Name())
} else {
debug(err)
}
}
raftServer.SetElectionTimeout(ELECTIONTIMEOUT)
raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
raftServer.Start()
if raftServer.IsLogEmpty() {
// start as a leader in a new cluster
if len(cluster) == 0 {
time.Sleep(time.Millisecond * 20)
// leader need to join self as a peer
for {
command := &JoinCommand{
Name: raftServer.Name(),
RaftURL: argInfo.RaftURL,
EtcdURL: argInfo.EtcdURL,
}
_, err := raftServer.Do(command)
if err == nil {
break
}
}
debugf("%s start as a leader", raftServer.Name())
// start as a follower in a existing cluster
} else {
time.Sleep(time.Millisecond * 20)
for i := 0; i < retryTimes; i++ {
success := false
for _, machine := range cluster {
if len(machine) == 0 {
continue
}
err = joinCluster(raftServer, machine)
if err != nil {
if err.Error() == errors[103] {
fmt.Println(err)
os.Exit(1)
}
debugf("cannot join to cluster via machine %s %s", machine, err)
} else {
success = true
break
}
}
if success {
break
}
warnf("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL)
time.Sleep(time.Second * RETRYINTERVAL)
}
if err != nil {
fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
}
debugf("%s success join to the cluster", raftServer.Name())
}
} else {
// rejoin the previous cluster
debugf("%s restart as a follower", raftServer.Name())
}
// open the snapshot
if snapshot {
go raftServer.Snapshot()
}
// start to response to raft requests
go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
}
// Create transporter using by raft server
// Create http or https transporter based on
// whether the user give the server cert and key
func newTransporter(scheme string, tlsConf tls.Config) transporter {
t := transporter{}
t.scheme = scheme
tr := &http.Transport{
Dial: dialTimeout,
}
if scheme == "https" {
tr.TLSClientConfig = &tlsConf
tr.DisableCompression = true
}
t.client = &http.Client{Transport: tr}
return t
}
// Dial with timeout
func dialTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, HTTPTIMEOUT)
}
// Start to listen and response raft command
func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
u, _ := url.Parse(info.RaftURL)
fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
raftMux := http.NewServeMux()
server := &http.Server{
Handler: raftMux,
TLSConfig: &tlsConf,
Addr: u.Host,
}
// internal commands
raftMux.HandleFunc("/name", NameHttpHandler)
raftMux.HandleFunc("/join", JoinHttpHandler)
raftMux.HandleFunc("/vote", VoteHttpHandler)
raftMux.HandleFunc("/log", GetLogHttpHandler)
raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
if scheme == "http" {
fatal(server.ListenAndServe())
} else {
fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
}
}
// Start to listen and response client command
func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
u, _ := url.Parse(info.EtcdURL)
fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
etcdMux := http.NewServeMux()
server := &http.Server{
Handler: etcdMux,
TLSConfig: &tlsConf,
Addr: u.Host,
}
// external commands
etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
etcdMux.HandleFunc("/leader", LeaderHttpHandler)
etcdMux.HandleFunc("/machines", MachinesHttpHandler)
etcdMux.HandleFunc("/", VersionHttpHandler)
etcdMux.HandleFunc("/stats", StatsHttpHandler)
etcdMux.HandleFunc("/test/", TestHttpHandler)
if scheme == "http" {
fatal(server.ListenAndServe())
} else {
fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
}
}
//--------------------------------------
// Config
//--------------------------------------
// TLSConfig bundles the listening scheme with the server-side and
// client-side tls.Config derived from one TLSInfo.
type TLSConfig struct {
	Scheme string     // "http" or "https"
	Server tls.Config // used by the listening side (client-cert verification)
	Client tls.Config // used for outgoing peer connections
}
// tlsConfigFromInfo builds the TLS configuration described by info.
// With no key, cert, or CA configured it returns a plain-HTTP config;
// ok is false when only one of the key/cert pair is given. A key pair
// that fails to load is fatal.
func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
	t.Scheme = "http"

	keyFile, certFile, caFile := info.KeyFile, info.CertFile, info.CAFile

	// Nothing configured at all means plain HTTP.
	if keyFile == "" && certFile == "" && caFile == "" {
		return t, true
	}

	// Both the key and the cert must be present together.
	if keyFile == "" || certFile == "" {
		return t, false
	}

	tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		fatal(err)
	}

	t.Scheme = "https"
	t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(caFile)

	// The client should trust the RootCA that the Server uses since
	// everyone is a peer in the network.
	t.Client.Certificates = []tls.Certificate{tlsCert}
	t.Client.RootCAs = t.Server.ClientCAs

	return t, true
}
// parseInfo loads a JSON-encoded Info from the file at path. A missing
// or unopenable file yields nil; unreadable or malformed content is
// fatal.
func parseInfo(path string) *Info {
	f, err := os.Open(path)
	if err != nil {
		return nil
	}
	defer f.Close()

	raw, err := ioutil.ReadAll(f)
	if err != nil {
		fatalf("Unable to read info: %v", err)
		return nil
	}

	info := &Info{}
	if err := json.Unmarshal(raw, info); err != nil {
		fatalf("Unable to parse info: %v", err)
		return nil
	}
	return info
}
// getInfo returns the node configuration, preferring a previously
// persisted info file under path; otherwise it falls back to the
// flag-derived argInfo and writes it out for the next start. With
// -force set, all prior state (info, log, conf, snapshot) is removed
// first.
func getInfo(path string) *Info {
	infoPath := fmt.Sprintf("%s/info", path)

	// A forced start wipes any state left by a previous run.
	if force {
		os.Remove(infoPath)
		os.Remove(fmt.Sprintf("%s/log", path))
		os.Remove(fmt.Sprintf("%s/conf", path))
		os.RemoveAll(fmt.Sprintf("%s/snapshot", path))
	}

	if info := parseInfo(infoPath); info != nil {
		fmt.Printf("Found node configuration in '%s'. Ignoring flags.\n", infoPath)
		return info
	}

	info := &argInfo

	// Persist the flag-derived configuration so later restarts reuse it.
	content, _ := json.MarshalIndent(info, "", " ")
	content = append(content, '\n')
	if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
		fatalf("Unable to write info to file: %v", err)
	}
	fmt.Printf("Wrote node configuration to '%s'.\n", infoPath)
	return info
}
// newCertPool loads the CA certificate(s) in CAFile into a pool used
// to verify client certificates. An empty CAFile disables client-cert
// authentication (NoClientCert, nil pool); otherwise client certs are
// required and verified against the pool. Read, decode, or parse
// failures are fatal.
func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
	if CAFile == "" {
		return tls.NoClientCert, nil
	}

	pemByte, err := ioutil.ReadFile(CAFile)
	if err != nil {
		// Previously the read error was discarded, so a missing file
		// reached pem.Decode as nil and caused a nil-pointer panic on
		// block.Bytes below. Fail loudly instead.
		fatal(err)
	}

	certPool := x509.NewCertPool()
	added := false

	// Load every certificate in the file, not just the first PEM block,
	// so a bundle of multiple CAs works.
	for {
		var block *pem.Block
		block, pemByte = pem.Decode(pemByte)
		if block == nil {
			break
		}
		cert, err := x509.ParseCertificate(block.Bytes)
		if err != nil {
			fatal(err)
		}
		certPool.AddCert(cert)
		added = true
	}

	if !added {
		// A CA file with no decodable PEM block is a configuration error.
		fatal(fmt.Errorf("no certificates found in %s", CAFile))
	}

	return tls.RequireAndVerifyClientCert, certPool
}
// joinCluster sends a join request for this node to the machine at
// raftURL, following temporary redirects toward the current leader.
// It returns nil on success, the cluster-full error for a 400, and a
// generic failure otherwise.
func joinCluster(s *raft.Server, raftURL string) error {
	var b bytes.Buffer

	command := &JoinCommand{
		Name:    s.Name(),
		RaftURL: info.RaftURL,
		EtcdURL: info.EtcdURL,
	}
	if err := json.NewEncoder(&b).Encode(command); err != nil {
		return fmt.Errorf("Unable to join: %v", err)
	}

	// t must be ok
	t, ok := raftServer.Transporter().(transporter)
	if !ok {
		panic("wrong type")
	}

	joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"}
	debugf("Send Join Request to %s", raftURL)
	resp, err := t.Post(joinURL.String(), &b)

	for {
		if err != nil {
			return fmt.Errorf("Unable to join: %v", err)
		}
		if resp == nil {
			// Previously a nil response with a nil error spun forever;
			// treat it as a failed attempt instead.
			break
		}

		// Close each response body as we go. The original deferred the
		// Close inside the loop, which kept every body in a redirect
		// chain open until the function returned.
		switch resp.StatusCode {
		case http.StatusOK:
			resp.Body.Close()
			return nil
		case http.StatusTemporaryRedirect:
			// Not the leader; retry against the address it points us at.
			address := resp.Header.Get("Location")
			resp.Body.Close()
			debugf("Send Join Request to %s", address)
			json.NewEncoder(&b).Encode(command)
			resp, err = t.Post(address, &b)
		case http.StatusBadRequest:
			resp.Body.Close()
			debug("Reach max number machines in the cluster")
			return fmt.Errorf(errors[103])
		default:
			resp.Body.Close()
			return fmt.Errorf("Unable to join")
		}
	}

	return fmt.Errorf("Unable to join: %v", err)
}
// registerCommands makes every etcd command type known to the raft
// library so log entries can be decoded on replay.
func registerCommands() {
	commands := []raft.Command{
		&JoinCommand{},
		&SetCommand{},
		&GetCommand{},
		&DeleteCommand{},
		&WatchCommand{},
		&TestAndSetCommand{},
	}
	for _, c := range commands {
		raft.RegisterCommand(c)
	}
}

View File

@ -2,29 +2,57 @@ package main
import (
"fmt"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"net/http"
"strconv"
"time"
"strings"
)
//-------------------------------------------------------------------
// Handlers to handle etcd-store related request via etcd url
//-------------------------------------------------------------------
// NewEtcdMuxer builds the ServeMux for the public etcd client API:
// the versioned keys/watch/leader/machines/stats endpoints wrapped in
// errorHandler, plus the unversioned /version and /test/ endpoints.
func NewEtcdMuxer() *http.ServeMux {
	// external commands
	etcdMux := http.NewServeMux()
	etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
	etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
	etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
	etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
	etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
	etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
	// TestHttpHandler does not return an error, so it is registered raw.
	etcdMux.HandleFunc("/test/", TestHttpHandler)
	return etcdMux
}
// errorHandler adapts a handler that returns an error into a standard
// http.Handler, centralizing the error-to-response translation.
type errorHandler func(http.ResponseWriter, *http.Request) error

// ServeHTTP invokes the wrapped handler. An etcd-defined error writes
// itself (with its own status code); any other error becomes a 500.
func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if e := fn(w, r); e != nil {
		// Note: the local etcdErr shadows the etcdErr package name
		// inside this if-branch.
		if etcdErr, ok := e.(etcdErr.Error); ok {
			debug("Return error: ", etcdErr.Error())
			etcdErr.Write(w)
		} else {
			http.Error(w, e.Error(), http.StatusInternalServerError)
		}
	}
}
// Multiplex GET/POST/DELETE request to corresponding handlers
func Multiplexer(w http.ResponseWriter, req *http.Request) {
func Multiplexer(w http.ResponseWriter, req *http.Request) error {
switch req.Method {
case "GET":
GetHttpHandler(&w, req)
return GetHttpHandler(w, req)
case "POST":
SetHttpHandler(&w, req)
return SetHttpHandler(w, req)
case "DELETE":
DeleteHttpHandler(&w, req)
return DeleteHttpHandler(w, req)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
return nil
}
}
@ -34,26 +62,19 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) {
//--------------------------------------
// Set Command Handler
func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
key := req.URL.Path[len("/v1/keys/"):]
if store.CheckKeyword(key) {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(400, "Set"))
return
return etcdErr.NewError(400, "Set")
}
debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key)
debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
value := req.FormValue("value")
if len(value) == 0 {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(200, "Set"))
return
return etcdErr.NewError(200, "Set")
}
prevValue := req.FormValue("prevValue")
@ -63,11 +84,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
expireTime, err := durationToExpireTime(strDuration)
if err != nil {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(202, "Set"))
return
return etcdErr.NewError(202, "Set")
}
if len(prevValue) != 0 {
@ -78,7 +95,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
ExpireTime: expireTime,
}
dispatch(command, w, req, true)
return dispatch(command, w, req, true)
} else {
command := &SetCommand{
@ -87,105 +104,66 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
ExpireTime: expireTime,
}
dispatch(command, w, req, true)
return dispatch(command, w, req, true)
}
}
// Delete Handler
func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
key := req.URL.Path[len("/v1/keys/"):]
debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key)
debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
command := &DeleteCommand{
Key: key,
}
dispatch(command, w, req, true)
return dispatch(command, w, req, true)
}
// Dispatch the command to leader
func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
if raftServer.State() == "leader" {
if body, err := raftServer.Do(c); err != nil {
if _, ok := err.(store.NotFoundError); ok {
(*w).WriteHeader(http.StatusNotFound)
(*w).Write(newJsonError(100, err.Error()))
return
}
func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error {
if _, ok := err.(store.TestFail); ok {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(101, err.Error()))
return
}
if _, ok := err.(store.NotFile); ok {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(102, err.Error()))
return
}
if err.Error() == errors[103] {
(*w).WriteHeader(http.StatusBadRequest)
(*w).Write(newJsonError(103, ""))
return
}
(*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, err.Error()))
return
if r.State() == raft.Leader {
if body, err := r.Do(c); err != nil {
return err
} else {
if body == nil {
(*w).WriteHeader(http.StatusNotFound)
(*w).Write(newJsonError(300, "Empty result from raft"))
return etcdErr.NewError(300, "Empty result from raft")
} else {
body, ok := body.([]byte)
// this should not happen
if !ok {
panic("wrong type")
}
(*w).WriteHeader(http.StatusOK)
(*w).Write(body)
body, _ := body.([]byte)
w.WriteHeader(http.StatusOK)
w.Write(body)
return nil
}
return
}
} else {
leader := r.Leader()
// current no leader
if raftServer.Leader() == "" {
(*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, ""))
return
if leader == "" {
return etcdErr.NewError(300, "")
}
// tell the client where is the leader
path := req.URL.Path
var scheme string
if scheme = req.URL.Scheme; scheme == "" {
scheme = "http://"
}
var url string
if etcd {
etcdAddr, _ := nameToEtcdURL(raftServer.Leader())
etcdAddr, _ := nameToEtcdURL(leader)
url = etcdAddr + path
} else {
raftAddr, _ := nameToRaftURL(raftServer.Leader())
raftAddr, _ := nameToRaftURL(leader)
url = raftAddr + path
}
debugf("Redirect to %s", url)
http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
return
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
return nil
}
(*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, ""))
return
return etcdErr.NewError(300, "")
}
//--------------------------------------
@ -196,92 +174,66 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
//--------------------------------------
// Handler to return the current leader's raft address
func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
leader := raftServer.Leader()
func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
leader := r.Leader()
if leader != "" {
w.WriteHeader(http.StatusOK)
raftURL, _ := nameToRaftURL(leader)
w.Write([]byte(raftURL))
return nil
} else {
// not likely, but it may happen
w.WriteHeader(http.StatusInternalServerError)
w.Write(newJsonError(301, ""))
return etcdErr.NewError(301, "")
}
}
// Handler to return all the known machines in the current cluster
func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
peers := raftServer.Peers()
// Add itself to the machine list first
// Since peer map does not contain the server itself
machines, _ := getEtcdURL(raftServer.Name())
// Add all peers to the list and separate by comma
// We do not use json here since we accept machines list
// in the command line separate by comma.
for peerName, _ := range peers {
if addr, ok := getEtcdURL(peerName); ok {
machines = machines + "," + addr
}
}
func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
machines := getMachines()
w.WriteHeader(http.StatusOK)
w.Write([]byte(machines))
w.Write([]byte(strings.Join(machines, ", ")))
return nil
}
// Handler to return the current version of etcd
func VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
w.Write([]byte(fmt.Sprintf("etcd %s", releaseVersion)))
fmt.Fprintf(w, "etcd %s", releaseVersion)
return nil
}
// Handler to return the basic stats of etcd
func StatsHttpHandler(w http.ResponseWriter, req *http.Request) {
func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
w.Write(etcdStore.Stats())
return nil
}
// Get Handler
func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
key := req.URL.Path[len("/v1/keys/"):]
debugf("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
command := &GetCommand{
Key: key,
}
if body, err := command.Apply(raftServer); err != nil {
if _, ok := err.(store.NotFoundError); ok {
(*w).WriteHeader(http.StatusNotFound)
(*w).Write(newJsonError(100, err.Error()))
return
}
(*w).WriteHeader(http.StatusInternalServerError)
(*w).Write(newJsonError(300, ""))
if body, err := command.Apply(r.Server); err != nil {
return err
} else {
body, ok := body.([]byte)
if !ok {
panic("wrong type")
}
(*w).WriteHeader(http.StatusOK)
(*w).Write(body)
body, _ := body.([]byte)
w.WriteHeader(http.StatusOK)
w.Write(body)
return nil
}
}
// Watch handler
func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error {
key := req.URL.Path[len("/v1/watch/"):]
command := &WatchCommand{
@ -289,39 +241,34 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
}
if req.Method == "GET" {
debugf("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
command.SinceIndex = 0
} else if req.Method == "POST" {
// watch from a specific index
debugf("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
content := req.FormValue("index")
sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write(newJsonError(203, "Watch From Index"))
return etcdErr.NewError(203, "Watch From Index")
}
command.SinceIndex = sinceIndex
} else {
w.WriteHeader(http.StatusMethodNotAllowed)
return
return nil
}
if body, err := command.Apply(raftServer); err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write(newJsonError(500, key))
if body, err := command.Apply(r.Server); err != nil {
return etcdErr.NewError(500, key)
} else {
w.WriteHeader(http.StatusOK)
body, ok := body.([]byte)
if !ok {
panic("wrong type")
}
body, _ := body.([]byte)
w.Write(body)
return nil
}
}
@ -339,17 +286,3 @@ func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusBadRequest)
}
// Convert string duration to time format
func durationToExpireTime(strDuration string) (time.Time, error) {
if strDuration != "" {
duration, err := strconv.Atoi(strDuration)
if err != nil {
return time.Unix(0, 0), err
}
return time.Now().Add(time.Second * (time.Duration)(duration)), nil
} else {
return time.Unix(0, 0), nil
}
}

View File

@ -1,150 +0,0 @@
package main
import (
"fmt"
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"testing"
"time"
)
// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times.
// It will print out the election time and the average election time.
func TestKillLeader(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 5
argGroup, etcds, err := createCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer destroyCluster(etcds)
leaderChan := make(chan string, 1)
time.Sleep(time.Second)
go leaderMonitor(clusterSize, 1, leaderChan)
var totalTime time.Duration
leader := "http://127.0.0.1:7001"
for i := 0; i < clusterSize; i++ {
fmt.Println("leader is ", leader)
port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
num := port - 7001
fmt.Println("kill server ", num)
etcds[num].Kill()
etcds[num].Release()
start := time.Now()
for {
newLeader := <-leaderChan
if newLeader != leader {
leader = newLeader
break
}
}
take := time.Now().Sub(start)
totalTime += take
avgTime := totalTime / (time.Duration)(i+1)
fmt.Println("Leader election time is ", take, "with election timeout", ELECTIONTIMEOUT)
fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMEOUT)
etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
}
}
// TestKillRandom kills random machines in the cluster and
// restart them after all other machines agree on the same leader
func TestKillRandom(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 9
argGroup, etcds, err := createCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer destroyCluster(etcds)
leaderChan := make(chan string, 1)
time.Sleep(3 * time.Second)
go leaderMonitor(clusterSize, 4, leaderChan)
toKill := make(map[int]bool)
for i := 0; i < 20; i++ {
fmt.Printf("TestKillRandom Round[%d/20]\n", i)
j := 0
for {
r := rand.Int31n(9)
if _, ok := toKill[int(r)]; !ok {
j++
toKill[int(r)] = true
}
if j > 3 {
break
}
}
for num, _ := range toKill {
etcds[num].Kill()
etcds[num].Release()
}
<-leaderChan
for num, _ := range toKill {
etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
}
toKill = make(map[int]bool)
}
<-leaderChan
}
func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 3
_, etcds, _ := createCluster(clusterSize, procAttr, tls)
defer destroyCluster(etcds)
time.Sleep(time.Second)
b.ResetTimer()
for i := 0; i < b.N; i++ {
resp, _ := http.Get("http://127.0.0.1:4001/test/speed")
resp.Body.Close()
}
}
func BenchmarkEtcdDirectCall(b *testing.B) {
templateBenchmarkEtcdDirectCall(b, false)
}
func BenchmarkEtcdDirectCallTls(b *testing.B) {
templateBenchmarkEtcdDirectCall(b, true)
}

48
etcd_server.go Normal file
View File

@ -0,0 +1,48 @@
package main
import (
"net/http"
"net/url"
)
// etcdServer is the HTTP server answering etcd client API requests.
// It embeds http.Server and carries the node identity and TLS settings
// needed to start listening.
type etcdServer struct {
	http.Server
	name    string     // node name
	url     string     // advertised client URL
	tlsConf *TLSConfig // scheme plus server/client tls.Config
	tlsInfo *TLSInfo   // cert/key/CA file paths for TLS listening
}

// e is the process-wide etcd client server instance.
var e *etcdServer
// newEtcdServer constructs the client-facing etcd server for this node,
// listening on the host portion of urlStr. A malformed urlStr is fatal.
func newEtcdServer(name string, urlStr string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer {
	u, err := url.Parse(urlStr)
	if err != nil {
		// Bug fix: this formatted e.url, but the global e is not assigned
		// yet during construction, so reporting a bad URL dereferenced a
		// nil pointer. Report the argument that failed to parse instead.
		fatalf("invalid url '%s': %s", urlStr, err)
	}
	return &etcdServer{
		Server: http.Server{
			Handler:   NewEtcdMuxer(),
			TLSConfig: &tlsConf.Server,
			Addr:      u.Host,
		},
		name:    name,
		url:     urlStr,
		tlsConf: tlsConf,
		tlsInfo: tlsInfo,
	}
}
// ListenAndServe starts serving etcd client requests, over TLS when
// the configured scheme is not plain "http". Any serve error is fatal.
func (e *etcdServer) ListenAndServe() {
	infof("etcd server [%s:%s]", e.name, e.url)

	if e.tlsConf.Scheme != "http" {
		fatal(e.Server.ListenAndServeTLS(e.tlsInfo.CertFile, e.tlsInfo.KeyFile))
		return
	}
	fatal(e.Server.ListenAndServe())
}

View File

@ -2,10 +2,15 @@ package main
import (
"fmt"
"github.com/coreos/etcd/test"
"github.com/coreos/go-etcd/etcd"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
//"strconv"
"strconv"
"strings"
"testing"
"time"
)
@ -31,7 +36,7 @@ func TestSingleNode(t *testing.T) {
// Test Set
result, err := c.Set("foo", "bar", 100)
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
if err != nil {
t.Fatal(err)
}
@ -51,6 +56,53 @@ func TestSingleNode(t *testing.T) {
}
}
// TestInternalVersionFail will ensure that etcd does not come up if the internal raft
// versions do not match. It fakes a peer whose /version endpoint
// returns garbage and verifies the new node queries it and refuses to
// come up or attempt a join.
func TestInternalVersionFail(t *testing.T) {
	checkedVersion := false
	testMux := http.NewServeMux()
	// Fake peer: answer the version probe with a non-version string.
	// NOTE(review): checkedVersion is written from the handler goroutine
	// and read from the test goroutine without synchronization — a data
	// race under -race; confirm or guard with a mutex/atomic.
	testMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "This is not a version number")
		checkedVersion = true
	})
	// NOTE(review): t.Fatal from a non-test goroutine (an HTTP handler)
	// does not stop the test; t.Error is the documented choice here.
	testMux.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) {
		t.Fatal("should not attempt to join!")
	})
	ts := httptest.NewServer(testMux)
	defer ts.Close()
	fakeURL, _ := url.Parse(ts.URL)
	procAttr := new(os.ProcAttr)
	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
	// Start a fresh node pointed at the fake peer as its cluster.
	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1", "-vv", "-C=" + fakeURL.Host}
	process, err := os.StartProcess("etcd", args, procAttr)
	if err != nil {
		t.Fatal("start process failed:" + err.Error())
		return
	}
	defer process.Kill()
	time.Sleep(time.Second)
	// The node must NOT be serving clients after the version mismatch.
	_, err = http.Get("http://127.0.0.1:4001")
	if err == nil {
		t.Fatal("etcd node should not be up")
		return
	}
	if checkedVersion == false {
		t.Fatal("etcd did not check the version")
		return
	}
}
// This test creates a single node and then set a value to it.
// Then this test kills the node and restart it and tries to get the value again.
func TestSingleNodeRecovery(t *testing.T) {
@ -72,7 +124,7 @@ func TestSingleNodeRecovery(t *testing.T) {
// Test Set
result, err := c.Set("foo", "bar", 100)
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
if err != nil {
t.Fatal(err)
}
@ -116,13 +168,13 @@ func templateTestSimpleMultiNode(t *testing.T, tls bool) {
clusterSize := 3
_, etcds, err := createCluster(clusterSize, procAttr, tls)
_, etcds, err := test.CreateCluster(clusterSize, procAttr, tls)
if err != nil {
t.Fatal("cannot create cluster")
}
defer destroyCluster(etcds)
defer test.DestroyCluster(etcds)
time.Sleep(time.Second)
@ -133,7 +185,7 @@ func templateTestSimpleMultiNode(t *testing.T, tls bool) {
// Test Set
result, err := c.Set("foo", "bar", 100)
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
if err != nil {
t.Fatal(err)
}
@ -169,13 +221,13 @@ func TestMultiNodeRecovery(t *testing.T) {
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 5
argGroup, etcds, err := createCluster(clusterSize, procAttr, false)
argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer destroyCluster(etcds)
defer test.DestroyCluster(etcds)
time.Sleep(2 * time.Second)
@ -185,7 +237,7 @@ func TestMultiNodeRecovery(t *testing.T) {
stop := make(chan bool)
// Test Set
go set(stop)
go test.Set(stop)
for i := 0; i < 10; i++ {
num := rand.Int() % clusterSize
@ -207,3 +259,151 @@ func TestMultiNodeRecovery(t *testing.T) {
stop <- true
<-stop
}
// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times.
// It will print out the election time and the average election time.
func TestKillLeader(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 5
argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer test.DestroyCluster(etcds)
stop := make(chan bool)
leaderChan := make(chan string, 1)
all := make(chan bool, 1)
time.Sleep(time.Second)
go test.Monitor(clusterSize, 1, leaderChan, all, stop)
var totalTime time.Duration
leader := "http://127.0.0.1:7001"
for i := 0; i < clusterSize; i++ {
fmt.Println("leader is ", leader)
port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
num := port - 7001
fmt.Println("kill server ", num)
etcds[num].Kill()
etcds[num].Release()
start := time.Now()
for {
newLeader := <-leaderChan
if newLeader != leader {
leader = newLeader
break
}
}
take := time.Now().Sub(start)
totalTime += take
avgTime := totalTime / (time.Duration)(i+1)
fmt.Println("Leader election time is ", take, "with election timeout", ElectionTimeout)
fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout)
etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
}
stop <- true
}
// TestKillRandom kills random machines in the cluster and
// restart them after all other machines agree on the same leader
func TestKillRandom(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 9
argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer test.DestroyCluster(etcds)
stop := make(chan bool)
leaderChan := make(chan string, 1)
all := make(chan bool, 1)
time.Sleep(3 * time.Second)
go test.Monitor(clusterSize, 4, leaderChan, all, stop)
toKill := make(map[int]bool)
for i := 0; i < 20; i++ {
fmt.Printf("TestKillRandom Round[%d/20]\n", i)
j := 0
for {
r := rand.Int31n(9)
if _, ok := toKill[int(r)]; !ok {
j++
toKill[int(r)] = true
}
if j > 3 {
break
}
}
for num, _ := range toKill {
err := etcds[num].Kill()
if err != nil {
panic(err)
}
etcds[num].Wait()
}
time.Sleep(ElectionTimeout)
<-leaderChan
for num, _ := range toKill {
etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
}
toKill = make(map[int]bool)
<-all
}
stop <- true
}
func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 3
_, etcds, _ := test.CreateCluster(clusterSize, procAttr, tls)
defer test.DestroyCluster(etcds)
time.Sleep(time.Second)
b.ResetTimer()
for i := 0; i < b.N; i++ {
resp, _ := http.Get("http://127.0.0.1:4001/test/speed")
resp.Body.Close()
}
}
func BenchmarkEtcdDirectCall(b *testing.B) {
templateBenchmarkEtcdDirectCall(b, false)
}
func BenchmarkEtcdDirectCallTls(b *testing.B) {
templateBenchmarkEtcdDirectCall(b, true)
}

3
go_version.go Normal file
View File

@ -0,0 +1,3 @@
// +build !go1.1
"etcd requires go 1.1 or greater to build"

View File

@ -1,27 +1,42 @@
package main
import (
"net/url"
"path"
)
func getEtcdURL(name string) (string, bool) {
resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
m, err := url.ParseQuery(resps[0].Value)
if err != nil {
panic("Failed to parse machines entry")
}
addr := m["etcd"][0]
return addr, true
}
// machineNum returns the number of machines in the cluster
func machineNum() int {
response, _ := etcdStore.RawGet("_etcd/machines")
return len(response)
}
// getMachines gets the current machines in the cluster
// It returns the client (etcd) URLs of all known members, with the
// leader first when its URL resolves, followed by this node and the
// remaining peers.
func getMachines() []string {
	peers := r.Peers()

	// One slot per peer plus this node; the peer map excludes self.
	// NOTE(review): peers whose URL lookup fails below leave trailing ""
	// entries in the returned slice — confirm callers tolerate that.
	machines := make([]string, len(peers)+1)

	leader, ok := nameToEtcdURL(r.Leader())
	self := e.url
	i := 1

	if ok {
		machines[0] = leader
		if leader != self {
			machines[1] = self
			i = 2
		}
	} else {
		// No resolvable leader yet; list ourselves first.
		machines[0] = self
	}

	// Add all peers to the slice
	for peerName, _ := range peers {
		if machine, ok := nameToEtcdURL(peerName); ok {
			// do not add leader twice
			if machine != leader {
				machines[i] = machine
				i++
			}
		}
	}
	return machines
}

View File

@ -7,6 +7,7 @@ import (
// we map node name to url
type nodeInfo struct {
raftVersion string
raftURL string
etcdURL string
}
@ -39,8 +40,9 @@ func nameToRaftURL(name string) (string, bool) {
}
// addNameToURL add a name that maps to raftURL and etcdURL
func addNameToURL(name string, raftURL string, etcdURL string) {
func addNameToURL(name string, version string, raftURL string, etcdURL string) {
namesMap[name] = &nodeInfo{
raftVersion: raftVersion,
raftURL: raftURL,
etcdURL: etcdURL,
}

View File

@ -12,10 +12,10 @@ import (
// Get all the current logs
func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name())
debugf("[recv] GET %s/log", r.url)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(raftServer.LogEntries())
json.NewEncoder(w).Encode(r.LogEntries())
}
// Response to vote request
@ -23,8 +23,8 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq)
if err == nil {
debugf("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName)
if resp := raftServer.RequestVote(rvreq); resp != nil {
debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName)
if resp := r.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
@ -40,8 +40,8 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
err := decodeJsonRequest(req, aereq)
if err == nil {
debugf("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries))
if resp := raftServer.AppendEntries(aereq); resp != nil {
debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
if resp := r.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
@ -59,8 +59,8 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
debugf("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name())
if resp := raftServer.RequestSnapshot(aereq); resp != nil {
debugf("[recv] POST %s/snapshot/ ", r.url)
if resp := r.RequestSnapshot(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
@ -75,8 +75,8 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRecoveryRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
debugf("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name())
if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
debugf("[recv] POST %s/snapshotRecovery/ ", r.url)
if resp := r.SnapshotRecoveryRequest(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
@ -88,28 +88,35 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
// Get the port that listening for etcd connecting of the server
func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] Get %s/etcdURL/ ", raftTransporter.scheme+raftServer.Name())
debugf("[recv] Get %s/etcdURL/ ", r.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(argInfo.EtcdURL))
}
// Response to the join request
func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
command := &JoinCommand{}
if err := decodeJsonRequest(req, command); err == nil {
debugf("Receive Join Request from %s", command.Name)
dispatch(command, &w, req, false)
return dispatch(command, w, req, false)
} else {
w.WriteHeader(http.StatusInternalServerError)
return
return nil
}
}
// Response to the name request
func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] Get %s/name/ ", raftTransporter.scheme+raftServer.Name())
debugf("[recv] Get %s/name/ ", r.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(raftServer.Name()))
w.Write([]byte(r.name))
}
// Response to the name request
func RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] Get %s/version/ ", r.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(r.version))
}

252
raft_server.go Normal file
View File

@ -0,0 +1,252 @@
package main
import (
"bytes"
"crypto/tls"
"encoding/json"
"io/ioutil"
"fmt"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/go-raft"
"net/http"
"net/url"
"time"
)
// raftServer wraps the embedded raft.Server with the node identity,
// the internal raft protocol version, and the TLS settings used by
// the raft transport.
type raftServer struct {
	*raft.Server
	version string     // internal raft protocol version (raftVersion)
	name    string     // node name
	url     string     // advertised raft URL
	tlsConf *TLSConfig // scheme plus server/client tls.Config
	tlsInfo *TLSInfo   // cert/key/CA file paths for TLS listening
}

// r is the process-wide raft server instance.
var r *raftServer
// newRaftServer wires up the raft consensus server for this node,
// backed by the shared etcd store and an HTTP(S) transporter. A
// failure to construct the underlying raft.Server is fatal via check.
func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
	// Transporter carries raft RPCs over HTTP(S).
	trans := newTransporter(tlsConf.Scheme, tlsConf.Client)

	server, err := raft.NewServer(name, dirPath, trans, etcdStore, nil)
	check(err)

	return &raftServer{
		Server:  server,
		version: raftVersion,
		name:    name,
		url:     url,
		tlsConf: tlsConf,
		tlsInfo: tlsInfo,
	}
}
// Start the raft server
// ListenAndServe boots the raft layer: registers log commands,
// optionally restores a snapshot, starts the consensus loop, decides
// whether to bootstrap/join/rejoin a cluster, then serves the raft
// transport in a background goroutine.
func (r *raftServer) ListenAndServe() {
	// Setup commands.
	registerCommands()

	// LoadSnapshot
	if snapshot {
		err := r.LoadSnapshot()
		if err == nil {
			debugf("%s finished load snapshot", r.name)
		} else {
			// Snapshot load failure is non-fatal; we fall back to the log.
			debug(err)
		}
	}

	r.SetElectionTimeout(ElectionTimeout)
	r.SetHeartbeatTimeout(HeartbeatTimeout)

	r.Start()

	if r.IsLogEmpty() {
		// A fresh log means this node was never part of a cluster:
		// start as a leader in a new cluster when no machines were
		// given, otherwise join the given cluster as a follower.
		if len(cluster) == 0 {
			startAsLeader()
		} else {
			startAsFollower()
		}
	} else {
		// rejoin the previous cluster
		debugf("%s restart as a follower", r.name)
	}

	// open the snapshot
	if snapshot {
		go monitorSnapshot()
	}

	// start to response to raft requests
	go r.startTransport(r.tlsConf.Scheme, r.tlsConf.Server)
}
// startAsLeader bootstraps a brand new cluster. The leader must record
// itself as the first peer, so it keeps applying a join command to its
// own state machine until the command succeeds.
func startAsLeader() {
	// leader need to join self as a peer
	for {
		if _, err := r.Do(newJoinCommand()); err == nil {
			break
		}
	}
	debugf("%s start as a leader", r.name)
}
// startAsFollower joins an existing cluster by contacting each known
// machine in turn, retrying up to retryTimes rounds with RetryInterval
// seconds between rounds. The process is aborted when no machine accepts
// the join, or immediately on a non-retryable etcd-level error.
func startAsFollower() {
	// start as a follower in an existing cluster
	for i := 0; i < retryTimes; i++ {
		for _, machine := range cluster {
			if len(machine) == 0 {
				continue
			}

			err := joinCluster(r.Server, machine, r.tlsConf.Scheme)
			if err == nil {
				debugf("%s success join to the cluster via machine %s", r.name, machine)
				return
			}
			// An etcd-level error (e.g. internal version mismatch) will
			// not go away by retrying; abort immediately.
			if _, ok := err.(etcdErr.Error); ok {
				fatal(err)
			}
			debugf("cannot join to cluster via machine %s %s", machine, err)
		}

		warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
		time.Sleep(time.Second * RetryInterval)
	}

	// %d (was %x): retryTimes is a decimal count, not a hex quantity.
	fatalf("Cannot join the cluster via given machines after %d retries", retryTimes)
}
// startTransport listens for and serves the internal raft protocol
// (leader election, log replication, snapshots, joins) on the host part
// of the node's raft URL, using HTTP or HTTPS depending on scheme.
// It never returns: a listen failure is fatal.
func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
	u, _ := url.Parse(r.url)
	infof("raft server [%s:%s]", r.name, u)

	mux := http.NewServeMux()

	// internal commands
	mux.HandleFunc("/name", NameHttpHandler)
	mux.HandleFunc("/version", RaftVersionHttpHandler)
	mux.Handle("/join", errorHandler(JoinHttpHandler))
	mux.HandleFunc("/vote", VoteHttpHandler)
	mux.HandleFunc("/log", GetLogHttpHandler)
	mux.HandleFunc("/log/append", AppendEntriesHttpHandler)
	mux.HandleFunc("/snapshot", SnapshotHttpHandler)
	mux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
	mux.HandleFunc("/etcdURL", EtcdURLHttpHandler)

	server := &http.Server{
		Handler:   mux,
		TLSConfig: &tlsConf,
		Addr:      u.Host,
	}

	if scheme == "http" {
		fatal(server.ListenAndServe())
	} else {
		fatal(server.ListenAndServeTLS(r.tlsInfo.CertFile, r.tlsInfo.KeyFile))
	}
}
// getVersion fetches the raft version of a peer. This works for now but we
// will need to do something more sophisticated later when we allow mixed
// version clusters.
func getVersion(t transporter, versionURL url.URL) (string, error) {
	resp, err := t.Get(versionURL.String())
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// Previously this error was silently dropped; a truncated read
		// must not be mistaken for a valid version string.
		return "", err
	}
	return string(body), nil
}
// joinCluster sends a join request to the peer at raftURL, following a
// temporary-redirect response to the current leader. It refuses to join
// when the peer's internal raft protocol version differs from ours.
// Returns nil on success, an etcdErr.Error when the cluster rejected the
// join (e.g. it is full), or a generic error otherwise.
func joinCluster(s *raft.Server, raftURL string, scheme string) error {
	var b bytes.Buffer

	// t must be ok
	t, _ := r.Transporter().(transporter)

	// Our version must match the leaders version
	versionURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/version"}
	version, err := getVersion(t, versionURL)
	if err != nil {
		return fmt.Errorf("Unable to join: %v", err)
	}

	// TODO: versioning of the internal protocol. See:
	// Documentation/internal-protocol-versioning.md
	if version != r.version {
		return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
	}

	json.NewEncoder(&b).Encode(newJoinCommand())

	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}

	debugf("Send Join Request to %s", raftURL)
	resp, err := t.Post(joinURL.String(), &b)

	for {
		if err != nil {
			return fmt.Errorf("Unable to join: %v", err)
		}
		if resp == nil {
			// No response and no error: nothing further to retry.
			// (Previously this case spun forever in the loop.)
			return fmt.Errorf("Unable to join")
		}

		// Close each response body in the iteration that produced it
		// instead of deferring; a long redirect chain must not hold
		// every body open until the function returns.
		switch resp.StatusCode {
		case http.StatusOK:
			resp.Body.Close()
			return nil

		case http.StatusTemporaryRedirect:
			// We contacted a follower; retry against the leader it named.
			address := resp.Header.Get("Location")
			resp.Body.Close()
			debugf("Send Join Request to %s", address)
			// Reset the buffer so the re-encoded command is not appended
			// to any leftover bytes from a partially-consumed request.
			b.Reset()
			json.NewEncoder(&b).Encode(newJoinCommand())
			resp, err = t.Post(address, &b)

		case http.StatusBadRequest:
			debug("Reach max number machines in the cluster")
			joinErr := &etcdErr.Error{}
			json.NewDecoder(resp.Body).Decode(joinErr)
			resp.Body.Close()
			return *joinErr

		default:
			resp.Body.Close()
			return fmt.Errorf("Unable to join")
		}
	}
}
// registerCommands makes every etcd command type known to the raft
// library so replicated log entries can be decoded back into commands.
func registerCommands() {
	for _, cmd := range []raft.Command{
		&JoinCommand{},
		&SetCommand{},
		&GetCommand{},
		&DeleteCommand{},
		&WatchCommand{},
		&TestAndSetCommand{},
	} {
		raft.RegisterCommand(cmd)
	}
}

36
snapshot.go Normal file
View File

@ -0,0 +1,36 @@
package main
import (
"time"
)
// basic conf.
// TODO: find a good policy to do snapshot
type snapshotConf struct {
	// Etcd will check if a snapshot is needed every checkingInterval
	checkingInterval time.Duration
	// The number of writes when the last snapshot happened
	lastWrites uint64
	// If the incremental number of writes since the last snapshot
	// exceeds the write Threshold, etcd will do a snapshot
	writesThr uint64
}

// snapConf holds the process-wide snapshot policy used by monitorSnapshot.
var snapConf *snapshotConf
// newSnapshotConf returns the default snapshot policy: check every three
// seconds and snapshot once 20K writes accumulated since the last one,
// starting the write baseline at the store's current total.
func newSnapshotConf() *snapshotConf {
	return &snapshotConf{
		checkingInterval: time.Second * 3,
		lastWrites:       etcdStore.TotalWrites(),
		writesThr:        20 * 1000,
	}
}
// monitorSnapshot periodically compares the store's write counter
// against the snapshot baseline and triggers a raft snapshot once the
// incremental write volume exceeds the configured threshold. Runs
// forever; intended to be launched in its own goroutine.
func monitorSnapshot() {
	for {
		time.Sleep(snapConf.checkingInterval)

		delta := etcdStore.TotalWrites() - snapConf.lastWrites
		if delta > snapConf.writesThr {
			r.TakeSnapshot()
			snapConf.lastWrites = etcdStore.TotalWrites()
		}
	}
}

View File

@ -1,25 +0,0 @@
package store
// NotFoundError is returned when a requested key does not exist.
type NotFoundError string

// Error implements the error interface.
func (err NotFoundError) Error() string {
	return string(err)
}

// NotFile is returned when a key that must be a file is a directory.
type NotFile string

// Error implements the error interface.
func (err NotFile) Error() string {
	return string(err)
}

// TestFail is returned when a test-and-set comparison does not match.
type TestFail string

// Error implements the error interface.
func (err TestFail) Error() string {
	return string(err)
}

// Keyword is returned when a key collides with a reserved keyword.
type Keyword string

// Error implements the error interface.
func (err Keyword) Error() string {
	return string(err)
}

View File

@ -18,8 +18,16 @@ type EtcdStats struct {
TestAndSets uint64 `json:"testAndSets"`
}
// Stats returns the basic statistics information of etcd storage
// Stats returns the basic statistics information of etcd storage since its recent start
func (s *Store) Stats() []byte {
b, _ := json.Marshal(s.BasicStats)
return b
}
// TotalWrites returns the total write operations
// It helps with snapshot
func (s *Store) TotalWrites() uint64 {
bs := s.BasicStats
return bs.Deletes + bs.Sets + bs.TestAndSets
}

View File

@ -3,6 +3,7 @@ package store
import (
"encoding/json"
"fmt"
etcdErr "github.com/coreos/etcd/error"
"path"
"strconv"
"sync"
@ -27,7 +28,7 @@ type Store struct {
// the watching condition.
// It is needed so that clone() can atomically replicate the Store
// and do the log snapshot in a go routine.
mutex sync.Mutex
mutex sync.RWMutex
// WatcherHub is where we register all the clients
// who issue a watch request
@ -239,8 +240,7 @@ func (s *Store) internalSet(key string, value string, expireTime time.Time, inde
ok := s.Tree.set(key, Node{value, expireTime, update})
if !ok {
err := NotFile(key)
return nil, err
return nil, etcdErr.NewError(102, "set: "+key)
}
if isExpire {
@ -304,8 +304,8 @@ func (s *Store) internalGet(key string) *Response {
// If key is a file return the file
// If key is a directory return an array of files
func (s *Store) Get(key string) ([]byte, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.mutex.RLock()
defer s.mutex.RUnlock()
resps, err := s.RawGet(key)
@ -313,7 +313,12 @@ func (s *Store) Get(key string) ([]byte, error) {
return nil, err
}
if len(resps) == 1 {
key = path.Clean("/" + key)
// If the number of resps == 1 and the response key
// is the key we query, a single key-value should
// be returned
if len(resps) == 1 && resps[0].Key == key {
return json.Marshal(resps[0])
}
@ -388,8 +393,7 @@ func (s *Store) RawGet(key string) ([]*Response, error) {
return resps, nil
}
err := NotFoundError(key)
return nil, err
return nil, etcdErr.NewError(100, "get: "+key)
}
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
@ -446,8 +450,7 @@ func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
return msg, err
} else {
err := NotFoundError(key)
return nil, err
return nil, etcdErr.NewError(100, "delete: "+key)
}
}
@ -462,8 +465,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim
resp := s.internalGet(key)
if resp == nil {
err := NotFoundError(key)
return nil, err
return nil, etcdErr.NewError(100, "testandset: "+key)
}
if resp.Value == prevValue {
@ -473,8 +475,8 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim
} else {
// If fails, return err
err := TestFail(fmt.Sprintf("TestAndSet: %s!=%s", resp.Value, prevValue))
return nil, err
return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s",
resp.Value, prevValue))
}
}

View File

View File

@ -1,4 +1,4 @@
package main
package test
import (
"fmt"
@ -18,7 +18,7 @@ var client = http.Client{
}
// Sending set commands
func set(stop chan bool) {
func Set(stop chan bool) {
stopSet := false
i := 0
@ -50,12 +50,11 @@ func set(stop chan bool) {
i++
}
fmt.Println("set stop")
stop <- true
}
// Create a cluster of etcd nodes
func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
argGroup := make([][]string, size)
sslServer1 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
@ -70,7 +69,7 @@ func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
for i := 0; i < size; i++ {
if i == 0 {
argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"}
argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
if ssl {
argGroup[i] = append(argGroup[i], sslServer1...)
}
@ -97,7 +96,7 @@ func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
// have to retry. This retry can take upwards of 15 seconds
// which slows tests way down and some of them fail.
if i == 0 {
time.Sleep(time.Second)
time.Sleep(time.Second * 2)
}
}
@ -105,10 +104,9 @@ func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
}
// Destroy all the nodes in the cluster
func destroyCluster(etcds []*os.Process) error {
for i, etcd := range etcds {
func DestroyCluster(etcds []*os.Process) error {
for _, etcd := range etcds {
err := etcd.Kill()
fmt.Println("kill ", i)
if err != nil {
panic(err.Error())
}
@ -118,7 +116,7 @@ func destroyCluster(etcds []*os.Process) error {
}
//
func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
leaderMap := make(map[int]string)
baseAddrFormat := "http://0.0.0.0:400%d"
@ -153,6 +151,8 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
if i == size {
select {
case <-stop:
return
case <-leaderChan:
leaderChan <- knownLeader
default:
@ -160,6 +160,14 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
}
}
if dead == 0 {
select {
case <-all:
all <- true
default:
all <- true
}
}
time.Sleep(time.Millisecond * 10)
}
@ -168,7 +176,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
func getLeader(addr string) (string, error) {
resp, err := client.Get(addr + "/leader")
resp, err := client.Get(addr + "/v1/leader")
if err != nil {
return "", err
@ -191,28 +199,6 @@ func getLeader(addr string) (string, error) {
}
func directSet() {
c := make(chan bool, 1000)
for i := 0; i < 1000; i++ {
go send(c)
}
for i := 0; i < 1000; i++ {
<-c
}
}
func send(c chan bool) {
for i := 0; i < 10; i++ {
command := &SetCommand{}
command.Key = "foo"
command.Value = "bar"
command.ExpireTime = time.Unix(0, 0)
raftServer.Do(command)
}
c <- true
}
// Dial with timeout
func dialTimeoutFast(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, time.Millisecond*10)

View File

@ -0,0 +1,50 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package netutil provides network utility functions, complementing the more
// common ones in the net package.
package netutil
import (
"net"
"sync"
)
// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener.
func LimitListener(l net.Listener, n int) net.Listener {
ch := make(chan struct{}, n)
for i := 0; i < n; i++ {
ch <- struct{}{}
}
return &limitListener{l, ch}
}
type limitListener struct {
net.Listener
ch chan struct{}
}
func (l *limitListener) Accept() (net.Conn, error) {
<-l.ch
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return &limitListenerConn{Conn: c, ch: l.ch}, nil
}
type limitListenerConn struct {
net.Conn
ch chan<- struct{}
close sync.Once
}
func (l *limitListenerConn) Close() error {
err := l.Conn.Close()
l.close.Do(func() {
l.ch <- struct{}{}
})
return err
}

View File

@ -0,0 +1,65 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netutil
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestLimitListener serves HTTP behind a LimitListener capped at max
// simultaneous connections and fires num concurrent Gets at it,
// asserting that the number of concurrently open server-side handlers
// never exceeds the cap and that most requests still succeed.
func TestLimitListener(t *testing.T) {
	const (
		max = 5
		num = 200
	)

	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("Listen: %v", err)
	}
	defer l.Close()
	l = LimitListener(l, max)

	// open counts handlers currently executing; it must stay <= max.
	var open int32
	go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if n := atomic.AddInt32(&open, 1); n > max {
			t.Errorf("%d open connections, want <= %d", n, max)
		}
		defer atomic.AddInt32(&open, -1)
		// Hold the connection briefly so concurrency actually builds up.
		time.Sleep(10 * time.Millisecond)
		fmt.Fprint(w, "some body")
	}))

	var wg sync.WaitGroup
	var failed int32
	for i := 0; i < num; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r, err := http.Get("http://" + l.Addr().String())
			if err != nil {
				t.Logf("Get: %v", err)
				atomic.AddInt32(&failed, 1)
				return
			}
			defer r.Body.Close()
			// Drain the body so the client can reuse the connection.
			io.Copy(ioutil.Discard, r.Body)
		}()
	}
	wg.Wait()

	// We expect some Gets to fail as the kernel's accept queue is filled,
	// but most should succeed.
	if failed >= num/2 {
		t.Errorf("too many Gets failed")
	}
}

View File

@ -24,7 +24,7 @@ type Auth struct {
User, Password string
}
// DefaultDialer returns the dialer specified by the proxy related variables in
// FromEnvironment returns the dialer specified by the proxy related variables in
// the environment.
func FromEnvironment() Dialer {
allProxy := os.Getenv("all_proxy")

View File

@ -332,17 +332,13 @@ func (p *Buffer) buffree(s []byte) {
// Bool is a helper routine that allocates a new bool value
// to store v and returns a pointer to it.
func Bool(v bool) *bool {
p := new(bool)
*p = v
return p
return &v
}
// Int32 is a helper routine that allocates a new int32 value
// to store v and returns a pointer to it.
func Int32(v int32) *int32 {
p := new(int32)
*p = v
return p
return &v
}
// Int is a helper routine that allocates a new int32 value
@ -357,25 +353,19 @@ func Int(v int) *int32 {
// Int64 is a helper routine that allocates a new int64 value
// to store v and returns a pointer to it.
func Int64(v int64) *int64 {
p := new(int64)
*p = v
return p
return &v
}
// Float32 is a helper routine that allocates a new float32 value
// to store v and returns a pointer to it.
func Float32(v float32) *float32 {
p := new(float32)
*p = v
return p
return &v
}
// Float64 is a helper routine that allocates a new float64 value
// to store v and returns a pointer to it.
func Float64(v float64) *float64 {
p := new(float64)
*p = v
return p
return &v
}
// Uint32 is a helper routine that allocates a new uint32 value
@ -389,17 +379,13 @@ func Uint32(v uint32) *uint32 {
// Uint64 is a helper routine that allocates a new uint64 value
// to store v and returns a pointer to it.
func Uint64(v uint64) *uint64 {
p := new(uint64)
*p = v
return p
return &v
}
// String is a helper routine that allocates a new string value
// to store v and returns a pointer to it.
func String(v string) *string {
p := new(string)
*p = v
return p
return &v
}
// EnumName is a helper function to simplify printing protocol buffer enums

View File

@ -13,23 +13,60 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package config
import (
"bufio"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
)
var commentPrefix = []string{"//", "#", ";"}
func Read(filename string) (map[string]string, error) {
var res = map[string]string{}
in, err := os.Open(filename)
// Config struct constructs a new configuration handler.
type Config struct {
filename string
config map[string]map[string]string
}
// NewConfig function cnstructs a new Config struct with filename. You have to
// call Read() function to let it read from the file. Otherwise you will get
// empty string (i.e., "") when you are calling Get() function. Another usage
// is that you call NewConfig() function and then call Add()/Set() function to
// add new key-values to the configuration. Finally you can call Write()
// function to write the new configuration to the file.
func NewConfig(filename string) *Config {
c := new(Config)
c.filename = filename
c.config = make(map[string]map[string]string)
return c
}
// Filename function returns the filename of the configuration.
func (c *Config) Filename() string {
return c.filename
}
// SetFilename function sets the filename of the configuration.
func (c *Config) SetFilename(filename string) {
c.filename = filename
}
// Reset function reset the map in the configuration.
func (c *Config) Reset() {
c.config = make(map[string]map[string]string)
}
// Read function reads configurations from the file defined in
// Config.filename.
func (c *Config) Read() error {
in, err := os.Open(c.filename)
if err != nil {
return res, err
return err
}
defer in.Close()
scanner := bufio.NewScanner(in)
@ -40,9 +77,9 @@ func Read(filename string) (map[string]string, error) {
continue
}
if line == "" {
sec := checkSection(scanner.Text())
if sec != "" {
section = sec + "."
sec, ok := checkSection(scanner.Text())
if ok {
section = sec
continue
}
}
@ -54,40 +91,103 @@ func Read(filename string) (map[string]string, error) {
line = line[:len(line)-1]
continue
}
key, value, err := checkLine(line)
if err != nil {
return res, errors.New("WRONG: " + line)
key, value, ok := checkLine(line)
if !ok {
return errors.New("WRONG: " + line)
}
res[section+key] = value
c.Set(section, key, value)
line = ""
}
return res, nil
return nil
}
func checkSection(line string) string {
// Get function returns the value of a key in the configuration. If the key
// does not exist, it returns empty string (i.e., "").
func (c *Config) Get(section string, key string) string {
value, ok := c.config[section][key]
if !ok {
return ""
}
return value
}
// Set function updates the value of a key in the configuration. Function
// Set() is exactly the same as function Add().
func (c *Config) Set(section string, key string, value string) {
_, ok := c.config[section]
if !ok {
c.config[section] = make(map[string]string)
}
c.config[section][key] = value
}
// Add function adds a new key to the configuration. Function Add() is exactly
// the same as function Set().
func (c *Config) Add(section string, key string, value string) {
c.Set(section, key, value)
}
// Del function deletes a key from the configuration.
func (c *Config) Del(section string, key string) {
_, ok := c.config[section]
if ok {
delete(c.config[section], key)
if len(c.config[section]) == 0 {
delete(c.config, section)
}
}
}
// Write function writes the updated configuration back.
func (c *Config) Write() error {
return nil
}
// WriteTo function writes the configuration to a new file. This function
// re-organizes the configuration and deletes all the comments.
func (c *Config) WriteTo(filename string) error {
content := ""
for k, v := range c.config {
format := "%v = %v\n"
if k != "" {
content += fmt.Sprintf("[%v]\n", k)
format = "\t" + format
}
for key, value := range v {
content += fmt.Sprintf(format, key, value)
}
}
return ioutil.WriteFile(filename, []byte(content), 0644)
}
// To check this line is a section or not. If it is not a section, it returns
// "".
func checkSection(line string) (string, bool) {
line = strings.TrimSpace(line)
lineLen := len(line)
if lineLen < 2 {
return ""
return "", false
}
if line[0] == '[' && line[lineLen-1] == ']' {
return line[1 : lineLen-1]
return line[1 : lineLen-1], true
}
return ""
return "", false
}
func checkLine(line string) (string, string, error) {
// To check this line is a valid key-value pair or not.
func checkLine(line string) (string, string, bool) {
key := ""
value := ""
sp := strings.SplitN(line, "=", 2)
if len(sp) != 2 {
return key, value, errors.New("WRONG: " + line)
return key, value, false
}
key = strings.TrimSpace(sp[0])
value = strings.TrimSpace(sp[1])
return key, value, nil
return key, value, true
}
// To check this line is a whole line comment or not.
func checkComment(line string) bool {
line = strings.TrimSpace(line)
for p := range commentPrefix {

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package main
import (
@ -22,9 +22,11 @@ import (
)
func main() {
res, err := config.Read("example.conf")
c := config.NewConfig("example.conf")
err := c.Read()
fmt.Println(err)
fmt.Println(res)
fmt.Println(res["test.a"])
fmt.Println(res["dd"])
fmt.Println(c)
fmt.Println(c.Get("test", "a"))
fmt.Println(c.Get("", "dd"))
c.WriteTo("example2.conf")
}

View File

@ -5,14 +5,6 @@ go-logging is a high-performance logging library for golang.
low delay of about 800 nano-seconds.
## Getting Started
The stable version is under the `stable` branch, which does never revert and
is fully tested. The tags in `stable` branch indicate the version numbers.
However, `master` branch is unstable version, and `dev` branch is development
branch. `master` branch merges `dev` branch periodically.
Btw, all the pull request should be sent to the `dev` branch.
### Installation
The step below will download the library source code to
`${GOPATH}/src/github.com/ccding/go-logging`.
@ -46,7 +38,6 @@ import (
func main() {
logger, _ := logging.SimpleLogger("main")
logger.SetLevel(logging.DEBUG)
logger.Error("this is a test from error")
logger.Destroy()
}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package main
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
// Logln receives log request from the client. The request includes a set of

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
// This file defines GetGoId function, which is used to get the id of the
// current goroutine. More details about this function are availeble in the
// runtime.c file of golang source code.

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
// Level is the type of level.

View File

@ -99,13 +99,15 @@ func RichLogger(name string) (*Logger, error) {
func FileLogger(name string, level Level, format string, timeFormat string, file string, sync bool) (*Logger, error) {
out, err := os.Create(file)
if err != nil {
return new(Logger), err
return nil, err
}
logger, err := createLogger(name, level, format, timeFormat, out, sync)
if err == nil {
logger.fd = out
return logger, nil
} else {
return nil, err
}
return logger, err
}
// WriterLogger creates a new logger with a writer
@ -115,38 +117,35 @@ func WriterLogger(name string, level Level, format string, timeFormat string, ou
// WriterLogger creates a new logger from a configuration file
func ConfigLogger(filename string) (*Logger, error) {
conf, err := config.Read(filename)
conf := config.NewConfig(filename)
err := conf.Read()
if err != nil {
return new(Logger), err
return nil, err
}
ok := true
name, ok := conf["name"]
if !ok {
name = ""
}
slevel, ok := conf["level"]
if !ok {
name := conf.Get("", "name")
slevel := conf.Get("", "level")
if slevel == "" {
slevel = "0"
}
l, err := strconv.Atoi(slevel)
if err != nil {
return new(Logger), err
return nil, err
}
level := Level(l)
format, ok := conf["format"]
if !ok {
format := conf.Get("", "format")
if format == "" {
format = BasicFormat
}
timeFormat, ok := conf["timeFormat"]
if !ok {
timeFormat := conf.Get("", "timeFormat")
if timeFormat == "" {
timeFormat = DefaultTimeFormat
}
ssync, ok := conf["sync"]
if !ok {
ssync := conf.Get("", "sync")
if ssync == "" {
ssync = "0"
}
file, ok := conf["file"]
if !ok {
file := conf.Get("", "file")
if file == "" {
file = DefaultFileName
}
sync := true
@ -155,7 +154,7 @@ func ConfigLogger(filename string) (*Logger, error) {
} else if ssync == "1" {
sync = true
} else {
return new(Logger), err
return nil, err
}
return FileLogger(name, level, format, timeFormat, file, sync)
}
@ -166,7 +165,7 @@ func createLogger(name string, level Level, format string, timeFormat string, ou
err := logger.parseFormat(format)
if err != nil {
return logger, err
return nil, err
}
// asign values to logger

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
// request struct stores the logger request

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
// author: Cong Ding <dinggnu@gmail.com>
//
package logging
import (

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strings"
"time"
@ -39,10 +40,10 @@ func NewClient() *Client {
// default leader and machines
cluster := Cluster{
Leader: "0.0.0.0:4001",
Leader: "http://0.0.0.0:4001",
Machines: make([]string, 1),
}
cluster.Machines[0] = "0.0.0.0:4001"
cluster.Machines[0] = "http://0.0.0.0:4001"
config := Config{
// default use http
@ -116,19 +117,26 @@ func (c *Client) SyncCluster() bool {
// sync cluster information by providing machine list
func (c *Client) internalSyncCluster(machines []string) bool {
for _, machine := range machines {
httpPath := c.createHttpPath(machine, "machines")
httpPath := c.createHttpPath(machine, "v1/machines")
resp, err := c.httpClient.Get(httpPath)
if err != nil {
// try another machine in the cluster
continue
} else {
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
// try another machine in the cluster
continue
}
// update Machines List
c.cluster.Machines = strings.Split(string(b), ",")
// update leader
// the first one in the machine list is the leader
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0])
c.cluster.Leader = c.cluster.Machines[0]
logger.Debug("sync.machines ", c.cluster.Machines)
return true
}
@ -138,8 +146,9 @@ func (c *Client) internalSyncCluster(machines []string) bool {
// serverName should contain both hostName and port
func (c *Client) createHttpPath(serverName string, _path string) string {
httpPath := path.Join(serverName, _path)
return httpPath
u, _ := url.Parse(serverName)
u.Path = path.Join(u.Path, "/", _path)
return u.String()
}
// Dial with timeout.
@ -148,22 +157,21 @@ func dialTimeout(network, addr string) (net.Conn, error) {
}
func (c *Client) getHttpPath(s ...string) string {
httpPath := path.Join(c.cluster.Leader, version)
u, _ := url.Parse(c.cluster.Leader)
u.Path = path.Join(u.Path, "/", version)
for _, seg := range s {
httpPath = path.Join(httpPath, seg)
u.Path = path.Join(u.Path, seg)
}
httpPath = c.config.Scheme + "://" + httpPath
return httpPath
return u.String()
}
func (c *Client) updateLeader(httpPath string) {
// httpPath http://127.0.0.1:4001/v1...
leader := strings.Split(httpPath, "://")[1]
// we want to have 127.0.0.1:4001
u, _ := url.Parse(httpPath)
leader := u.Host
leader = strings.Split(leader, "/")[0]
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
c.cluster.Leader = leader
}
@ -180,6 +188,7 @@ func (c *Client) sendRequest(method string, _path string, body string) (*http.Re
for {
httpPath := c.getHttpPath(_path)
logger.Debug("send.request.to ", httpPath)
if body == "" {

View File

@ -17,7 +17,6 @@ func main() {
c := etcd.NewClient()
c.Set("lock", "unlock", 0)
for i := 0; i < 10; i++ {
go t(i, ch, etcd.NewClient())
}

View File

@ -1,8 +1,8 @@
package main
package main
import (
"github.com/coreos/go-etcd/etcd"
"fmt"
"github.com/coreos/go-etcd/etcd"
"time"
)
@ -11,21 +11,21 @@ var count = 0
func main() {
ch := make(chan bool, 10)
// set up a lock
for i:=0; i < 100; i++ {
for i := 0; i < 100; i++ {
go t(i, ch, etcd.NewClient())
}
start := time.Now()
for i:=0; i< 100; i++ {
for i := 0; i < 100; i++ {
<-ch
}
fmt.Println(time.Now().Sub(start), ": ", 100 * 50, "commands")
fmt.Println(time.Now().Sub(start), ": ", 100*50, "commands")
}
func t(num int, ch chan bool, c *etcd.Client) {
c.SyncCluster()
for i := 0; i < 50; i++ {
str := fmt.Sprintf("foo_%d",num * i)
str := fmt.Sprintf("foo_%d", num*i)
c.Set(str, "10", 0)
}
ch<-true
ch <- true
}

View File

@ -1,19 +1,49 @@
[![Build Status](https://travis-ci.org/benbjohnson/go-raft.png?branch=master)](https://travis-ci.org/benbjohnson/go-raft)
go-raft
=======
[![Build Status](https://travis-ci.org/goraft/raft.png?branch=master)](https://travis-ci.org/goraft/raft)
## Overview
This is an Go implementation of the Raft distributed consensus protocol.
This is a Go implementation of the Raft distributed consensus protocol.
Raft is a protocol by which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout.
For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm][raft-paper] by Diego Ongaro and John Ousterhout.
## Project Status
This library is feature complete but should be considered experimental until it has seen more usage.
If you have any questions on implementing go-raft in your project please file an issue.
There is an [active community][community] of developers who can help.
go-raft is under the MIT license.
[community]: https://github.com/goraft/raft/contributors
### Features
- Leader election
- Log replication
- Configuration changes
- Log compaction
- Unit tests
- Fast Protobuf Log Encoding
- HTTP transport
### Projects
These projects are built on go-raft:
- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery
- [benbjohnson/raftd](https://github.com/benbjohnson/raftd) - A reference implementation for using the go-raft library for distributed consensus.
If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.
## The Raft Protocol
This section provides a summary of the Raft protocol from a high level.
For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm][raft-paper].
### Overview
Maintaining state in a single process on a single server is easy.
@ -26,7 +56,7 @@ Servers can crash or the network between two machines can become unavailable or
A distributed consensus protocol is used for maintaining a consistent state across multiple servers in a cluster.
Many distributed systems are built upon the Paxos protocol but Paxos can be difficult to understand and there are many gaps between Paxos and real world implementation.
An alternative is the [Raft distributed consensus protocol](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout.
An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout.
Raft is a protocol built with understandability as a primary tenant and it centers around two things:
1. Leader Election
@ -53,17 +83,9 @@ By ensuring that this log is replicated identically between all the nodes in the
Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers).
Each peer will append the entries from the leader through a 2-phase commit process which ensures that a majority of servers in the cluster have entries written to log.
For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf)
## History
Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky).
He put it under the MIT license in the hopes that it would be useful for other projects too.
## Project Status
The go-raft library is feature complete but in alpha.
There is a reference implementation called [raftd](https://github.com/benbjohnson/raftd) that demonstrates how to use the library.
The library will be considered experimental until it has significant production usage.
I'm writing the library for the purpose of including distributed processing in my behavioral analytics database called [Sky](https://github.com/skydb/sky).
However, I hope other projects can benefit from having a distributed consensus protocol so the go-raft library is available under MIT license.
If you have a project that you're using go-raft in, please add it to this README and send a pull request so others can see implementation examples.
If you have any questions on implementing go-raft in your project, feel free to contact me on [GitHub](https://github.com/benbjohnson), [Twitter](https://twitter.com/benbjohnson) or by e-mail at [ben@skylandlabs.com](mailto:ben@skylandlabs.com).
[raft-paper]: https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf

View File

@ -0,0 +1,7 @@
package raft
// Config is the on-disk representation of the server's cluster
// configuration, persisted as JSON.
type Config struct {
	// Highest log index known to be committed when this
	// configuration was written.
	CommitIndex uint64 `json:"commitIndex"`
	// TODO decide what we need to store in peer struct
	// Known peers in the cluster.
	Peers []*Peer `json:"peers"`
}

View File

@ -94,7 +94,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
@ -122,7 +122,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}

View File

@ -9,7 +9,8 @@ type JoinCommand interface {
// Join command
type DefaultJoinCommand struct {
Name string `json:"name"`
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
// The name of the Join command in the log
@ -18,7 +19,7 @@ func (c *DefaultJoinCommand) CommandName() string {
}
func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
err := server.AddPeer(c.Name)
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}

View File

@ -183,6 +183,15 @@ func (l *Log) open(path string) error {
// Append entry.
l.entries = append(l.entries, entry)
if entry.Index <= l.commitIndex {
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
continue
}
l.ApplyFunc(command)
}
debugln("open.log.append log index ", entry.Index)
readBytes += int64(n)

View File

@ -14,7 +14,8 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
name string
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
prevLogIndex uint64
mutex sync.RWMutex
stopChan chan bool
@ -28,10 +29,11 @@ type Peer struct {
//------------------------------------------------------------------------------
// Creates a new peer.
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
name: name,
Name: name,
ConnectionString: connectionString,
heartbeatTimeout: heartbeatTimeout,
}
}
@ -42,11 +44,6 @@ func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
//
//------------------------------------------------------------------------------
// Retrieves the name of the peer.
func (p *Peer) Name() string {
return p.name
}
// Sets the heartbeat timeout.
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
p.heartbeatTimeout = duration
@ -89,17 +86,17 @@ func (p *Peer) startHeartbeat() {
}
// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat() {
func (p *Peer) stopHeartbeat(flush bool) {
// here is a problem
// the previous stop is no buffer leader may get blocked
// when heartbeat returns at line 132
// when heartbeat returns
// I make the channel with 1 buffer
// and try to panic here
select {
case p.stopChan <- true:
case p.stopChan <- flush:
default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat")
}
}
@ -113,8 +110,9 @@ func (p *Peer) clone() *Peer {
p.mutex.Lock()
defer p.mutex.Unlock()
return &Peer{
name: p.name,
prevLogIndex: p.prevLogIndex,
Name: p.Name,
ConnectionString: p.ConnectionString,
prevLogIndex: p.prevLogIndex,
}
}
@ -128,46 +126,58 @@ func (p *Peer) heartbeat(c chan bool) {
c <- true
debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
for {
select {
case <-stopChan:
debugln("peer.heartbeat.stop: ", p.Name())
return
case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
case flush := <-stopChan:
if !flush {
debugln("peer.heartbeat.stop: ", p.Name)
return
} else {
// before we can safely remove a node
// we must flush the remove command to the node first
p.flush()
debugln("peer.heartbeat.stop: ", p.Name)
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
case <-time.After(p.heartbeatTimeout):
p.flush()
}
}
}
// flush sends the peer the log entries that follow its last known
// index, or a snapshot request when no such entries are available
// (presumably because they were compacted — confirm against the log
// implementation). It is a no-op if this server is no longer leader.
func (p *Peer) flush() {
	debugln("peer.heartbeat.run: ", p.Name)

	lastSeen := p.getPrevLogIndex()
	entries, lastTerm := p.server.log.getEntriesAfter(lastSeen, p.server.maxLogEntriesPerRequest)

	// Only the leader may replicate entries.
	if p.server.State() != Leader {
		return
	}

	if entries == nil {
		// Nothing to append after lastSeen; fall back to a snapshot.
		p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
		return
	}
	p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, lastSeen, lastTerm, p.server.log.CommitIndex(), p.server.name, entries))
}
//--------------------------------------
// Append Entries
//--------------------------------------
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries))
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name)
return
}
traceln("peer.flush.recv: ", p.Name())
traceln("peer.flush.recv: ", p.Name)
// If successful then update the previous log index.
p.mutex.Lock()
@ -181,7 +191,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
resp.append = true
}
}
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
@ -195,7 +205,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.CommitIndex
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
} else if p.prevLogIndex > 0 {
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
@ -206,35 +216,35 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.Index
}
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
}
}
p.mutex.Unlock()
// Attach the peer to resp, thus server can know where it comes from
resp.peer = p.Name()
resp.peer = p.Name
// Send response to server for processing.
p.server.send(resp)
}
// Sends an Snapshot request to the peer through the transport.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
debugln("peer.snap.send: ", p.name)
debugln("peer.snap.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
if resp == nil {
debugln("peer.snap.timeout: ", p.name)
debugln("peer.snap.timeout: ", p.Name)
return
}
debugln("peer.snap.recv: ", p.name)
debugln("peer.snap.recv: ", p.Name)
// If successful, the peer should have been to snapshot state
// Send it the snapshot!
if resp.Success {
p.sendSnapshotRecoveryRequest()
} else {
debugln("peer.snap.failed: ", p.name)
debugln("peer.snap.failed: ", p.Name)
return
}
@ -243,12 +253,12 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// Sends an Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
debugln("peer.snap.recovery.send: ", p.name)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
debugln("peer.snap.recovery.failed: ", p.name)
debugln("peer.snap.recovery.failed: ", p.Name)
return
}
// Send response to server for processing.
@ -261,10 +271,10 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
// send VoteRequest Request
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
debugln("peer.vote: ", p.server.Name(), "->", p.Name)
req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name)
resp.peer = p
c <- resp
}

View File

@ -14,12 +14,12 @@ var _ = &json.SyntaxError{}
var _ = math.Inf
type ProtoSnapshotRecoveryRequest struct {
LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
Peers []string `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
XXX_unrecognized []byte `json:"-"`
LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
Peers []*ProtoSnapshotRecoveryRequest_ProtoPeer `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ProtoSnapshotRecoveryRequest) Reset() { *m = ProtoSnapshotRecoveryRequest{} }
@ -47,7 +47,7 @@ func (m *ProtoSnapshotRecoveryRequest) GetLastTerm() uint64 {
return 0
}
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []string {
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []*ProtoSnapshotRecoveryRequest_ProtoPeer {
if m != nil {
return m.Peers
}
@ -61,5 +61,31 @@ func (m *ProtoSnapshotRecoveryRequest) GetState() []byte {
return nil
}
// ProtoSnapshotRecoveryRequest_ProtoPeer is the wire representation of
// a single cluster peer (name plus connection string) inside a
// snapshot-recovery request.
//
// NOTE(review): this block follows the protoc-gen-go output style and
// is presumably generated from the .proto definition — regenerate
// rather than hand-edit.
type ProtoSnapshotRecoveryRequest_ProtoPeer struct {
	Name             *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
	ConnectionString *string `protobuf:"bytes,2,req" json:"ConnectionString,omitempty"`
	// Unrecognized fields preserved across decode/encode.
	XXX_unrecognized []byte `json:"-"`
}

// Reset clears the message to its zero value.
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) Reset() {
	*m = ProtoSnapshotRecoveryRequest_ProtoPeer{}
}

// String renders the message in the compact proto text format.
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) String() string { return proto.CompactTextString(m) }

// ProtoMessage marks the type as a protobuf message.
func (*ProtoSnapshotRecoveryRequest_ProtoPeer) ProtoMessage() {}

// GetName returns the peer name, or "" when the field (or the
// receiver itself) is nil.
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetName() string {
	if m != nil && m.Name != nil {
		return *m.Name
	}
	return ""
}

// GetConnectionString returns the peer's connection string, or ""
// when the field (or the receiver itself) is nil.
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetConnectionString() string {
	if m != nil && m.ConnectionString != nil {
		return *m.ConnectionString
	}
	return ""
}

func init() {
}

View File

@ -3,7 +3,13 @@ package protobuf;
message ProtoSnapshotRecoveryRequest {
required string LeaderName=1;
required uint64 LastIndex=2;
required uint64 LastTerm=3;
repeated string Peers=4;
required uint64 LastTerm=3;
message ProtoPeer {
required string Name=1;
required string ConnectionString=2;
}
repeated ProtoPeer Peers=4;
required bytes State=5;
}

View File

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path"
@ -81,8 +80,6 @@ type Server struct {
lastSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
confFile *os.File
}
// An event to be processed by the server's event loop.
@ -272,11 +269,15 @@ func (s *Server) QuorumSize() int {
// ElectionTimeout retrieves the election timeout. The read lock
// guards against a concurrent SetElectionTimeout.
func (s *Server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}
// SetElectionTimeout sets the election timeout under the write lock
// so readers never observe a torn update.
func (s *Server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}
@ -286,6 +287,8 @@ func (s *Server) SetElectionTimeout(duration time.Duration) {
// HeartbeatTimeout retrieves the heartbeat timeout. The read lock
// guards against concurrent mutation of the setting.
func (s *Server) HeartbeatTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatTimeout
}
@ -332,14 +335,14 @@ func (s *Server) Start() error {
// Create snapshot directory if not exist
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
@ -368,59 +371,12 @@ func (s *Server) Start() error {
return nil
}
// Read the configuration for the server.
func (s *Server) readConf() error {
var err error
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600)
if err != nil {
if os.IsNotExist(err) {
s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600)
debugln("readConf.create ", confPath)
if err != nil {
return err
}
}
return err
}
peerNames := make([]string, 0)
for {
var peerName string
_, err = fmt.Fscanf(s.confFile, "%s\n", &peerName)
if err != nil {
if err == io.EOF {
s.debugln("server.peer.conf: finish")
break
}
return err
}
s.debugln("server.peer.conf.read: ", peerName)
peerNames = append(peerNames, peerName)
}
s.confFile.Truncate(0)
s.confFile.Seek(0, os.SEEK_SET)
for _, peerName := range peerNames {
s.AddPeer(peerName)
}
return nil
}
// Shuts down the server.
func (s *Server) Stop() {
s.send(&stopValue)
s.mutex.Lock()
defer s.mutex.Unlock()
s.log.close()
s.mutex.Unlock()
}
// Checks if the server is currently running.
@ -532,24 +488,27 @@ func (s *Server) followerLoop() {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(JoinCommand); ok {
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && command.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(command, e)
} else {
} else {
switch req := e.target.(type) {
case JoinCommand:
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(req, e)
} else {
err = NotLeaderError
}
case *AppendEntriesRequest:
e.returnValue, update = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, update = s.processRequestVoteRequest(req)
case *SnapshotRequest:
e.returnValue = s.processSnapshotRequest(req)
default:
err = NotLeaderError
}
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, update = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, update = s.processRequestVoteRequest(req)
} else if req, ok := e.target.(*SnapshotRequest); ok {
e.returnValue = s.processSnapshotRequest(req)
} else {
err = NotLeaderError
}
// Callback to event.
@ -629,14 +588,16 @@ func (s *Server) candidateLoop() {
var err error
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
e.c <- err
@ -660,7 +621,7 @@ func (s *Server) candidateLoop() {
}
}
// The event loop that is run when the server is in a Candidate state.
// The event loop that is run when the server is in a Leader state.
func (s *Server) leaderLoop() {
s.setState(Leader)
s.syncedPeer = make(map[string]bool)
@ -682,15 +643,18 @@ func (s *Server) leaderLoop() {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(Command); ok {
s.processCommand(command, e)
continue
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if resp, ok := e.target.(*AppendEntriesResponse); ok {
s.processAppendEntriesResponse(resp)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else {
switch req := e.target.(type) {
case Command:
s.processCommand(req, e)
continue
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *AppendEntriesResponse:
s.processAppendEntriesResponse(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
@ -705,7 +669,7 @@ func (s *Server) leaderLoop() {
// Stop all peers.
for _, peer := range s.peers {
peer.stopHeartbeat()
peer.stopHeartbeat(false)
}
s.syncedPeer = nil
}
@ -720,16 +684,18 @@ func (s *Server) snapshotLoop() {
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else if req, ok := e.target.(*SnapshotRecoveryRequest); ok {
e.returnValue = s.processSnapshotRecoveryRequest(req)
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
case *SnapshotRecoveryRequest:
e.returnValue = s.processSnapshotRecoveryRequest(req)
}
}
// Callback to event.
e.c <- err
@ -959,31 +925,29 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
//--------------------------------------
// Adds a peer to the server.
func (s *Server) AddPeer(name string) error {
func (s *Server) AddPeer(name string, connectiongString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
defer s.writeConf()
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
}
// Only add the peer if it doesn't have the same name.
if s.name != name {
// when loading snapshot s.confFile should be nil
if s.confFile != nil {
_, err := fmt.Fprintln(s.confFile, name)
s.debugln("server.peer.conf.write: ", name)
if err != nil {
return err
}
}
peer := newPeer(s, name, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.name] = peer
// Skip the Peer if it has the same name as the Server
if s.name == name {
return nil
}
peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.Name] = peer
s.debugln("server.peer.conf.write: ", name)
return nil
}
@ -991,8 +955,12 @@ func (s *Server) AddPeer(name string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Ignore removal of the server itself.
if s.name == name {
defer s.writeConf()
if name == s.Name() {
// when the removed node restart, it should be able
// to know it has been removed before. So we need
// to update knownCommitIndex
return nil
}
// Return error if peer doesn't exist.
@ -1001,23 +969,13 @@ func (s *Server) RemovePeer(name string) error {
return fmt.Errorf("raft: Peer not found: %s", name)
}
// TODO: Flush entries to the peer first.
// Stop peer and remove it.
peer.stopHeartbeat()
if s.State() == Leader {
peer.stopHeartbeat(true)
}
delete(s.peers, name)
s.confFile.Truncate(0)
s.confFile.Seek(0, os.SEEK_SET)
for peer := range s.peers {
_, err := fmt.Fprintln(s.confFile, peer)
if err != nil {
return err
}
}
return nil
}
@ -1025,16 +983,7 @@ func (s *Server) RemovePeer(name string) error {
// Log compaction
//--------------------------------------
// The background snapshot function
func (s *Server) Snapshot() {
for {
// TODO: change this... to something reasonable
time.Sleep(1 * time.Second)
s.takeSnapshot()
}
}
func (s *Server) takeSnapshot() error {
func (s *Server) TakeSnapshot() error {
//TODO put a snapshot mutex
s.debugln("take Snapshot")
if s.currentSnapshot != nil {
@ -1063,14 +1012,13 @@ func (s *Server) takeSnapshot() error {
state = []byte{0}
}
var peerNames []string
var peers []*Peer
for _, peer := range s.peers {
peerNames = append(peerNames, peer.Name())
peers = append(peers, peer.clone())
}
peerNames = append(peerNames, s.Name())
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
s.saveSnapshot()
@ -1153,8 +1101,8 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
s.peers = make(map[string]*Peer)
// recovery the cluster configuration
for _, peerName := range req.Peers {
s.AddPeer(peerName)
for _, peer := range req.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
//update term and index
@ -1246,8 +1194,8 @@ func (s *Server) LoadSnapshot() error {
return err
}
for _, peerName := range s.lastSnapshot.Peers {
s.AddPeer(peerName)
for _, peer := range s.lastSnapshot.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
s.log.startTerm = s.lastSnapshot.LastTerm
@ -1257,6 +1205,62 @@ func (s *Server) LoadSnapshot() error {
return err
}
//--------------------------------------
// Config File
//--------------------------------------
// writeConf persists the current cluster configuration (commit index
// plus peer list) to the server's "conf" file.
//
// The configuration is first written to a temporary file and then
// renamed into place so a crash mid-write cannot leave a truncated
// conf file behind.
func (s *Server) writeConf() {
	// Snapshot the peer set via clone() so the serialized copies do
	// not share mutable state with the live peers.
	peers := make([]*Peer, 0, len(s.peers))
	for _, peer := range s.peers {
		peers = append(peers, peer.clone())
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	// Config contains only JSON-serializable fields, so a Marshal
	// failure indicates a programming bug; fail loudly instead of
	// silently writing nothing (was: error discarded with `_`).
	b, err := json.Marshal(r)
	if err != nil {
		panic(err)
	}

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	if err := ioutil.WriteFile(tmpConfPath, b, 0600); err != nil {
		panic(err)
	}
	// A failed rename would silently lose the new configuration, so
	// treat it the same way as a failed write (was: error ignored).
	if err := os.Rename(tmpConfPath, confPath); err != nil {
		panic(err)
	}
}
// readConf restores the persisted cluster configuration from the
// server's "conf" file. A missing or unreadable conf file is not an
// error: the server simply starts without a saved configuration.
func (s *Server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	data, err := ioutil.ReadFile(confPath)
	if err != nil {
		// Nothing to restore.
		return nil
	}

	conf := &Config{}
	if err := json.Unmarshal(data, conf); err != nil {
		return err
	}

	s.log.commitIndex = conf.CommitIndex
	return nil
}
//--------------------------------------
// Debugging
//--------------------------------------

View File

@ -164,10 +164,10 @@ func TestServerPromote(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return lookup[peer.Name()].RequestVote(req)
return lookup[peer.Name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return lookup[peer.Name()].AppendEntries(req)
return lookup[peer.Name].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@ -316,6 +316,124 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
}
}
//--------------------------------------
// Recovery
//--------------------------------------
// Ensure that servers restarted with their old data directories
// recover both the log and the peer configuration from disk.
func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
	// Initialize the servers.
	var mutex sync.RWMutex
	servers := map[string]*Server{}
	transporter := &testTransporter{}
	transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
		mutex.RLock()
		s := servers[peer.Name]
		mutex.RUnlock()
		return s.RequestVote(req)
	}
	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
		mutex.RLock()
		s := servers[peer.Name]
		mutex.RUnlock()
		return s.AppendEntries(req)
	}

	// A transporter that drops everything, used to isolate restarted
	// servers while their on-disk state is checked.
	disTransporter := &testTransporter{}
	disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
		return nil
	}
	disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
		return nil
	}

	var names []string
	var paths = make(map[string]string)

	n := 5

	// add n servers
	for i := 1; i <= n; i++ {
		names = append(names, strconv.Itoa(i))
	}

	var leader *Server
	for _, name := range names {
		server := newTestServer(name, transporter)
		servers[name] = server
		paths[name] = server.Path()

		if name == "1" {
			leader = server
			server.SetHeartbeatTimeout(testHeartbeatTimeout)
			server.Start()
			time.Sleep(testHeartbeatTimeout)
		} else {
			server.SetElectionTimeout(testElectionTimeout)
			server.SetHeartbeatTimeout(testHeartbeatTimeout)
			server.Start()
			time.Sleep(testHeartbeatTimeout)
		}
		if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
			t.Fatalf("Unable to join server[%s]: %v", name, err)
		}
	}

	// commit some commands
	for i := 0; i < 10; i++ {
		// BUG FIX: the original Fatalf had no format verb for the
		// error argument (go vet: Fatalf call has arguments but no
		// formatting directives).
		if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
			t.Fatalf("cannot commit command: %v", err)
		}
	}

	time.Sleep(2 * testHeartbeatTimeout)

	for _, name := range names {
		server := servers[name]
		if server.CommitIndex() != 16 {
			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
		}
		server.Stop()
	}

	for _, name := range names {
		// with old path and disable transportation
		server := newTestServerWithPath(name, disTransporter, paths[name])
		servers[name] = server

		server.Start()

		// should only commit to the last join command
		if server.CommitIndex() != 6 {
			t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
		}

		// peer conf should be recovered
		if len(server.Peers()) != 4 {
			t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
		}
	}

	// let nodes talk to each other
	for _, name := range names {
		servers[name].SetTransporter(transporter)
	}

	time.Sleep(2 * testElectionTimeout)

	// should commit to the previous index + 1 (nop command when new leader elected)
	for _, name := range names {
		server := servers[name]
		// BUG FIX: failure message previously printed expected=16
		// while the condition checks for 17.
		if server.CommitIndex() != 17 {
			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17)
		}
		server.Stop()
	}
}
//--------------------------------------
// Membership
//--------------------------------------
@ -357,13 +475,13 @@ func TestServerMultiNode(t *testing.T) {
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}

View File

@ -21,9 +21,9 @@ type Snapshot struct {
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// cluster configuration.
Peers []string `json: "peers"`
State []byte `json: "state"`
Path string `json: "path"`
Peers []*Peer `json: "peers"`
State []byte `json: "state"`
Path string `json: "path"`
}
// Save the snapshot to a file

View File

@ -12,7 +12,7 @@ type SnapshotRecoveryRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
Peers []string
Peers []*Peer
State []byte
}
@ -36,11 +36,21 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
for i, peer := range req.Peers {
protoPeers[i] = &protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{
Name: proto.String(peer.Name),
ConnectionString: proto.String(peer.ConnectionString),
}
}
pb := &protobuf.ProtoSnapshotRecoveryRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
Peers: req.Peers,
Peers: protoPeers,
State: req.State,
}
p, err := proto.Marshal(pb)
@ -62,7 +72,7 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
totalBytes := len(data)
pb := &protobuf.ProtoSnapshotRequest{}
pb := &protobuf.ProtoSnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
@ -70,8 +80,16 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
req.Peers = req.Peers
req.State = req.State
req.Peers = make([]*Peer, len(pb.Peers))
for i, peer := range pb.Peers {
req.Peers[i] = &Peer{
Name: peer.GetName(),
ConnectionString: peer.GetConnectionString(),
}
}
return totalBytes, nil
}

View File

@ -69,6 +69,11 @@ func newTestServer(name string, transporter Transporter) *Server {
return server
}
// newTestServerWithPath builds a test server whose state lives under the
// given path p. The error from NewServer is intentionally discarded: this
// is a test-only helper.
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
	s, _ := NewServer(name, p, transporter, nil, nil)
	return s
}
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
server := newTestServer(name, transporter)
f, err := os.Create(server.LogPath())
@ -100,7 +105,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
for _, peer := range servers {
server.AddPeer(peer.Name())
server.AddPeer(peer.Name(), "")
}
}
return servers

View File

@ -2,18 +2,43 @@ package main
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/coreos/go-raft"
"io"
"net"
"net/http"
)
// Transporter layer for communication between raft nodes
type transporter struct {
	// client issues the HTTP(S) requests to peer raft servers.
	client *http.Client
	// scheme is the URL scheme ("http" or "https"); NOTE(review): not
	// assigned anywhere in the visible code — confirm it is set elsewhere
	// or remove it.
	scheme string
}
// newTransporter creates the transporter used by the raft server. When
// scheme is "https" the client is configured with the supplied TLS
// settings (and compression disabled); otherwise a plain HTTP client
// with a dial timeout is returned.
func newTransporter(scheme string, tlsConf tls.Config) transporter {
	tr := &http.Transport{
		Dial: dialTimeout,
	}

	if scheme == "https" {
		tr.TLSClientConfig = &tlsConf
		tr.DisableCompression = true
	}

	return transporter{client: &http.Client{Transport: tr}}
}
// dialTimeout dials the given network address, giving up after
// HTTPTimeout has elapsed.
func dialTimeout(network, addr string) (net.Conn, error) {
	conn, err := net.DialTimeout(network, addr, HTTPTimeout)
	return conn, err
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
@ -22,7 +47,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send LogEntries to %s ", u)
resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
@ -49,7 +74,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send Vote to %s", u)
resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
@ -75,7 +100,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
@ -103,7 +128,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name())
u, _ := nameToRaftURL(peer.Name)
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
@ -126,12 +151,10 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
// Post issues a server-side POST request with a JSON body to the given
// path and returns the raw HTTP response. (The diff residue here had left
// a duplicate, unreachable return statement; collapsed to a single return.)
func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
	return t.client.Post(path, "application/json", body)
}
// Get issues a server-side GET request to the given path and returns the
// raw HTTP response. (The diff residue here had left a duplicate,
// unreachable return statement; collapsed to a single return.)
func (t transporter) Get(path string) (*http.Response, error) {
	return t.client.Get(path)
}

115
util.go
View File

@ -6,10 +6,35 @@ import (
"github.com/coreos/etcd/web"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"runtime/pprof"
"strconv"
"time"
)
//--------------------------------------
// etcd http Helper
//--------------------------------------
// durationToExpireTime converts a duration, given as a string number of
// seconds, into an absolute expiration time relative to now. An empty
// string means "no expiration" and yields the sentinel time.Unix(0, 0);
// a non-numeric string returns an error alongside the sentinel.
func durationToExpireTime(strDuration string) (time.Time, error) {
	// Early return for the "no expiration" case keeps the happy path flat
	// (the original used an else branch after a return).
	if strDuration == "" {
		return time.Unix(0, 0), nil
	}

	duration, err := strconv.Atoi(strDuration)
	if err != nil {
		return time.Unix(0, 0), err
	}
	return time.Now().Add(time.Duration(duration) * time.Second), nil
}
//--------------------------------------
// Web Helper
//--------------------------------------
@ -25,6 +50,15 @@ func webHelper() {
}
}
// startWebInterface launches the web console goroutines when a web URL
// was configured; it is a no-op when argInfo.WebURL is empty.
func startWebInterface() {
	if argInfo.WebURL == "" {
		return
	}
	// start web
	go webHelper()
	go web.Start(r.Server, argInfo.WebURL)
}
//--------------------------------------
// HTTP Utilities
//--------------------------------------
@ -48,6 +82,36 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
}
}
// sanitizeURL will cleanup a host string in the format hostname:port and
// attach a schema. Blank input is returned unchanged. The process exits
// via fatal() when host cannot be parsed as a URL or is not in Host:Port
// form.
func sanitizeURL(host string, defaultScheme string) string {
	// Blank URLs are fine input, just return it
	if len(host) == 0 {
		return host
	}

	// The original assigned the parse result to p and then discarded it;
	// the parse is only a validity check, so scope it to the if.
	if _, err := url.Parse(host); err != nil {
		fatal(err)
	}

	// Make sure the host is in Host:Port format
	if _, _, err := net.SplitHostPort(host); err != nil {
		fatal(err)
	}

	p := &url.URL{Host: host, Scheme: defaultScheme}
	return p.String()
}
// check aborts the process via fatal when err is non-nil; otherwise it
// does nothing.
func check(err error) {
	if err == nil {
		return
	}
	fatal(err)
}
//--------------------------------------
// Log
//--------------------------------------
@ -58,6 +122,10 @@ func init() {
logger = log.New(os.Stdout, "[etcd] ", log.Lmicroseconds)
}
// infof writes a printf-formatted INFO-level line to the package logger.
func infof(msg string, v ...interface{}) {
	format := "INFO " + msg + "\n"
	logger.Printf(format, v...)
}
func debugf(msg string, v ...interface{}) {
if verbose {
logger.Printf("DEBUG "+msg+"\n", v...)
@ -87,3 +155,50 @@ func fatal(v ...interface{}) {
logger.Println("FATAL " + fmt.Sprint(v...))
os.Exit(1)
}
//--------------------------------------
// CPU profile
//--------------------------------------
// runCPUProfile starts CPU profiling into the file named by the
// cpuprofile flag and installs a SIGINT handler that stops the profiler,
// closes the profile file, and exits so buffered profile data reaches
// disk.
func runCPUProfile() {
	f, err := os.Create(cpuprofile)
	if err != nil {
		fatal(err)
	}

	// The original ignored this error; StartCPUProfile fails if profiling
	// is already enabled.
	if err := pprof.StartCPUProfile(f); err != nil {
		fatal(err)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		for sig := range c {
			infof("captured %v, stopping profiler and exiting..", sig)
			pprof.StopCPUProfile()
			// Close the file before exiting so the profile is flushed.
			f.Close()
			os.Exit(1)
		}
	}()
}
//--------------------------------------
// Testing
//--------------------------------------
// directSet fires 1000 goroutines, each issuing a batch of SetCommands
// directly against the raft server, then blocks until every goroutine
// has signalled completion.
func directSet() {
	const workers = 1000
	done := make(chan bool, workers)
	for i := 0; i < workers; i++ {
		go send(done)
	}
	for i := 0; i < workers; i++ {
		<-done
	}
}
// send issues ten direct "foo" -> "bar" SetCommands (with the zero
// expire-time sentinel) against the raft server, then signals done on c.
func send(c chan bool) {
	for i := 0; i < 10; i++ {
		cmd := &SetCommand{
			Key:        "foo",
			Value:      "bar",
			ExpireTime: time.Unix(0, 0),
		}
		r.Do(cmd)
	}
	c <- true
}

View File

@ -1,3 +1,8 @@
package main
// version is the client API version string; NOTE(review): presumably used
// as the HTTP path prefix for client requests — confirm against the
// request handlers.
const version = "v1"
// TODO: The release version (generated from the git tag) will be the raft
// protocol version for now. When things settle down we will fix it like the
// client API above.
const raftVersion = releaseVersion

View File

@ -29,12 +29,12 @@ func Start(raftServer *raft.Server, webURL string) {
webMux := http.NewServeMux()
server := &http.Server{
Handler: webMux,
Addr: u.Host,
Handler: webMux,
Addr: u.Host,
}
mainPage = &MainPage{
Leader: raftServer.Leader(),
Leader: raftServer.Leader(),
Address: u.Host,
}