Compare commits
33 Commits
Author | SHA1 | Date | |
---|---|---|---|
d6523fe463 | |||
c25127a699 | |||
9f031e6218 | |||
e55724e959 | |||
29af192e3d | |||
2fc79912c2 | |||
ebb8d781b5 | |||
2e30b3c17f | |||
9a2d82854e | |||
b077dcf6c4 | |||
2b572cb6e8 | |||
f36d55f062 | |||
9f70568a02 | |||
1ca7d1e064 | |||
4f1f003d04 | |||
49e0dff2b8 | |||
686837227e | |||
f2652f005e | |||
5490eb5406 | |||
70dda950ed | |||
a884f2a18a | |||
bdeb96be0f | |||
c00594e680 | |||
919cd380ec | |||
b83aec6b87 | |||
05bfb369ef | |||
0639c4c86d | |||
877b3d51bb | |||
d9df58beb8 | |||
1cffdb3a48 | |||
0593a52107 | |||
f7854c4ab9 | |||
973bde9a07 |
@ -1,3 +1,10 @@
|
||||
v0.4.6
|
||||
* Fix long-term timer leak (#900, #875, #868, #904)
|
||||
* Fix `Running` field in standby_info file (#881)
|
||||
* Add `quorum=true` query parameter for GET requests (#866, #883)
|
||||
* Add `Access-Control-Allow-Headers` header for CORS requests (#886)
|
||||
* Various documentation improvements (#907, #882)
|
||||
|
||||
v0.4.5
|
||||
* Flush headers immediatly on `wait=true` requests (#877)
|
||||
* Add `ETCD_HTTP_READ_TIMEOUT` and `ETCD_HTTP_WRITE_TIMEOUT` (#880)
|
||||
|
@ -13,6 +13,14 @@ This will bring up etcd listening on default ports (4001 for client communicatio
|
||||
The `-data-dir machine0` argument tells etcd to write machine configuration, logs and snapshots to the `./machine0/` directory.
|
||||
The `-name machine0` tells the rest of the cluster that this machine is named machine0.
|
||||
|
||||
## Getting the etcd version
|
||||
|
||||
The etcd version of a specific instance can be obtained from the `/version` endpoint.
|
||||
|
||||
```sh
|
||||
curl -L http://127.0.0.1:4001/version
|
||||
```
|
||||
|
||||
## Key Space Operations
|
||||
|
||||
The primary API of etcd is a hierarchical key space.
|
||||
@ -833,6 +841,8 @@ curl -L http://127.0.0.1:4001/v2/keys/afile -XPUT --data-urlencode value@afile.t
|
||||
|
||||
### Read Consistency
|
||||
|
||||
#### Read from the Master
|
||||
|
||||
Followers in a cluster can be behind the leader in their copy of the keyspace.
|
||||
If your application wants or needs the most up-to-date version of a key then it should ensure it reads from the current leader.
|
||||
By using the `consistent=true` flag in your GET requests, etcd will make sure you are talking to the current master.
|
||||
@ -843,6 +853,19 @@ The client is told the write was successful and the keyspace is updated.
|
||||
Meanwhile F2 has partitioned from the network and will have an out-of-date version of the keyspace until the partition resolves.
|
||||
Since F2 missed the most recent write, a client reading from F2 will have an out-of-date version of the keyspace.
|
||||
|
||||
Implementation notes on `consistent=true`: If the leader you are talking to is
|
||||
partitioned it will be unable to determine if it is not currently the master.
|
||||
In a later version we will provide a mechanism to set an upperbound of time
|
||||
that the current master can be unable to contact the quorom and still serve
|
||||
reads.
|
||||
|
||||
### Read Linearization
|
||||
|
||||
If you want a read that is fully linearized you can use a `quorum=true` GET.
|
||||
The read will take a very similar path to a write and will have a similar
|
||||
speed. If you are unsure if you need this feature feel free to email etcd-dev
|
||||
for advice.
|
||||
|
||||
## Lock Module (*Deprecated and Removed*)
|
||||
|
||||
The lock module is used to serialize access to resources used by clients.
|
||||
|
@ -26,7 +26,7 @@ The full documentation is contained in the [API docs](https://github.com/coreos/
|
||||
|
||||
### Required
|
||||
|
||||
* `-name` - The node name. Defaults to the hostname.
|
||||
* `-name` - The node name. Defaults to a UUID.
|
||||
|
||||
### Optional
|
||||
|
||||
@ -55,8 +55,8 @@ The full documentation is contained in the [API docs](https://github.com/coreos/
|
||||
* `-peer-heartbeat-interval` - The number of milliseconds in between heartbeat requests
|
||||
* `-snapshot=false` - Disable log snapshots. Defaults to `true`.
|
||||
* `-cluster-active-size` - The expected number of instances participating in the consensus protocol. Only applied if the etcd instance is the first peer in the cluster.
|
||||
* `-cluster-remove-delay` - The delay before one node is removed from the cluster since it cannot be connected at all. Only applied if the etcd instance is the first peer in the cluster.
|
||||
* `-cluster-sync-interval` - The interval between synchronization for standby-mode instance with the cluster. Only applied if the etcd instance is the first peer in the cluster.
|
||||
* `-cluster-remove-delay` - The number of seconds before one node is removed from the cluster since it cannot be connected at all. Only applied if the etcd instance is the first peer in the cluster.
|
||||
* `-cluster-sync-interval` - The number of seconds between synchronization for standby-mode instance with the cluster. Only applied if the etcd instance is the first peer in the cluster.
|
||||
* `-v` - Enable verbose logging. Defaults to `false`.
|
||||
* `-vv` - Enable very verbose logging. Defaults to `false`.
|
||||
* `-version` - Print the version and exit.
|
||||
|
@ -1,6 +1,6 @@
|
||||
# etcd
|
||||
|
||||
README version 0.4.5
|
||||
README version 0.4.7
|
||||
|
||||
A highly-available key value store for shared configuration and service discovery.
|
||||
etcd is inspired by [Apache ZooKeeper][zookeeper] and [doozer][doozer], with a focus on being:
|
||||
|
@ -309,6 +309,7 @@ func (e *Etcd) runServer() {
|
||||
for {
|
||||
if e.mode == PeerMode {
|
||||
log.Infof("%v starting in peer mode", e.Config.Name)
|
||||
go registerAvailableInternalVersions(e.Config.Name, e.Config.Addr, e.Config.EtcdTLSInfo())
|
||||
// Starting peer server should be followed close by listening on its port
|
||||
// If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
|
||||
// One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster,
|
||||
|
59
etcd/upgrade.go
Normal file
59
etcd/upgrade.go
Normal file
@ -0,0 +1,59 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/log"
|
||||
"github.com/coreos/etcd/server"
|
||||
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
var defaultEtcdBinaryDir = "/usr/libexec/etcd/internal_versions/"
|
||||
|
||||
func registerAvailableInternalVersions(name string, addr string, tls *server.TLSInfo) {
|
||||
var c *etcd.Client
|
||||
if tls.Scheme() == "http" {
|
||||
c = etcd.NewClient([]string{addr})
|
||||
} else {
|
||||
var err error
|
||||
c, err = etcd.NewTLSClient([]string{addr}, tls.CertFile, tls.KeyFile, tls.CAFile)
|
||||
if err != nil {
|
||||
log.Fatalf("client TLS error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
vers, err := getInternalVersions()
|
||||
if err != nil {
|
||||
log.Infof("failed to get local etcd versions: %v", err)
|
||||
return
|
||||
}
|
||||
for _, v := range vers {
|
||||
for {
|
||||
_, err := c.Set("/_etcd/available-internal-versions/"+v+"/"+name, "ok", 0)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
log.Infof("%s: available_internal_versions %s is registered into key space successfully.", name, vers)
|
||||
}
|
||||
|
||||
func getInternalVersions() ([]string, error) {
|
||||
if runtime.GOOS != "linux" {
|
||||
return nil, fmt.Errorf("unmatched os version %v", runtime.GOOS)
|
||||
}
|
||||
etcdBinaryDir := os.Getenv("ETCD_BINARY_DIR")
|
||||
if etcdBinaryDir == "" {
|
||||
etcdBinaryDir = defaultEtcdBinaryDir
|
||||
}
|
||||
dir, err := os.Open(etcdBinaryDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer dir.Close()
|
||||
return dir.Readdirnames(-1)
|
||||
}
|
@ -54,6 +54,7 @@ type CORSHandler struct {
|
||||
func (h *CORSHandler) addHeader(w http.ResponseWriter, origin string) {
|
||||
w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
|
||||
w.Header().Add("Access-Control-Allow-Origin", origin)
|
||||
w.Header().Add("Access-Control-Allow-Headers", "accept, content-type")
|
||||
}
|
||||
|
||||
// ServeHTTP adds the correct CORS headers based on the origin and returns immediately
|
||||
|
@ -46,6 +46,7 @@ func (c *JoinCommand) NodeName() string {
|
||||
// applyJoin attempts to join a machine to the cluster.
|
||||
func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
|
||||
ps, _ := context.Server().Context().(*PeerServer)
|
||||
ps.raftServer.FlushCommitIndex()
|
||||
commitIndex := context.CommitIndex()
|
||||
|
||||
// Make sure we're not getting a cached value from the registry.
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -292,6 +293,7 @@ func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
|
||||
s.startRoutine(s.monitorTimeoutThreshold)
|
||||
s.startRoutine(s.monitorActiveSize)
|
||||
s.startRoutine(s.monitorPeerActivity)
|
||||
s.startRoutine(s.monitorVersion)
|
||||
|
||||
// open the snapshot
|
||||
if snapshot {
|
||||
@ -370,6 +372,7 @@ func (s *PeerServer) HTTPHandler() http.Handler {
|
||||
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
|
||||
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
|
||||
router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE")
|
||||
router.HandleFunc("/v2/admin/next-internal-version", s.NextInternalVersionHandler).Methods("GET")
|
||||
|
||||
return router
|
||||
}
|
||||
@ -772,9 +775,9 @@ func (s *PeerServer) startRoutine(f func()) {
|
||||
func (s *PeerServer) monitorSnapshot() {
|
||||
for {
|
||||
timer := time.NewTimer(s.snapConf.checkingInterval)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-s.closeChan:
|
||||
timer.Stop()
|
||||
return
|
||||
case <-timer.C:
|
||||
}
|
||||
@ -807,6 +810,8 @@ func (s *PeerServer) monitorSync() {
|
||||
// monitorTimeoutThreshold groups timeout threshold events together and prints
|
||||
// them as a single log line.
|
||||
func (s *PeerServer) monitorTimeoutThreshold() {
|
||||
ticker := time.NewTicker(ThresholdMonitorTimeout)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.closeChan:
|
||||
@ -815,12 +820,10 @@ func (s *PeerServer) monitorTimeoutThreshold() {
|
||||
log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
|
||||
}
|
||||
|
||||
timer := time.NewTimer(ThresholdMonitorTimeout)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-s.closeChan:
|
||||
return
|
||||
case <-timer.C:
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -828,13 +831,13 @@ func (s *PeerServer) monitorTimeoutThreshold() {
|
||||
// monitorActiveSize has the leader periodically check the status of cluster
|
||||
// nodes and swaps them out for standbys as needed.
|
||||
func (s *PeerServer) monitorActiveSize() {
|
||||
ticker := time.NewTicker(ActiveMonitorTimeout)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
timer := time.NewTimer(ActiveMonitorTimeout)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-s.closeChan:
|
||||
return
|
||||
case <-timer.C:
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
// Ignore while this peer is not a leader.
|
||||
@ -864,13 +867,13 @@ func (s *PeerServer) monitorActiveSize() {
|
||||
|
||||
// monitorPeerActivity has the leader periodically for dead nodes and demotes them.
|
||||
func (s *PeerServer) monitorPeerActivity() {
|
||||
ticker := time.NewTicker(PeerActivityMonitorTimeout)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
timer := time.NewTimer(PeerActivityMonitorTimeout)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-s.closeChan:
|
||||
return
|
||||
case <-timer.C:
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
// Ignore while this peer is not a leader.
|
||||
@ -895,3 +898,30 @@ func (s *PeerServer) monitorPeerActivity() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *PeerServer) monitorVersion() {
|
||||
for {
|
||||
select {
|
||||
case <-s.closeChan:
|
||||
return
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
|
||||
resp, err := s.store.Get("/_etcd/next-internal-version", false, false)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// only support upgrading to etcd2
|
||||
if *resp.Node.Value == "2" {
|
||||
log.Infof("%s: detected next internal version 2, exit after 10 seconds.", s.Config.Name)
|
||||
} else {
|
||||
log.Infof("%s: detected invaild next internal version %s", s.Config.Name, *resp.Node.Value)
|
||||
continue
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
// be nice to raft. try not to corrupt log file.
|
||||
go s.raftServer.Stop()
|
||||
time.Sleep(time.Second)
|
||||
os.Exit(0)
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -309,6 +310,48 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (ps *PeerServer) NextInternalVersionHandler(w http.ResponseWriter, req *http.Request) {
|
||||
for i := 0; i < 50; i++ {
|
||||
if ps.raftServer.State() != raft.Leader {
|
||||
l := ps.raftServer.Leader()
|
||||
if l == "" {
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
url, _ := ps.registry.PeerURL(l)
|
||||
uhttp.Redirect(url, w, req)
|
||||
return
|
||||
}
|
||||
resp, err := ps.store.Get("/_etcd/available-internal-versions/2", true, true)
|
||||
if err != nil {
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
available := make(map[string]bool)
|
||||
for _, n := range resp.Node.Nodes {
|
||||
available[path.Base(n.Key)] = true
|
||||
}
|
||||
|
||||
notfound := false
|
||||
for _, n := range ps.registry.Names() {
|
||||
if !available[n] {
|
||||
notfound = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if notfound {
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
c := ps.store.CommandFactory().CreateSetCommand("/_etcd/next-internal-version", false, "2", store.Permanent)
|
||||
_, err = ps.raftServer.Do(c)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}
|
||||
|
||||
// machineMessage represents information about a peer or standby in the registry.
|
||||
type machineMessage struct {
|
||||
Name string `json:"name"`
|
||||
|
@ -1,3 +0,0 @@
|
||||
package server
|
||||
|
||||
const ReleaseVersion = "0.4.5"
|
@ -285,7 +285,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
|
||||
// Handler to return the current version of etcd.
|
||||
func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintf(w, "etcd %s", ReleaseVersion)
|
||||
fmt.Fprintf(w, `{"releaseVersion":"%s","internalVersion":"%s"}`, ReleaseVersion, InternalVersion)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -324,12 +324,8 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)
|
||||
return nil
|
||||
}
|
||||
|
||||
leader := s.peerServer.RaftServer().Leader()
|
||||
if leader == "" {
|
||||
return etcdErr.NewError(300, "", s.Store().Index())
|
||||
}
|
||||
hostname, _ := s.registry.ClientURL(leader)
|
||||
uhttp.Redirect(hostname, w, req)
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
w.Write([]byte("not current leader"))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@ type StandbyServerConfig struct {
|
||||
}
|
||||
|
||||
type standbyInfo struct {
|
||||
// stay running in standby mode
|
||||
Running bool
|
||||
Cluster []*machineMessage
|
||||
SyncInterval float64
|
||||
@ -78,12 +79,16 @@ func (s *StandbyServer) Start() {
|
||||
s.removeNotify = make(chan bool)
|
||||
s.closeChan = make(chan bool)
|
||||
|
||||
s.Running = true
|
||||
if err := s.saveInfo(); err != nil {
|
||||
log.Warnf("error saving cluster info for standby")
|
||||
}
|
||||
|
||||
s.routineGroup.Add(1)
|
||||
go func() {
|
||||
defer s.routineGroup.Done()
|
||||
s.monitorCluster()
|
||||
}()
|
||||
s.Running = true
|
||||
}
|
||||
|
||||
// Stop stops the server gracefully.
|
||||
@ -97,11 +102,6 @@ func (s *StandbyServer) Stop() {
|
||||
|
||||
close(s.closeChan)
|
||||
s.routineGroup.Wait()
|
||||
|
||||
if err := s.saveInfo(); err != nil {
|
||||
log.Warnf("error saving cluster info for standby")
|
||||
}
|
||||
s.Running = false
|
||||
}
|
||||
|
||||
// RemoveNotify notifies the server is removed from standby mode and ready
|
||||
@ -178,13 +178,21 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request)
|
||||
// monitorCluster assumes that the machine has tried to join the cluster and
|
||||
// failed, so it waits for the interval at the beginning.
|
||||
func (s *StandbyServer) monitorCluster() {
|
||||
ticker := time.NewTicker(time.Duration(int64(s.SyncInterval * float64(time.Second))))
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
timer := time.NewTimer(time.Duration(int64(s.SyncInterval * float64(time.Second))))
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-s.closeChan:
|
||||
return
|
||||
case <-timer.C:
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
ok, err := s.checkMemberInternalVersionIsV2()
|
||||
if err != nil {
|
||||
log.Warnf("fail checking internal version(%v): %v", s.ClusterURLs(), err)
|
||||
} else if ok {
|
||||
log.Infof("Detect the cluster has been upgraded to v2. Exit now.")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if err := s.syncCluster(nil); err != nil {
|
||||
@ -204,6 +212,10 @@ func (s *StandbyServer) monitorCluster() {
|
||||
}
|
||||
|
||||
log.Infof("join through leader %v", leader.PeerURL)
|
||||
s.Running = false
|
||||
if err := s.saveInfo(); err != nil {
|
||||
log.Warnf("error saving cluster info for standby")
|
||||
}
|
||||
go func() {
|
||||
s.Stop()
|
||||
close(s.removeNotify)
|
||||
@ -212,6 +224,39 @@ func (s *StandbyServer) monitorCluster() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StandbyServer) checkMemberInternalVersionIsV2() (bool, error) {
|
||||
c := &http.Client{Transport: s.client.Client.Transport}
|
||||
for _, memb := range s.Cluster {
|
||||
url := memb.ClientURL
|
||||
resp, err := c.Get(url + "/version")
|
||||
if err != nil {
|
||||
log.Debugf("failed to get /version from %s", url)
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Debugf("failed to read body from %s", url)
|
||||
continue
|
||||
}
|
||||
|
||||
var m map[string]string
|
||||
err = json.Unmarshal(b, &m)
|
||||
if err != nil {
|
||||
log.Debugf("failed to unmarshal body %s from %s", b, url)
|
||||
continue
|
||||
}
|
||||
switch m["internalVersion"] {
|
||||
case "1":
|
||||
return false, nil
|
||||
case "2":
|
||||
return true, nil
|
||||
default:
|
||||
log.Warnf("unrecognized internal version %s from %s", m["internalVersion"], url)
|
||||
}
|
||||
}
|
||||
return false, fmt.Errorf("failed to get version")
|
||||
}
|
||||
|
||||
func (s *StandbyServer) syncCluster(peerURLs []string) error {
|
||||
peerURLs = append(s.ClusterURLs(), peerURLs...)
|
||||
|
||||
|
@ -17,6 +17,14 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
||||
vars := mux.Vars(req)
|
||||
key := "/" + vars["key"]
|
||||
|
||||
recursive := (req.FormValue("recursive") == "true")
|
||||
sort := (req.FormValue("sorted") == "true")
|
||||
|
||||
if req.FormValue("quorum") == "true" {
|
||||
c := s.Store().CommandFactory().CreateGetCommand(key, recursive, sort)
|
||||
return s.Dispatch(c, w, req)
|
||||
}
|
||||
|
||||
// Help client to redirect the request to the current leader
|
||||
if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
|
||||
leader := s.Leader()
|
||||
@ -35,8 +43,6 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
recursive := (req.FormValue("recursive") == "true")
|
||||
sort := (req.FormValue("sorted") == "true")
|
||||
waitIndex := req.FormValue("waitIndex")
|
||||
stream := (req.FormValue("stream") == "true")
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
package server
|
||||
|
||||
const ReleaseVersion = "0.4.7"
|
||||
const InternalVersion = "1"
|
||||
const Version = "v2"
|
||||
|
@ -24,6 +24,7 @@ type CommandFactory interface {
|
||||
prevIndex uint64, expireTime time.Time) raft.Command
|
||||
CreateCompareAndDeleteCommand(key string, prevValue string, prevIndex uint64) raft.Command
|
||||
CreateSyncCommand(now time.Time) raft.Command
|
||||
CreateGetCommand(key string, recursive, sorted bool) raft.Command
|
||||
}
|
||||
|
||||
// RegisterCommandFactory adds a command factory to the global registry.
|
||||
|
@ -89,3 +89,11 @@ func (f *CommandFactory) CreateSyncCommand(now time.Time) raft.Command {
|
||||
Time: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (f *CommandFactory) CreateGetCommand(key string, recursive, sorted bool) raft.Command {
|
||||
return &GetCommand{
|
||||
Key: key,
|
||||
Recursive: recursive,
|
||||
Sorted: sorted,
|
||||
}
|
||||
}
|
||||
|
35
store/v2/get_command.go
Normal file
35
store/v2/get_command.go
Normal file
@ -0,0 +1,35 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/log"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
|
||||
)
|
||||
|
||||
func init() {
|
||||
raft.RegisterCommand(&GetCommand{})
|
||||
}
|
||||
|
||||
// The GetCommand gets a key from the Store.
|
||||
type GetCommand struct {
|
||||
Key string `json:"key"`
|
||||
Recursive bool `json:"recursive"`
|
||||
Sorted bool `json:sorted`
|
||||
}
|
||||
|
||||
// The name of the get command in the log
|
||||
func (c *GetCommand) CommandName() string {
|
||||
return "etcd:get"
|
||||
}
|
||||
|
||||
// Get the key
|
||||
func (c *GetCommand) Apply(context raft.Context) (interface{}, error) {
|
||||
s, _ := context.Server().StateMachine().(store.Store)
|
||||
e, err := s.Get(c.Key, c.Recursive, c.Sorted)
|
||||
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
return nil, err
|
||||
}
|
||||
return e, nil
|
||||
}
|
Reference in New Issue
Block a user