rafthttp: support sending v3 snapshot message

Use snapshotSender to send v3 snapshot message. It puts raft snapshot
message and v3 snapshot into request body, then sends it to the target peer.
When it receives http.StatusNoContent, it knows the message has been
received and processed successfully.

As receiver, snapHandler saves v3 snapshot and then processes the raft snapshot
message, then respond with http.StatusNoContent.
This commit is contained in:
Yicheng Qin
2015-10-11 11:49:52 -07:00
parent 207c92b627
commit 1f21ccf166
9 changed files with 472 additions and 102 deletions

View File

@ -359,9 +359,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
ID: id, ID: id,
ClusterID: cl.ID(), ClusterID: cl.ID(),
Raft: srv, Raft: srv,
SnapSaver: s.snapStore,
ServerStats: sstats, ServerStats: sstats,
LeaderStats: lstats, LeaderStats: lstats,
ErrorC: srv.errorc, ErrorC: srv.errorc,
V3demo: cfg.V3demo,
} }
if err := tr.Start(); err != nil { if err := tr.Start(); err != nil {
return nil, err return nil, err
@ -378,6 +380,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
} }
} }
srv.r.transport = tr srv.r.transport = tr
if cfg.V3demo {
s.snapStore.tr = tr
}
return srv, nil return srv, nil
} }

View File

@ -17,6 +17,7 @@ package etcdserver
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net/http" "net/http"
"path" "path"
"reflect" "reflect"
@ -1477,6 +1478,7 @@ func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {} func (s *nopTransporter) RemoveAllPeers() {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
func (s *nopTransporter) SnapshotReady(rc io.ReadCloser, index uint64) {}
func (s *nopTransporter) Stop() {} func (s *nopTransporter) Stop() {}
func (s *nopTransporter) Pause() {} func (s *nopTransporter) Pause() {}
func (s *nopTransporter) Resume() {} func (s *nopTransporter) Resume() {}

View File

@ -24,26 +24,52 @@ import (
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
dstorage "github.com/coreos/etcd/storage" dstorage "github.com/coreos/etcd/storage"
) )
type snapshot struct { type snapshot struct {
r raftpb.Snapshot r raftpb.Snapshot
kv dstorage.Snapshot
io.ReadCloser // used to read out v3 snapshot
done chan struct{}
}
func newSnapshot(r raftpb.Snapshot, kv dstorage.Snapshot) *snapshot {
done := make(chan struct{})
pr, pw := io.Pipe()
go func() {
_, err := kv.WriteTo(pw)
pw.CloseWithError(err)
kv.Close()
close(done)
}()
return &snapshot{
r: r,
ReadCloser: pr,
done: done,
}
} }
func (s *snapshot) raft() raftpb.Snapshot { return s.r } func (s *snapshot) raft() raftpb.Snapshot { return s.r }
func (s *snapshot) size() int64 { return s.kv.Size() } func (s *snapshot) isClosed() bool {
select {
func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return s.kv.WriteTo(w) } case <-s.done:
return true
func (s *snapshot) close() error { return s.kv.Close() } default:
return false
}
}
// TODO: remove snapshotStore. getSnap part could be put into memoryStorage,
// while SaveFrom could be put into another struct, or even put into dstorage package.
type snapshotStore struct { type snapshotStore struct {
// dir to save snapshot data // dir to save snapshot data
dir string dir string
kv dstorage.KV kv dstorage.KV
tr rafthttp.Transporter
// send empty to reqsnapc to notify the channel receiver to send back latest // send empty to reqsnapc to notify the channel receiver to send back latest
// snapshot to snapc // snapshot to snapc
@ -66,8 +92,18 @@ func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
// getSnap returns a snapshot. // getSnap returns a snapshot.
// If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned. // If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned.
//
// Internally it creates new snapshot and returns the snapshot. Unless the
// returned snapshot is closed, it rejects creating new one and returns
// ErrSnapshotTemporarilyUnavailable.
// If raft state machine wants to send two snapshot messages to two followers,
// the second snapshot message will keep getting snapshot and succeed only after
// the first message is sent. This increases the time used to send messages,
// but it is acceptable because this should happen seldomly.
func (ss *snapshotStore) getSnap() (*snapshot, error) { func (ss *snapshotStore) getSnap() (*snapshot, error) {
if ss.snap != nil { // If snapshotStore has some snapshot that has not been closed, it cannot
// request new snapshot. So it returns ErrSnapshotTemporarilyUnavailable.
if ss.snap != nil && !ss.snap.isClosed() {
return nil, raft.ErrSnapshotTemporarilyUnavailable return nil, raft.ErrSnapshotTemporarilyUnavailable
} }
@ -76,30 +112,30 @@ func (ss *snapshotStore) getSnap() (*snapshot, error) {
// generate KV snapshot // generate KV snapshot
kvsnap := ss.kv.Snapshot() kvsnap := ss.kv.Snapshot()
raftsnap := <-ss.raftsnapc raftsnap := <-ss.raftsnapc
ss.snap = &snapshot{ ss.snap = newSnapshot(raftsnap, kvsnap)
r: raftsnap, // give transporter the generated snapshot that is ready to send out
kv: kvsnap, ss.tr.SnapshotReady(ss.snap, raftsnap.Metadata.Index)
}
return ss.snap, nil return ss.snap, nil
} }
// saveSnap saves snapshot into disk. // SaveFrom saves snapshot at the given index from the given reader.
// // If the snapshot with the given index has been saved successfully, it keeps
// If snapshot has existed in disk, it keeps the original snapshot and returns error. // the original saved snapshot and returns error.
// The function guarantees that it always saves either complete snapshot or no snapshot, // The function guarantees that SaveFrom always saves either complete
// even if the call is aborted because program is hard killed. // snapshot or no snapshot, even if the call is aborted because program
func (ss *snapshotStore) saveSnap(s *snapshot) error { // is hard killed.
func (ss *snapshotStore) SaveFrom(r io.Reader, index uint64) error {
f, err := ioutil.TempFile(ss.dir, "tmp") f, err := ioutil.TempFile(ss.dir, "tmp")
if err != nil { if err != nil {
return err return err
} }
_, err = s.writeTo(f) _, err = io.Copy(f, r)
f.Close() f.Close()
if err != nil { if err != nil {
os.Remove(f.Name()) os.Remove(f.Name())
return err return err
} }
fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", s.raft().Metadata.Index)) fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", index))
if fileutil.Exist(fn) { if fileutil.Exist(fn) {
os.Remove(f.Name()) os.Remove(f.Name())
return fmt.Errorf("snapshot to save has existed") return fmt.Errorf("snapshot to save has existed")

View File

@ -16,6 +16,7 @@ package rafthttp
import ( import (
"errors" "errors"
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path" "path"
@ -35,6 +36,7 @@ var (
RaftPrefix = "/raft" RaftPrefix = "/raft"
ProbingPrefix = path.Join(RaftPrefix, "probing") ProbingPrefix = path.Join(RaftPrefix, "probing")
RaftStreamPrefix = path.Join(RaftPrefix, "stream") RaftStreamPrefix = path.Join(RaftPrefix, "stream")
RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
errIncompatibleVersion = errors.New("incompatible version") errIncompatibleVersion = errors.New("incompatible version")
errClusterIDMismatch = errors.New("cluster ID mismatch") errClusterIDMismatch = errors.New("cluster ID mismatch")
@ -47,6 +49,14 @@ func NewHandler(r Raft, cid types.ID) http.Handler {
} }
} }
func newSnapshotHandler(r Raft, snapSaver SnapshotSaver, cid types.ID) http.Handler {
return &snapshotHandler{
r: r,
snapSaver: snapSaver,
cid: cid,
}
}
type peerGetter interface { type peerGetter interface {
Get(id types.ID) Peer Get(id types.ID) Peer
} }
@ -76,19 +86,10 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil { w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
plog.Errorf("request received was ignored (%v)", err)
http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
return
}
wcid := h.cid.String() if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
w.Header().Set("X-Etcd-Cluster-ID", wcid) http.Error(w, err.Error(), http.StatusPreconditionFailed)
gcid := r.Header.Get("X-Etcd-Cluster-ID")
if gcid != wcid {
plog.Errorf("request received was ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
return return
} }
@ -122,6 +123,76 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
type snapshotHandler struct {
r Raft
snapSaver SnapshotSaver
cid types.ID
}
// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
// the handler will keep waiting for the request body until TCP keepalive
// finds out that the connection is broken after several minutes.
// This is acceptable because
// 1. snapshot messages sent through other TCP connections could still be
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
dec := &messageDecoder{r: r.Body}
m, err := dec.decode()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
plog.Errorf(msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
if m.Type != raftpb.MsgSnap {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
return
}
// save snapshot
if err := h.snapSaver.SaveFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
plog.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
plog.Infof("received and saved snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
if err := h.r.Process(context.TODO(), m); err != nil {
switch v := err.(type) {
// Process may return writerToResponse error when doing some
// additional checks before calling raft.Node.Step.
case writerToResponse:
v.WriteTo(w)
default:
msg := fmt.Sprintf("failed to process raft message (%v)", err)
plog.Warningf(msg)
http.Error(w, msg, http.StatusInternalServerError)
}
return
}
// Write StatusNoContet header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
}
type streamHandler struct { type streamHandler struct {
peerGetter peerGetter peerGetter peerGetter
r Raft r Raft
@ -137,19 +208,10 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
w.Header().Set("X-Server-Version", version.Version) w.Header().Set("X-Server-Version", version.Version)
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil { if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
plog.Errorf("request received was ignored (%v)", err) http.Error(w, err.Error(), http.StatusPreconditionFailed)
http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
return
}
wcid := h.cid.String()
w.Header().Set("X-Etcd-Cluster-ID", wcid)
if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid {
plog.Errorf("streaming request ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
return return
} }
@ -187,7 +249,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// with the same cluster ID. // with the same cluster ID.
// 2. local etcd falls behind of the cluster, and cannot recognize // 2. local etcd falls behind of the cluster, and cannot recognize
// the members that joined after its current progress. // the members that joined after its current progress.
plog.Errorf("failed to find member %s in cluster %s", from, wcid) plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
http.Error(w, "error sender not found", http.StatusNotFound) http.Error(w, "error sender not found", http.StatusNotFound)
return return
} }
@ -214,6 +276,23 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
<-c.closeNotify() <-c.closeNotify()
} }
// checkClusterCompatibilityFromHeader checks the cluster compatibility of
// the local member from the given header.
// It checks whether the version of local member is compatible with
// the versions in the header, and whether the cluster ID of local member
// matches the one in the header.
func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error {
if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil {
plog.Errorf("request version incompatibility (%v)", err)
return errIncompatibleVersion
}
if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
return errClusterIDMismatch
}
return nil
}
type closeNotifier struct { type closeNotifier struct {
done chan struct{} done chan struct{}
} }

View File

@ -49,6 +49,7 @@ const (
streamAppV2 = "streamMsgAppV2" streamAppV2 = "streamMsgAppV2"
streamMsg = "streamMsg" streamMsg = "streamMsg"
pipelineMsg = "pipeline" pipelineMsg = "pipeline"
sendSnap = "sendMsgSnap"
) )
type Peer interface { type Peer interface {
@ -89,12 +90,14 @@ type peer struct {
// id of the remote raft peer node // id of the remote raft peer node
id types.ID id types.ID
r Raft r Raft
v3demo bool
status *peerStatus status *peerStatus
msgAppWriter *streamWriter msgAppWriter *streamWriter
writer *streamWriter writer *streamWriter
pipeline *pipeline pipeline *pipeline
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
msgAppReader *streamReader msgAppReader *streamReader
sendc chan raftpb.Message sendc chan raftpb.Message
@ -111,16 +114,18 @@ type peer struct {
done chan struct{} done chan struct{}
} }
func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64) *peer { func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64, v3demo bool) *peer {
picker := newURLPicker(urls) picker := newURLPicker(urls)
status := newPeerStatus(to) status := newPeerStatus(to)
p := &peer{ p := &peer{
id: to, id: to,
r: r, r: r,
v3demo: v3demo,
status: status, status: status,
msgAppWriter: startStreamWriter(to, status, fs, r), msgAppWriter: startStreamWriter(to, status, fs, r),
writer: startStreamWriter(to, status, fs, r), writer: startStreamWriter(to, status, fs, r),
pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
sendc: make(chan raftpb.Message), sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize), recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals), propc: make(chan raftpb.Message, maxPendingProposals),
@ -158,6 +163,10 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
if paused { if paused {
continue continue
} }
if p.v3demo && isMsgSnap(m) {
go p.snapSender.send(m)
continue
}
writec, name := p.pick(m) writec, name := p.pick(m)
select { select {
case writec <- m: case writec <- m:
@ -187,6 +196,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
p.msgAppWriter.stop() p.msgAppWriter.stop()
p.writer.stop() p.writer.stop()
p.pipeline.stop() p.pipeline.stop()
p.snapSender.stop()
p.msgAppReader.stop() p.msgAppReader.stop()
reader.stop() reader.stop()
close(p.done) close(p.done)

View File

@ -17,10 +17,8 @@ package rafthttp
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings"
"sync" "sync"
"time" "time"
@ -30,7 +28,6 @@ import (
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/version"
) )
const ( const (
@ -125,18 +122,7 @@ func (p *pipeline) handle() {
// error on any failure. // error on any failure.
func (p *pipeline) post(data []byte) (err error) { func (p *pipeline) post(data []byte) (err error) {
u := p.picker.pick() u := p.picker.pick()
uu := u req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.from, p.cid)
uu.Path = RaftPrefix
req, err := http.NewRequest("POST", uu.String(), bytes.NewBuffer(data))
if err != nil {
p.picker.unreachable(u)
return err
}
req.Header.Set("Content-Type", "application/protobuf")
req.Header.Set("X-Server-From", p.from.String())
req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
var stopped bool var stopped bool
defer func() { defer func() {
@ -170,31 +156,14 @@ func (p *pipeline) post(data []byte) (err error) {
} }
resp.Body.Close() resp.Body.Close()
switch resp.StatusCode { err = checkPostResponse(resp, b, req, p.to)
case http.StatusPreconditionFailed: // errMemberRemoved is a critical error since a removed member should
switch strings.TrimSuffix(string(b), "\n") { // always be stopped. So we use reportCriticalError to report it to errorc.
case errIncompatibleVersion.Error(): if err == errMemberRemoved {
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", p.to) reportCriticalError(err, p.errorc)
return errIncompatibleVersion
case errClusterIDMismatch.Error():
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
p.to, resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
return errClusterIDMismatch
default:
return fmt.Errorf("unhandled error %q when precondition failed", string(b))
}
case http.StatusForbidden:
err := fmt.Errorf("the member has been permanently removed from the cluster")
select {
case p.errorc <- err:
default:
}
return nil return nil
case http.StatusNoContent:
return nil
default:
return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
} }
return err
} }
// waitSchedule waits other goroutines to be scheduled for a while // waitSchedule waits other goroutines to be scheduled for a while

161
rafthttp/snapshot_sender.go Normal file
View File

@ -0,0 +1,161 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)
type snapshotSender struct {
from, to types.ID
cid types.ID
tr http.RoundTripper
picker *urlPicker
status *peerStatus
snapst *snapshotStore
r Raft
errorc chan error
stopc chan struct{}
}
func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, snapst *snapshotStore, r Raft, errorc chan error) *snapshotSender {
return &snapshotSender{
from: from,
to: to,
cid: cid,
tr: tr,
picker: picker,
status: status,
snapst: snapst,
r: r,
errorc: errorc,
stopc: make(chan struct{}),
}
}
func (s *snapshotSender) stop() { close(s.stopc) }
func (s *snapshotSender) send(m raftpb.Message) {
start := time.Now()
body := createSnapBody(m, s.snapst)
defer body.Close()
u := s.picker.pick()
req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid)
err := s.post(req)
if err != nil {
// errMemberRemoved is a critical error since a removed member should
// always be stopped. So we use reportCriticalError to report it to errorc.
if err == errMemberRemoved {
reportCriticalError(err, s.errorc)
}
s.picker.unreachable(u)
reportSentFailure(sendSnap, m)
s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
s.r.ReportUnreachable(m.To)
// report SnapshotFailure to raft state machine. After raft state
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
if s.status.isActive() {
plog.Warningf("snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
} else {
plog.Debugf("snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
}
return
}
reportSentDuration(sendSnap, m, time.Since(start))
s.status.activate()
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
}
// post posts the given request.
// It returns nil when request is sent out and processed successfully.
func (s *snapshotSender) post(req *http.Request) (err error) {
cancel := httputil.RequestCanceler(s.tr, req)
type responseAndError struct {
resp *http.Response
body []byte
err error
}
result := make(chan responseAndError, 1)
go func() {
// TODO: cancel the request if it has waited for a long time(~5s) after
// it has write out the full request body, which helps to avoid receiver
// dies when sender is waiting for response
// TODO: the snapshot could be large and eat up all resources when writing
// it out. Send it block by block and rest some time between to give the
// time for main loop to run.
resp, err := s.tr.RoundTrip(req)
if err != nil {
result <- responseAndError{resp, nil, err}
return
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
result <- responseAndError{resp, body, err}
}()
select {
case <-s.stopc:
cancel()
return errStopped
case r := <-result:
if r.err != nil {
return r.err
}
return checkPostResponse(r.resp, r.body, req, s.to)
}
}
// readCloser implements io.ReadCloser interface.
type readCloser struct {
io.Reader
io.Closer
}
// createSnapBody creates the request body for the given raft snapshot message.
// Callers should close body when done reading from it.
func createSnapBody(m raftpb.Message, snapst *snapshotStore) io.ReadCloser {
buf := new(bytes.Buffer)
enc := &messageEncoder{w: buf}
// encode raft message
if err := enc.encode(m); err != nil {
plog.Panicf("encode message error (%v)", err)
}
// get snapshot
rc := snapst.get(m.Snapshot.Metadata.Index)
return &readCloser{
Reader: io.MultiReader(buf, rc),
Closer: rc,
}
}

View File

@ -15,6 +15,7 @@
package rafthttp package rafthttp
import ( import (
"io"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -38,6 +39,12 @@ type Raft interface {
ReportSnapshot(id uint64, status raft.SnapshotStatus) ReportSnapshot(id uint64, status raft.SnapshotStatus)
} }
// SnapshotSaver is the interface that wraps the SaveFrom method.
type SnapshotSaver interface {
// SaveFrom saves the snapshot data at the given index from the given reader.
SaveFrom(r io.Reader, index uint64) error
}
type Transporter interface { type Transporter interface {
// Start starts the given Transporter. // Start starts the given Transporter.
// Start MUST be called before calling other functions in the interface. // Start MUST be called before calling other functions in the interface.
@ -78,6 +85,10 @@ type Transporter interface {
// If the connection is active since peer was added, it returns the adding time. // If the connection is active since peer was added, it returns the adding time.
// If the connection is currently inactive, it returns zero time. // If the connection is currently inactive, it returns zero time.
ActiveSince(id types.ID) time.Time ActiveSince(id types.ID) time.Time
// SnapshotReady accepts a snapshot at the given index that is ready to send out.
// SnapshotReady MUST not be called when the snapshot sent result of previous
// accepted one has not been reported.
SnapshotReady(rc io.ReadCloser, index uint64)
// Stop closes the connections and stops the transporter. // Stop closes the connections and stops the transporter.
Stop() Stop()
} }
@ -95,6 +106,7 @@ type Transport struct {
ID types.ID // local member ID ID types.ID // local member ID
ClusterID types.ID // raft cluster ID for request validation ClusterID types.ID // raft cluster ID for request validation
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
SnapSaver SnapshotSaver // used to save snapshot in v3 snapshot messages
ServerStats *stats.ServerStats // used to record general transportation statistics ServerStats *stats.ServerStats // used to record general transportation statistics
// used to record transportation statistics with followers when // used to record transportation statistics with followers when
// performing as leader in raft protocol // performing as leader in raft protocol
@ -104,6 +116,7 @@ type Transport struct {
// When an error is received from ErrorC, user should stop raft state // When an error is received from ErrorC, user should stop raft state
// machine and thus stop the Transport. // machine and thus stop the Transport.
ErrorC chan error ErrorC chan error
V3demo bool
streamRt http.RoundTripper // roundTripper used by streams streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines pipelineRt http.RoundTripper // roundTripper used by pipelines
@ -113,6 +126,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map peers map[types.ID]Peer // peers map
snapst *snapshotStore
prober probing.Prober prober probing.Prober
} }
@ -131,6 +146,7 @@ func (t *Transport) Start() error {
} }
t.remotes = make(map[types.ID]*remote) t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer) t.peers = make(map[types.ID]Peer)
t.snapst = &snapshotStore{}
t.prober = probing.NewProber(t.pipelineRt) t.prober = probing.NewProber(t.pipelineRt)
return nil return nil
} }
@ -138,9 +154,11 @@ func (t *Transport) Start() error {
func (t *Transport) Handler() http.Handler { func (t *Transport) Handler() http.Handler {
pipelineHandler := NewHandler(t.Raft, t.ClusterID) pipelineHandler := NewHandler(t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID) streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t.Raft, t.SnapSaver, t.ClusterID)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler) mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler) mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler()) mux.Handle(ProbingPrefix, probing.NewHandler())
return mux return mux
} }
@ -234,7 +252,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
plog.Panicf("newURLs %+v should never fail: %+v", us, err) plog.Panicf("newURLs %+v should never fail: %+v", us, err)
} }
fs := t.LeaderStats.Follower(id.String()) fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.term) t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.term, t.V3demo)
addPeerToProber(t.prober, id.String(), us) addPeerToProber(t.prober, id.String(), us)
} }
@ -290,6 +308,10 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
return time.Time{} return time.Time{}
} }
func (t *Transport) SnapshotReady(rc io.ReadCloser, index uint64) {
t.snapst.put(rc, index)
}
type Pausable interface { type Pausable interface {
Pause() Pause()
Resume() Resume()
@ -307,3 +329,29 @@ func (t *Transport) Resume() {
p.(Pausable).Resume() p.(Pausable).Resume()
} }
} }
// snapshotStore is the store of snapshot. Caller could put one
// snapshot into the store, and get it later.
// snapshotStore stores at most one snapshot at a time, or it panics.
type snapshotStore struct {
rc io.ReadCloser
// index of the stored snapshot
// index is 0 if and only if there is no snapshot stored.
index uint64
}
func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
if s.index != 0 {
plog.Panicf("unexpected put when there is one snapshot stored")
}
s.rc, s.index = rc, index
}
func (s *snapshotStore) get(index uint64) io.ReadCloser {
if s.index == index {
// set index to 0 to indicate no snapshot stored
s.index = 0
return s.rc
}
return nil
}

View File

@ -19,12 +19,17 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/url"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/version" "github.com/coreos/etcd/version"
) )
var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
func writeEntryTo(w io.Writer, ent *raftpb.Entry) error { func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
size := ent.Size() size := ent.Size()
if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil { if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
@ -50,6 +55,59 @@ func readEntryFrom(r io.Reader, ent *raftpb.Entry) error {
return ent.Unmarshal(buf) return ent.Unmarshal(buf)
} }
// createPostRequest creates a HTTP POST request that sends raft message.
func createPostRequest(u url.URL, path string, body io.Reader, ct string, from, cid types.ID) *http.Request {
uu := u
uu.Path = path
req, err := http.NewRequest("POST", uu.String(), body)
if err != nil {
plog.Panicf("unexpected new request error (%v)", err)
}
req.Header.Set("Content-Type", ct)
req.Header.Set("X-Server-From", from.String())
req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
return req
}
// checkPostResponse checks the response of the HTTP POST request that sends
// raft message.
func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
switch resp.StatusCode {
case http.StatusPreconditionFailed:
switch strings.TrimSuffix(string(body), "\n") {
case errIncompatibleVersion.Error():
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
return errIncompatibleVersion
case errClusterIDMismatch.Error():
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
to, resp.Header.Get("X-Etcd-Cluster-ID"), req.Header.Get("X-Etcd-Cluster-ID"))
return errClusterIDMismatch
default:
return fmt.Errorf("unhandled error %q when precondition failed", string(body))
}
case http.StatusForbidden:
return errMemberRemoved
case http.StatusNoContent:
return nil
default:
return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
}
}
// reportErr reports the given error through sending it into
// the given error channel.
// If the error channel is filled up when sending error, it drops the error
// because the fact that error has happened is reported, which is
// good enough.
func reportCriticalError(err error, errc chan<- error) {
select {
case errc <- err:
default:
}
}
// compareMajorMinorVersion returns an integer comparing two versions based on // compareMajorMinorVersion returns an integer comparing two versions based on
// their major and minor version. The result will be 0 if a==b, -1 if a < b, // their major and minor version. The result will be 0 if a==b, -1 if a < b,
// and 1 if a > b. // and 1 if a > b.