Skip to content

Commit

Permalink
Merge pull request #1 from xiangli-cmu/server
Browse files Browse the repository at this point in the history
Server
  • Loading branch information
xiang90 committed Jul 7, 2013
2 parents 7a19090 + 0dd663e commit 85bedaa
Show file tree
Hide file tree
Showing 16 changed files with 695 additions and 591 deletions.
4 changes: 2 additions & 2 deletions append_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type AppendEntriesResponse struct {
//------------------------------------------------------------------------------

// Creates a new AppendEntries request.
func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest {
func newAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest {
return &AppendEntriesRequest{
Term: term,
LeaderName: leaderName,
Expand All @@ -42,7 +42,7 @@ func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64
}

// Creates a new AppendEntries response.
func NewAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse {
func newAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse {
return &AppendEntriesResponse{
Term: term,
Success: success,
Expand Down
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Command interface {
//--------------------------------------

// Creates a new instance of a command by name.
func NewCommand(name string) (Command, error) {
func newCommand(name string) (Command, error) {
// Find the registered command.
command := commandTypes[name]
if command == nil {
Expand Down
86 changes: 33 additions & 53 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type Log struct {
//------------------------------------------------------------------------------

// Creates a new log.
func NewLog() *Log {
return &Log{}
func newLog() *Log {
return &Log{
entries: make([]*LogEntry, 0),
}
}

//------------------------------------------------------------------------------
Expand All @@ -45,24 +47,12 @@ func NewLog() *Log {
//
//------------------------------------------------------------------------------

func (l *Log) SetStartIndex(i uint64) {
l.startIndex = i
}

func (l *Log) StartIndex() uint64 {
return l.startIndex
}

func (l *Log) SetStartTerm(t uint64) {
l.startTerm = t
}

//--------------------------------------
// Log Indices
//--------------------------------------

// The current index in the log.
func (l *Log) CurrentIndex() uint64 {
func (l *Log) currentIndex() uint64 {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -81,24 +71,19 @@ func (l *Log) internalCurrentIndex() uint64 {
}

// The next index in the log.
func (l *Log) NextIndex() uint64 {
return l.CurrentIndex() + 1
}

// The last committed index in the log.
func (l *Log) CommitIndex() uint64 {
return l.commitIndex
func (l *Log) nextIndex() uint64 {
return l.currentIndex() + 1
}

// Determines if the log contains zero entries.
func (l *Log) IsEmpty() bool {
func (l *Log) isEmpty() bool {
l.mutex.Lock()
defer l.mutex.Unlock()
return (len(l.entries) == 0) && (l.startIndex == 0)
}

// The name of the last command in the log.
func (l *Log) LastCommandName() string {
func (l *Log) lastCommandName() string {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(l.entries) > 0 {
Expand All @@ -114,7 +99,7 @@ func (l *Log) LastCommandName() string {
//--------------------------------------

// The current term in the log.
func (l *Log) CurrentTerm() uint64 {
func (l *Log) currentTerm() uint64 {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -136,7 +121,7 @@ func (l *Log) CurrentTerm() uint64 {

// Opens the log file and reads existing entries. The log can remain open and
// continue to append entries to the end of the log.
func (l *Log) Open(path string) error {
func (l *Log) open(path string) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -158,8 +143,8 @@ func (l *Log) Open(path string) error {
}

// Instantiate log entry and decode into it.
entry := NewLogEntry(l, 0, 0, nil)
n, err := entry.Decode(reader)
entry := newLogEntry(l, 0, 0, nil)
n, err := entry.decode(reader)
if err != nil {
file.Close()
if err = os.Truncate(path, int64(lastIndex)); err != nil {
Expand Down Expand Up @@ -194,7 +179,7 @@ func (l *Log) Open(path string) error {
}

// Closes the log file.
func (l *Log) Close() {
func (l *Log) close() {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -211,12 +196,12 @@ func (l *Log) Close() {
//--------------------------------------

// Creates a log entry associated with this log.
func (l *Log) CreateEntry(term uint64, command Command) *LogEntry {
return NewLogEntry(l, l.NextIndex(), term, command)
func (l *Log) createEntry(term uint64, command Command) *LogEntry {
return newLogEntry(l, l.nextIndex(), term, command)
}

// Checks if the log contains a given index/term combination.
func (l *Log) ContainsEntry(index uint64, term uint64) bool {
func (l *Log) containsEntry(index uint64, term uint64) bool {
if index <= l.startIndex || index > (l.startIndex+uint64(len(l.entries))) {
return false
}
Expand All @@ -226,12 +211,13 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool {
// Retrieves a list of entries after a given index as well as the term of the
// index provided. A nil list of entries is returned if the index no longer
// exists because a snapshot was made.
func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {
func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()

// Return nil if index is before the start of the log.
if index < l.startIndex {
debugln("[GetEntries] index < startIndex ", index, " ", l.startIndex)
return nil, 0
}

Expand All @@ -242,6 +228,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {

// If we're going from the beginning of the log then return the whole log.
if index == l.startIndex {
debugln("[GetEntries] index = startIndex ", index, " ", l.startIndex)
return l.entries, l.startTerm
}

Expand All @@ -255,7 +242,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {

// Retrieves the error returned from an entry. The error can only exist after
// the entry has been committed.
func (l *Log) GetEntryError(entry *LogEntry) error {
func (l *Log) getEntryError(entry *LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -274,7 +261,7 @@ func (l *Log) GetEntryError(entry *LogEntry) error {
//--------------------------------------

// Retrieves the last index and term that has been committed to the log.
func (l *Log) CommitInfo() (index uint64, term uint64) {
func (l *Log) commitInfo() (index uint64, term uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -294,7 +281,7 @@ func (l *Log) CommitInfo() (index uint64, term uint64) {
}

// Retrieves the last index and term that has been committed to the log.
func (l *Log) LastInfo() (index uint64, term uint64) {
func (l *Log) lastInfo() (index uint64, term uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -309,14 +296,14 @@ func (l *Log) LastInfo() (index uint64, term uint64) {
}

// Updates the commit index
func (l *Log) UpdateCommitIndex(index uint64) {
func (l *Log) updateCommitIndex(index uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.commitIndex = index
}

// Updates the commit index and writes entries after that index to the stable storage.
func (l *Log) SetCommitIndex(index uint64) error {
func (l *Log) setCommitIndex(index uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand Down Expand Up @@ -349,7 +336,7 @@ func (l *Log) SetCommitIndex(index uint64) error {
entry := l.entries[entryIndex]

// Write to storage.
if err := entry.Encode(l.file); err != nil {
if err := entry.encode(l.file); err != nil {
return err
}

Expand All @@ -369,14 +356,14 @@ func (l *Log) SetCommitIndex(index uint64) error {

// Truncates the log to the given index and term. This only works if the log
// at the index has not been committed.
func (l *Log) Truncate(index uint64, term uint64) error {
func (l *Log) truncate(index uint64, term uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()
debugln("[Truncate] truncate to ", index)
// Do not allow committed entries to be truncated.
if index < l.CommitIndex() {
if index < l.commitIndex {
debugln("[Truncate] error 1")
return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.CommitIndex(), index, term)
return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.commitIndex, index, term)
}

// Do not truncate past end of entries.
Expand Down Expand Up @@ -411,8 +398,8 @@ func (l *Log) Truncate(index uint64, term uint64) error {
//--------------------------------------

// Appends a series of entries to the log. These entries are not written to
// disk until SetCommitIndex() is called.
func (l *Log) AppendEntries(entries []*LogEntry) error {
// disk until setCommitIndex() is called.
func (l *Log) appendEntries(entries []*LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -426,13 +413,6 @@ func (l *Log) AppendEntries(entries []*LogEntry) error {
return nil
}

// Appends a single entry to the log.
func (l *Log) AppendEntry(entry *LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()
return l.appendEntry(entry)
}

// Writes a single log entry to the end of the log. This function does not
// obtain a lock and should only be used internally. Use AppendEntries() and
// AppendEntry() to use it externally.
Expand Down Expand Up @@ -463,7 +443,7 @@ func (l *Log) appendEntry(entry *LogEntry) error {
//--------------------------------------

// compaction the log before index
func (l *Log) Compact(index uint64, term uint64) error {
func (l *Log) compact(index uint64, term uint64) error {
var entries []*LogEntry

l.mutex.Lock()
Expand All @@ -486,7 +466,7 @@ func (l *Log) Compact(index uint64, term uint64) error {
return err
}
for _, entry := range entries {
err = entry.Encode(file)
err = entry.encode(file)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type logEntryRawMessage struct {
//------------------------------------------------------------------------------

// Creates a new log entry associated with a log.
func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry {
func newLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry {
return &LogEntry{
log: log,
Index: index,
Expand All @@ -62,7 +62,7 @@ func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry
//--------------------------------------

// Encodes the log entry to a buffer.
func (e *LogEntry) Encode(w io.Writer) error {
func (e *LogEntry) encode(w io.Writer) error {
if w == nil {
return errors.New("raft.LogEntry: Writer required to encode")
}
Expand All @@ -87,7 +87,7 @@ func (e *LogEntry) Encode(w io.Writer) error {
}

// Decodes the log entry from a buffer. Returns the number of bytes read.
func (e *LogEntry) Decode(r io.Reader) (pos int, err error) {
func (e *LogEntry) decode(r io.Reader) (pos int, err error) {
pos = 0

if r == nil {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (e *LogEntry) Decode(r io.Reader) (pos int, err error) {
}

// Instantiate command by name.
command, err := NewCommand(commandName)
command, err := newCommand(commandName)
if err != nil {
err = fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err)
return
Expand Down Expand Up @@ -187,7 +187,7 @@ func (e *LogEntry) UnmarshalJSON(data []byte) error {

// Create a command based on the name.
var err error
if e.Command, err = NewCommand(obj.Name); err != nil {
if e.Command, err = newCommand(obj.Name); err != nil {
return err
}
json.Unmarshal(obj.Command, e.Command)
Expand Down
4 changes: 2 additions & 2 deletions log_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// Ensure that we can encode a log entry to JSON.
func TestLogEntryMarshal(t *testing.T) {
e := NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})
e := newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})
if b, err := json.Marshal(e); !(string(b) == `{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}` && err == nil) {
t.Fatalf("Unexpected log entry marshalling: %v (%v)", string(b), err)
}
Expand All @@ -32,6 +32,6 @@ func TestLogEntryUnmarshal(t *testing.T) {
t.Fatalf("Log entry unmarshalling error: %v", err)
}
if !(e.Index == 1 && e.Term == 2 && reflect.DeepEqual(e.Command, &joinCommand{Name: "localhost:1000"})) {
t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}))
t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}))
}
}
Loading

0 comments on commit 85bedaa

Please sign in to comment.