Merge upstream
This commit is contained in:
@ -31,7 +31,7 @@ func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint6
|
||||
|
||||
// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) {
|
||||
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
|
||||
|
||||
protoEntries := make([]*protobuf.ProtoAppendEntriesRequest_ProtoLogEntry, len(req.Entries))
|
||||
|
||||
@ -63,7 +63,7 @@ func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (req *AppendEntriesRequest) decode(r io.Reader) (int, error) {
|
||||
func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
@ -10,7 +10,7 @@ func BenchmarkAppendEntriesRequestEncoding(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
var buf bytes.Buffer
|
||||
req.encode(&buf)
|
||||
req.Encode(&buf)
|
||||
}
|
||||
b.SetBytes(int64(len(tmp)))
|
||||
}
|
||||
@ -19,7 +19,7 @@ func BenchmarkAppendEntriesRequestDecoding(b *testing.B) {
|
||||
req, buf := createTestAppendEntriesRequest(2000)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
req.decode(bytes.NewReader(buf))
|
||||
req.Decode(bytes.NewReader(buf))
|
||||
}
|
||||
b.SetBytes(int64(len(buf)))
|
||||
}
|
||||
@ -34,7 +34,7 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by
|
||||
req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries)
|
||||
|
||||
var buf bytes.Buffer
|
||||
req.encode(&buf)
|
||||
req.Encode(&buf)
|
||||
|
||||
return req, buf.Bytes()
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ func newAppendEntriesResponse(term uint64, success bool, index uint64, commitInd
|
||||
|
||||
// Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (resp *AppendEntriesResponse) encode(w io.Writer) (int, error) {
|
||||
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) {
|
||||
pb := &protobuf.ProtoAppendEntriesResponse{
|
||||
Term: proto.Uint64(resp.Term),
|
||||
Index: proto.Uint64(resp.Index),
|
||||
@ -47,7 +47,7 @@ func (resp *AppendEntriesResponse) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (resp *AppendEntriesResponse) decode(r io.Reader) (int, error) {
|
||||
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
@ -10,7 +10,7 @@ func BenchmarkAppendEntriesResponseEncoding(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
var buf bytes.Buffer
|
||||
req.encode(&buf)
|
||||
req.Encode(&buf)
|
||||
}
|
||||
b.SetBytes(int64(len(tmp)))
|
||||
}
|
||||
@ -19,7 +19,7 @@ func BenchmarkAppendEntriesResponseDecoding(b *testing.B) {
|
||||
req, buf := createTestAppendEntriesResponse(2000)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
req.decode(bytes.NewReader(buf))
|
||||
req.Decode(bytes.NewReader(buf))
|
||||
}
|
||||
b.SetBytes(int64(len(buf)))
|
||||
}
|
||||
@ -28,7 +28,7 @@ func createTestAppendEntriesResponse(entryCount int) (*AppendEntriesResponse, []
|
||||
resp := newAppendEntriesResponse(1, true, 1, 1)
|
||||
|
||||
var buf bytes.Buffer
|
||||
resp.encode(&buf)
|
||||
resp.Encode(&buf)
|
||||
|
||||
return resp, buf.Bytes()
|
||||
}
|
||||
|
@ -89,7 +89,7 @@ func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
|
||||
// Sends an AppendEntries RPC to a peer.
|
||||
func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
|
||||
var b bytes.Buffer
|
||||
if _, err := req.encode(&b); err != nil {
|
||||
if _, err := req.Encode(&b); err != nil {
|
||||
traceln("transporter.ae.encoding.error:", err)
|
||||
return nil
|
||||
}
|
||||
@ -106,7 +106,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
|
||||
defer httpResp.Body.Close()
|
||||
|
||||
resp := &AppendEntriesResponse{}
|
||||
if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF {
|
||||
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
|
||||
traceln("transporter.ae.decoding.error:", err)
|
||||
return nil
|
||||
}
|
||||
@ -117,7 +117,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
|
||||
// Sends a RequestVote RPC to a peer.
|
||||
func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
|
||||
var b bytes.Buffer
|
||||
if _, err := req.encode(&b); err != nil {
|
||||
if _, err := req.Encode(&b); err != nil {
|
||||
traceln("transporter.rv.encoding.error:", err)
|
||||
return nil
|
||||
}
|
||||
@ -134,7 +134,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
|
||||
defer httpResp.Body.Close()
|
||||
|
||||
resp := &RequestVoteResponse{}
|
||||
if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF {
|
||||
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
|
||||
traceln("transporter.rv.decoding.error:", err)
|
||||
return nil
|
||||
}
|
||||
@ -162,13 +162,13 @@ func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc
|
||||
traceln(server.Name(), "RECV /appendEntries")
|
||||
|
||||
req := &AppendEntriesRequest{}
|
||||
if _, err := req.decode(r.Body); err != nil {
|
||||
if _, err := req.Decode(r.Body); err != nil {
|
||||
http.Error(w, "", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
resp := server.AppendEntries(req)
|
||||
if _, err := resp.encode(w); err != nil {
|
||||
if _, err := resp.Encode(w); err != nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
@ -181,13 +181,13 @@ func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
|
||||
traceln(server.Name(), "RECV /requestVote")
|
||||
|
||||
req := &RequestVoteRequest{}
|
||||
if _, err := req.decode(r.Body); err != nil {
|
||||
if _, err := req.Decode(r.Body); err != nil {
|
||||
http.Error(w, "", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
resp := server.RequestVote(req)
|
||||
if _, err := resp.encode(w); err != nil {
|
||||
if _, err := resp.Encode(w); err != nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
30
third_party/github.com/coreos/go-raft/log.go
vendored
30
third_party/github.com/coreos/go-raft/log.go
vendored
@ -180,26 +180,23 @@ func (l *Log) open(path string) error {
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// Append entry.
|
||||
l.entries = append(l.entries, entry)
|
||||
|
||||
if entry.Index <= l.commitIndex {
|
||||
command, err := newCommand(entry.CommandName, entry.Command)
|
||||
if err != nil {
|
||||
continue
|
||||
if entry.Index > l.startIndex {
|
||||
// Append entry.
|
||||
l.entries = append(l.entries, entry)
|
||||
if entry.Index <= l.commitIndex {
|
||||
command, err := newCommand(entry.CommandName, entry.Command)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
l.ApplyFunc(command)
|
||||
}
|
||||
l.ApplyFunc(command)
|
||||
debugln("open.log.append log index ", entry.Index)
|
||||
}
|
||||
|
||||
debugln("open.log.append log index ", entry.Index)
|
||||
|
||||
readBytes += int64(n)
|
||||
}
|
||||
l.results = make([]*logResult, len(l.entries))
|
||||
|
||||
l.compact(l.startIndex, l.startTerm)
|
||||
|
||||
debugln("open.log.recovery number of log ", len(l.entries))
|
||||
return nil
|
||||
}
|
||||
@ -273,6 +270,8 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*
|
||||
entries := l.entries[index-l.startIndex:]
|
||||
length := len(entries)
|
||||
|
||||
traceln("log.entriesAfter: startIndex:", l.startIndex, " lenght", len(l.entries))
|
||||
|
||||
if uint64(length) < maxLogEntriesPerRequest {
|
||||
// Determine the term at the given entry and return a subslice.
|
||||
return entries, l.entries[index-1-l.startIndex].Term
|
||||
@ -353,7 +352,10 @@ func (l *Log) lastInfo() (index uint64, term uint64) {
|
||||
func (l *Log) updateCommitIndex(index uint64) {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
l.commitIndex = index
|
||||
if index > l.commitIndex {
|
||||
l.commitIndex = index
|
||||
}
|
||||
debugln("update.commit.index ", index)
|
||||
}
|
||||
|
||||
// Updates the commit index and writes entries after that index to the stable storage.
|
||||
|
@ -255,6 +255,12 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
|
||||
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
|
||||
debugln("peer.snap.recovery.send: ", p.Name)
|
||||
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
|
||||
|
||||
if resp == nil {
|
||||
debugln("peer.snap.recovery.timeout: ", p.Name)
|
||||
return
|
||||
}
|
||||
|
||||
if resp.Success {
|
||||
p.prevLogIndex = req.LastIndex
|
||||
} else {
|
||||
|
@ -28,7 +28,7 @@ func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint6
|
||||
|
||||
// Encodes the RequestVoteRequest to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (req *RequestVoteRequest) encode(w io.Writer) (int, error) {
|
||||
func (req *RequestVoteRequest) Encode(w io.Writer) (int, error) {
|
||||
pb := &protobuf.ProtoRequestVoteRequest{
|
||||
Term: proto.Uint64(req.Term),
|
||||
LastLogIndex: proto.Uint64(req.LastLogIndex),
|
||||
@ -45,7 +45,7 @@ func (req *RequestVoteRequest) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (req *RequestVoteRequest) decode(r io.Reader) (int, error) {
|
||||
func (req *RequestVoteRequest) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
@ -24,7 +24,7 @@ func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse
|
||||
|
||||
// Encodes the RequestVoteResponse to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (resp *RequestVoteResponse) encode(w io.Writer) (int, error) {
|
||||
func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error) {
|
||||
pb := &protobuf.ProtoRequestVoteResponse{
|
||||
Term: proto.Uint64(resp.Term),
|
||||
VoteGranted: proto.Bool(resp.VoteGranted),
|
||||
@ -40,7 +40,7 @@ func (resp *RequestVoteResponse) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (resp *RequestVoteResponse) decode(r io.Reader) (int, error) {
|
||||
func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
18
third_party/github.com/coreos/go-raft/server.go
vendored
18
third_party/github.com/coreos/go-raft/server.go
vendored
@ -80,6 +80,8 @@ type Server struct {
|
||||
lastSnapshot *Snapshot
|
||||
stateMachine StateMachine
|
||||
maxLogEntriesPerRequest uint64
|
||||
|
||||
connectionString string
|
||||
}
|
||||
|
||||
// An event to be processed by the server's event loop.
|
||||
@ -96,7 +98,7 @@ type event struct {
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// Creates a new server with a log at the given path.
|
||||
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) {
|
||||
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) {
|
||||
if name == "" {
|
||||
return nil, errors.New("raft.Server: Name cannot be blank")
|
||||
}
|
||||
@ -117,6 +119,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
|
||||
electionTimeout: DefaultElectionTimeout,
|
||||
heartbeatTimeout: DefaultHeartbeatTimeout,
|
||||
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
|
||||
connectionString: connectionString,
|
||||
}
|
||||
|
||||
// Setup apply function.
|
||||
@ -1009,10 +1012,17 @@ func (s *Server) TakeSnapshot() error {
|
||||
state = []byte{0}
|
||||
}
|
||||
|
||||
var peers []*Peer
|
||||
peers := make([]*Peer, len(s.peers)+1)
|
||||
|
||||
i := 0
|
||||
for _, peer := range s.peers {
|
||||
peers = append(peers, peer.clone())
|
||||
peers[i] = peer.clone()
|
||||
i++
|
||||
}
|
||||
|
||||
peers[i] = &Peer{
|
||||
Name: s.Name(),
|
||||
ConnectionString: s.connectionString,
|
||||
}
|
||||
|
||||
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
|
||||
@ -1253,7 +1263,7 @@ func (s *Server) readConf() error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.log.commitIndex = conf.CommitIndex
|
||||
s.log.updateCommitIndex(conf.CommitIndex)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -428,7 +428,7 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
|
||||
for _, name := range names {
|
||||
server := servers[name]
|
||||
if server.CommitIndex() != 17 {
|
||||
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
|
||||
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17)
|
||||
}
|
||||
server.Stop()
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
|
||||
|
||||
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
|
||||
func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
|
||||
|
||||
protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
|
||||
|
||||
@ -63,7 +63,7 @@ func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
|
||||
func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
@ -31,7 +31,7 @@ func newSnapshotRecoveryResponse(term uint64, success bool, commitIndex uint64)
|
||||
|
||||
// Encodes the SnapshotRecoveryResponse to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (req *SnapshotRecoveryResponse) encode(w io.Writer) (int, error) {
|
||||
func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) {
|
||||
pb := &protobuf.ProtoSnapshotRecoveryResponse{
|
||||
Term: proto.Uint64(req.Term),
|
||||
Success: proto.Bool(req.Success),
|
||||
@ -47,7 +47,7 @@ func (req *SnapshotRecoveryResponse) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the SnapshotRecoveryResponse from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (req *SnapshotRecoveryResponse) decode(r io.Reader) (int, error) {
|
||||
func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
@ -31,7 +31,7 @@ func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest
|
||||
|
||||
// Encodes the SnapshotRequest to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (req *SnapshotRequest) encode(w io.Writer) (int, error) {
|
||||
func (req *SnapshotRequest) Encode(w io.Writer) (int, error) {
|
||||
pb := &protobuf.ProtoSnapshotRequest{
|
||||
LeaderName: proto.String(req.LeaderName),
|
||||
LastIndex: proto.Uint64(req.LastIndex),
|
||||
@ -47,7 +47,7 @@ func (req *SnapshotRequest) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (req *SnapshotRequest) decode(r io.Reader) (int, error) {
|
||||
func (req *SnapshotRequest) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
@ -27,7 +27,7 @@ func newSnapshotResponse(success bool) *SnapshotResponse {
|
||||
|
||||
// Encodes the SnapshotResponse to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (resp *SnapshotResponse) encode(w io.Writer) (int, error) {
|
||||
func (resp *SnapshotResponse) Encode(w io.Writer) (int, error) {
|
||||
pb := &protobuf.ProtoSnapshotResponse{
|
||||
Success: proto.Bool(resp.Success),
|
||||
}
|
||||
@ -41,7 +41,7 @@ func (resp *SnapshotResponse) encode(w io.Writer) (int, error) {
|
||||
|
||||
// Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (resp *SnapshotResponse) decode(r io.Reader) (int, error) {
|
||||
func (resp *SnapshotResponse) Decode(r io.Reader) (int, error) {
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
|
@ -65,12 +65,12 @@ func newTestServer(name string, transporter Transporter) *Server {
|
||||
if err := os.MkdirAll(p, 0644); err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
server, _ := NewServer(name, p, transporter, nil, nil)
|
||||
server, _ := NewServer(name, p, transporter, nil, nil, "")
|
||||
return server
|
||||
}
|
||||
|
||||
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
|
||||
server, _ := NewServer(name, p, transporter, nil, nil)
|
||||
server, _ := NewServer(name, p, transporter, nil, nil, "")
|
||||
return server
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user