bump(github.com/coreos/go-raft): e129a0807cdcbd53440c1d722907043d06c16f1b

This commit is contained in:
Brandon Philips
2013-08-06 12:48:59 -07:00
parent 9115a7e422
commit 253765d81e
13 changed files with 49 additions and 32 deletions

View File

@ -1,7 +1,7 @@
all: test all: test
coverage: coverage:
gocov test github.com/benbjohnson/go-raft | gocov-html > coverage.html gocov test github.com/coreos/go-raft | gocov-html > coverage.html
open coverage.html open coverage.html
dependencies: dependencies:

View File

@ -1,4 +1,5 @@
[![Stories in Ready](http://badge.waffle.io/benbjohnson/go-raft.png)](http://waffle.io/benbjohnson/go-raft) [![Build Status](https://travis-ci.org/benbjohnson/go-raft.png?branch=master)](https://travis-ci.org/benbjohnson/go-raft)
go-raft go-raft
======= =======

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )

View File

@ -5,7 +5,7 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"errors" "errors"
"fmt" "fmt"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"os" "os"
"sync" "sync"
@ -141,9 +141,6 @@ func (l *Log) currentTerm() uint64 {
// Opens the log file and reads existing entries. The log can remain open and // Opens the log file and reads existing entries. The log can remain open and
// continue to append entries to the end of the log. // continue to append entries to the end of the log.
func (l *Log) open(path string) error { func (l *Log) open(path string) error {
l.mutex.Lock()
defer l.mutex.Unlock()
// Read all the entries from the log if one exists. // Read all the entries from the log if one exists.
var readBytes int64 var readBytes int64
@ -168,7 +165,6 @@ func (l *Log) open(path string) error {
// Read the file and decode entries. // Read the file and decode entries.
for { for {
// Instantiate log entry and decode into it. // Instantiate log entry and decode into it.
entry, _ := newLogEntry(l, 0, 0, nil) entry, _ := newLogEntry(l, 0, 0, nil)
entry.Position, _ = l.file.Seek(0, os.SEEK_CUR) entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)
@ -192,6 +188,9 @@ func (l *Log) open(path string) error {
readBytes += int64(n) readBytes += int64(n)
} }
l.results = make([]*logResult, len(l.entries)) l.results = make([]*logResult, len(l.entries))
l.compact(l.startIndex, l.startTerm)
debugln("open.log.recovery number of log ", len(l.entries)) debugln("open.log.recovery number of log ", len(l.entries))
return nil return nil
} }
@ -282,9 +281,9 @@ func (l *Log) getEntryResult(entry *LogEntry, clear bool) (interface{}, error) {
if entry == nil { if entry == nil {
panic("raft: Log entry required for error retrieval") panic("raft: Log entry required for error retrieval")
} }
debugln("getEntryResult.result index: ", entry.Index-l.startIndex-1)
// If a result exists for the entry then return it with its error. // If a result exists for the entry then return it with its error.
if entry.Index > l.startIndex && entry.Index <= uint64(len(l.results)) { if entry.Index > l.startIndex && entry.Index <= l.startIndex+uint64(len(l.results)) {
if result := l.results[entry.Index-l.startIndex-1]; result != nil { if result := l.results[entry.Index-l.startIndex-1]; result != nil {
// keep the records before remove it // keep the records before remove it
@ -310,8 +309,7 @@ func (l *Log) getEntryResult(entry *LogEntry, clear bool) (interface{}, error) {
func (l *Log) commitInfo() (index uint64, term uint64) { func (l *Log) commitInfo() (index uint64, term uint64) {
l.mutex.RLock() l.mutex.RLock()
defer l.mutex.RUnlock() defer l.mutex.RUnlock()
// If we don't have any committed entries then just return zeros.
// If we don't have any entries then just return zeros.
if l.commitIndex == 0 { if l.commitIndex == 0 {
return 0, 0 return 0, 0
} }
@ -322,6 +320,7 @@ func (l *Log) commitInfo() (index uint64, term uint64) {
} }
// Return the last index & term from the last committed entry. // Return the last index & term from the last committed entry.
debugln("commitInfo.get.[", l.commitIndex, "/", l.startIndex, "]")
entry := l.entries[l.commitIndex-1-l.startIndex] entry := l.entries[l.commitIndex-1-l.startIndex]
return entry.Index, entry.Term return entry.Index, entry.Term
} }
@ -395,6 +394,7 @@ func (l *Log) setCommitIndex(index uint64) error {
// Apply the changes to the state machine and store the error code. // Apply the changes to the state machine and store the error code.
returnValue, err := l.ApplyFunc(command) returnValue, err := l.ApplyFunc(command)
debugln("setCommitIndex.set.result index: ", entryIndex)
l.results[entryIndex] = &logResult{returnValue: returnValue, err: err} l.results[entryIndex] = &logResult{returnValue: returnValue, err: err}
} }
return nil return nil
@ -555,22 +555,27 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
// Log compaction // Log compaction
//-------------------------------------- //--------------------------------------
// compaction the log before index // compact the log before index (including index)
func (l *Log) compact(index uint64, term uint64) error { func (l *Log) compact(index uint64, term uint64) error {
var entries []*LogEntry var entries []*LogEntry
var results []*logResult
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
if index == 0 {
return nil
}
// nothing to compaction // nothing to compaction
// the index may be greater than the current index if // the index may be greater than the current index if
// we just recovery from on snapshot // we just recovery from on snapshot
if index >= l.internalCurrentIndex() { if index >= l.internalCurrentIndex() {
entries = make([]*LogEntry, 0) entries = make([]*LogEntry, 0)
results = make([]*logResult, 0)
} else { } else {
// get all log entries after index // get all log entries after index
entries = l.entries[index-l.startIndex:] entries = l.entries[index-l.startIndex:]
results = l.results[index-l.startIndex:]
} }
// create a new log file and add all the entries // create a new log file and add all the entries
@ -604,6 +609,7 @@ func (l *Log) compact(index uint64, term uint64) error {
// compaction the in memory log // compaction the in memory log
l.entries = entries l.entries = entries
l.results = results
l.startIndex = index l.startIndex = index
l.startTerm = term l.startTerm = term
return nil return nil

View File

@ -5,7 +5,7 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
) )

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )

View File

@ -242,7 +242,7 @@ func (s *Server) LastCommandName() string {
func (s *Server) GetState() string { func (s *Server) GetState() string {
s.mutex.RLock() s.mutex.RLock()
defer s.mutex.RUnlock() defer s.mutex.RUnlock()
return fmt.Sprintf("Name: %s, State: %s, Term: %v, Index: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex) return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
} }
// Check if the server is promotable // Check if the server is promotable
@ -361,6 +361,8 @@ func (s *Server) Start() error {
s.debugln("start from previous saved state") s.debugln("start from previous saved state")
} }
debugln(s.GetState())
go s.loop() go s.loop()
return nil return nil
@ -385,6 +387,8 @@ func (s *Server) readConf() error {
return err return err
} }
peerNames := make([]string, 0)
for { for {
var peerName string var peerName string
_, err = fmt.Fscanf(s.confFile, "%s\n", &peerName) _, err = fmt.Fscanf(s.confFile, "%s\n", &peerName)
@ -392,16 +396,20 @@ func (s *Server) readConf() error {
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
s.debugln("server.peer.conf: finish") s.debugln("server.peer.conf: finish")
return nil break
} }
return err return err
} }
s.debugln("server.peer.conf.read: ", peerName) s.debugln("server.peer.conf.read: ", peerName)
peer := newPeer(s, peerName, s.heartbeatTimeout) peerNames = append(peerNames, peerName)
}
s.peers[peer.name] = peer s.confFile.Truncate(0)
s.confFile.Seek(0, os.SEEK_SET)
for _, peerName := range peerNames {
s.AddPeer(peerName)
} }
return nil return nil
@ -961,11 +969,14 @@ func (s *Server) AddPeer(name string) error {
// Only add the peer if it doesn't have the same name. // Only add the peer if it doesn't have the same name.
if s.name != name { if s.name != name {
// when loading snapshot s.confFile should be nil
if s.confFile != nil {
_, err := fmt.Fprintln(s.confFile, name) _, err := fmt.Fprintln(s.confFile, name)
s.debugln("server.peer.conf.write: ", name) s.debugln("server.peer.conf.write: ", name)
if err != nil { if err != nil {
return err return err
} }
}
peer := newPeer(s, name, s.heartbeatTimeout) peer := newPeer(s, name, s.heartbeatTimeout)
if s.State() == Leader { if s.State() == Leader {
peer.startHeartbeat() peer.startHeartbeat()
@ -1019,7 +1030,6 @@ func (s *Server) Snapshot() {
for { for {
// TODO: change this... to something reasonable // TODO: change this... to something reasonable
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
s.takeSnapshot() s.takeSnapshot()
} }
} }
@ -1033,7 +1043,7 @@ func (s *Server) takeSnapshot() error {
lastIndex, lastTerm := s.log.commitInfo() lastIndex, lastTerm := s.log.commitInfo()
if lastIndex == 0 || lastTerm == 0 { if lastIndex == 0 {
return errors.New("No logs") return errors.New("No logs")
} }

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )

View File

@ -2,7 +2,7 @@ package raft
import ( import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
"github.com/benbjohnson/go-raft/protobuf" "github.com/coreos/go-raft/protobuf"
"io" "io"
"io/ioutil" "io/ioutil"
) )