diff --git a/append_entries.go b/append_entries.go index fef37cd6166..096ea8d3cab 100644 --- a/append_entries.go +++ b/append_entries.go @@ -30,7 +30,7 @@ type AppendEntriesResponse struct { //------------------------------------------------------------------------------ // Creates a new AppendEntries request. -func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest { +func newAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest { return &AppendEntriesRequest{ Term: term, LeaderName: leaderName, @@ -42,7 +42,7 @@ func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64 } // Creates a new AppendEntries response. -func NewAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse { +func newAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse { return &AppendEntriesResponse{ Term: term, Success: success, diff --git a/command.go b/command.go index 66bb18241ed..ceb7172c768 100644 --- a/command.go +++ b/command.go @@ -40,7 +40,7 @@ type Command interface { //-------------------------------------- // Creates a new instance of a command by name. -func NewCommand(name string) (Command, error) { +func newCommand(name string) (Command, error) { // Find the registered command. command := commandTypes[name] if command == nil { diff --git a/log.go b/log.go index 567684723fe..3c387f04f52 100644 --- a/log.go +++ b/log.go @@ -35,8 +35,10 @@ type Log struct { //------------------------------------------------------------------------------ // Creates a new log. -func NewLog() *Log { - return &Log{} +func newLog() *Log { + return &Log{ + entries: make([]*LogEntry, 0), + } } //------------------------------------------------------------------------------ @@ -45,24 +47,12 @@ func NewLog() *Log { // //------------------------------------------------------------------------------ -func (l *Log) SetStartIndex(i uint64) { - l.startIndex = i -} - -func (l *Log) StartIndex() uint64 { - return l.startIndex -} - -func (l *Log) SetStartTerm(t uint64) { - l.startTerm = t -} - //-------------------------------------- // Log Indices //-------------------------------------- // The current index in the log. -func (l *Log) CurrentIndex() uint64 { +func (l *Log) currentIndex() uint64 { l.mutex.Lock() defer l.mutex.Unlock() @@ -81,24 +71,19 @@ func (l *Log) internalCurrentIndex() uint64 { } // The next index in the log. -func (l *Log) NextIndex() uint64 { - return l.CurrentIndex() + 1 -} - -// The last committed index in the log. -func (l *Log) CommitIndex() uint64 { - return l.commitIndex +func (l *Log) nextIndex() uint64 { + return l.currentIndex() + 1 } // Determines if the log contains zero entries. -func (l *Log) IsEmpty() bool { +func (l *Log) isEmpty() bool { l.mutex.Lock() defer l.mutex.Unlock() return (len(l.entries) == 0) && (l.startIndex == 0) } // The name of the last command in the log. -func (l *Log) LastCommandName() string { +func (l *Log) lastCommandName() string { l.mutex.Lock() defer l.mutex.Unlock() if len(l.entries) > 0 { @@ -114,7 +99,7 @@ func (l *Log) LastCommandName() string { //-------------------------------------- // The current term in the log. -func (l *Log) CurrentTerm() uint64 { +func (l *Log) currentTerm() uint64 { l.mutex.Lock() defer l.mutex.Unlock() @@ -136,7 +121,7 @@ func (l *Log) CurrentTerm() uint64 { // Opens the log file and reads existing entries. The log can remain open and // continue to append entries to the end of the log. -func (l *Log) Open(path string) error { +func (l *Log) open(path string) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -158,8 +143,8 @@ func (l *Log) Open(path string) error { } // Instantiate log entry and decode into it. - entry := NewLogEntry(l, 0, 0, nil) - n, err := entry.Decode(reader) + entry := newLogEntry(l, 0, 0, nil) + n, err := entry.decode(reader) if err != nil { file.Close() if err = os.Truncate(path, int64(lastIndex)); err != nil { @@ -194,7 +179,7 @@ func (l *Log) Open(path string) error { } // Closes the log file. -func (l *Log) Close() { +func (l *Log) close() { l.mutex.Lock() defer l.mutex.Unlock() @@ -211,12 +196,12 @@ func (l *Log) Close() { //-------------------------------------- // Creates a log entry associated with this log. -func (l *Log) CreateEntry(term uint64, command Command) *LogEntry { - return NewLogEntry(l, l.NextIndex(), term, command) +func (l *Log) createEntry(term uint64, command Command) *LogEntry { + return newLogEntry(l, l.nextIndex(), term, command) } // Checks if the log contains a given index/term combination. -func (l *Log) ContainsEntry(index uint64, term uint64) bool { +func (l *Log) containsEntry(index uint64, term uint64) bool { if index <= l.startIndex || index > (l.startIndex+uint64(len(l.entries))) { return false } @@ -226,12 +211,13 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool { // Retrieves a list of entries after a given index as well as the term of the // index provided. A nil list of entries is returned if the index no longer // exists because a snapshot was made. -func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { +func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) { l.mutex.Lock() defer l.mutex.Unlock() // Return nil if index is before the start of the log. if index < l.startIndex { + debugln("[GetEntries] index < startIndex ", index, " ", l.startIndex) return nil, 0 } @@ -242,6 +228,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { // If we're going from the beginning of the log then return the whole log. if index == l.startIndex { + debugln("[GetEntries] index = startIndex ", index, " ", l.startIndex) return l.entries, l.startTerm } @@ -255,7 +242,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { // Retrieves the error returned from an entry. The error can only exist after // the entry has been committed. -func (l *Log) GetEntryError(entry *LogEntry) error { +func (l *Log) getEntryError(entry *LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -274,7 +261,7 @@ func (l *Log) GetEntryError(entry *LogEntry) error { //-------------------------------------- // Retrieves the last index and term that has been committed to the log. -func (l *Log) CommitInfo() (index uint64, term uint64) { +func (l *Log) commitInfo() (index uint64, term uint64) { l.mutex.Lock() defer l.mutex.Unlock() @@ -294,7 +281,7 @@ func (l *Log) CommitInfo() (index uint64, term uint64) { } // Retrieves the last index and term that has been committed to the log. -func (l *Log) LastInfo() (index uint64, term uint64) { +func (l *Log) lastInfo() (index uint64, term uint64) { l.mutex.Lock() defer l.mutex.Unlock() @@ -309,14 +296,14 @@ func (l *Log) LastInfo() (index uint64, term uint64) { } // Updates the commit index -func (l *Log) UpdateCommitIndex(index uint64) { +func (l *Log) updateCommitIndex(index uint64) { l.mutex.Lock() defer l.mutex.Unlock() l.commitIndex = index } // Updates the commit index and writes entries after that index to the stable storage. -func (l *Log) SetCommitIndex(index uint64) error { +func (l *Log) setCommitIndex(index uint64) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -349,7 +336,7 @@ func (l *Log) SetCommitIndex(index uint64) error { entry := l.entries[entryIndex] // Write to storage. - if err := entry.Encode(l.file); err != nil { + if err := entry.encode(l.file); err != nil { return err } @@ -369,14 +356,14 @@ func (l *Log) SetCommitIndex(index uint64) error { // Truncates the log to the given index and term. This only works if the log // at the index has not been committed. -func (l *Log) Truncate(index uint64, term uint64) error { +func (l *Log) truncate(index uint64, term uint64) error { l.mutex.Lock() defer l.mutex.Unlock() debugln("[Truncate] truncate to ", index) // Do not allow committed entries to be truncated. - if index < l.CommitIndex() { + if index < l.commitIndex { debugln("[Truncate] error 1") - return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.CommitIndex(), index, term) + return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.commitIndex, index, term) } // Do not truncate past end of entries. @@ -411,8 +398,8 @@ func (l *Log) Truncate(index uint64, term uint64) error { //-------------------------------------- // Appends a series of entries to the log. These entries are not written to -// disk until SetCommitIndex() is called. -func (l *Log) AppendEntries(entries []*LogEntry) error { +// disk until setCommitIndex() is called. +func (l *Log) appendEntries(entries []*LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -426,13 +413,6 @@ func (l *Log) AppendEntries(entries []*LogEntry) error { return nil } -// Appends a single entry to the log. -func (l *Log) AppendEntry(entry *LogEntry) error { - l.mutex.Lock() - defer l.mutex.Unlock() - return l.appendEntry(entry) -} - // Writes a single log entry to the end of the log. This function does not // obtain a lock and should only be used internally. Use AppendEntries() and // AppendEntry() to use it externally. @@ -463,7 +443,7 @@ func (l *Log) appendEntry(entry *LogEntry) error { //-------------------------------------- // compaction the log before index -func (l *Log) Compact(index uint64, term uint64) error { +func (l *Log) compact(index uint64, term uint64) error { var entries []*LogEntry l.mutex.Lock() @@ -486,7 +466,7 @@ func (l *Log) Compact(index uint64, term uint64) error { return err } for _, entry := range entries { - err = entry.Encode(file) + err = entry.encode(file) if err != nil { return err } diff --git a/log_entry.go b/log_entry.go index 50f0ed5e60f..d9a15e94b4a 100644 --- a/log_entry.go +++ b/log_entry.go @@ -41,7 +41,7 @@ type logEntryRawMessage struct { //------------------------------------------------------------------------------ // Creates a new log entry associated with a log. -func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry { +func newLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry { return &LogEntry{ log: log, Index: index, @@ -62,7 +62,7 @@ func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry //-------------------------------------- // Encodes the log entry to a buffer. -func (e *LogEntry) Encode(w io.Writer) error { +func (e *LogEntry) encode(w io.Writer) error { if w == nil { return errors.New("raft.LogEntry: Writer required to encode") } @@ -87,7 +87,7 @@ func (e *LogEntry) Encode(w io.Writer) error { } // Decodes the log entry from a buffer. Returns the number of bytes read. -func (e *LogEntry) Decode(r io.Reader) (pos int, err error) { +func (e *LogEntry) decode(r io.Reader) (pos int, err error) { pos = 0 if r == nil { @@ -137,7 +137,7 @@ func (e *LogEntry) Decode(r io.Reader) (pos int, err error) { } // Instantiate command by name. - command, err := NewCommand(commandName) + command, err := newCommand(commandName) if err != nil { err = fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err) return @@ -187,7 +187,7 @@ func (e *LogEntry) UnmarshalJSON(data []byte) error { // Create a command based on the name. var err error - if e.Command, err = NewCommand(obj.Name); err != nil { + if e.Command, err = newCommand(obj.Name); err != nil { return err } json.Unmarshal(obj.Command, e.Command) diff --git a/log_entry_test.go b/log_entry_test.go index b7a6032750f..19765dbd323 100644 --- a/log_entry_test.go +++ b/log_entry_test.go @@ -18,7 +18,7 @@ import ( // Ensure that we can encode a log entry to JSON. func TestLogEntryMarshal(t *testing.T) { - e := NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}) + e := newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}) if b, err := json.Marshal(e); !(string(b) == `{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}` && err == nil) { t.Fatalf("Unexpected log entry marshalling: %v (%v)", string(b), err) } @@ -32,6 +32,6 @@ func TestLogEntryUnmarshal(t *testing.T) { t.Fatalf("Log entry unmarshalling error: %v", err) } if !(e.Index == 1 && e.Term == 2 && reflect.DeepEqual(e.Command, &joinCommand{Name: "localhost:1000"})) { - t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})) + t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})) } } diff --git a/log_test.go b/log_test.go index b26544c5154..dc107ec03e4 100644 --- a/log_test.go +++ b/log_test.go @@ -20,28 +20,28 @@ import ( // Ensure that we can append to a new log. func TestLogNewLog(t *testing.T) { path := getLogPath() - log := NewLog() + log := newLog() log.ApplyFunc = func(c Command) (interface{}, error) { return nil, nil } - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { t.Fatalf("Unable to open log: %v", err) } - defer log.Close() + defer log.close() defer os.Remove(path) - if err := log.AppendEntry(NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})); err != nil { + if err := log.appendEntry(newLogEntry(log, 1, 1, &testCommand1{"foo", 20})); err != nil { t.Fatalf("Unable to append: %v", err) } - if err := log.AppendEntry(NewLogEntry(log, 2, 1, &TestCommand2{100})); err != nil { + if err := log.appendEntry(newLogEntry(log, 2, 1, &testCommand2{100})); err != nil { t.Fatalf("Unable to append: %v", err) } - if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})); err != nil { + if err := log.appendEntry(newLogEntry(log, 3, 2, &testCommand1{"bar", 0})); err != nil { t.Fatalf("Unable to append: %v", err) } // Partial commit. - if err := log.SetCommitIndex(2); err != nil { + if err := log.setCommitIndex(2); err != nil { t.Fatalf("Unable to partially commit: %v", err) } expected := `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + @@ -50,12 +50,12 @@ func TestLogNewLog(t *testing.T) { if string(actual) != expected { t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) } - if index, term := log.CommitInfo(); index != 2 || term != 1 { + if index, term := log.commitInfo(); index != 2 || term != 1 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } // Full commit. - if err := log.SetCommitIndex(3); err != nil { + if err := log.setCommitIndex(3); err != nil { t.Fatalf("Unable to commit: %v", err) } expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + @@ -65,7 +65,7 @@ func TestLogNewLog(t *testing.T) { if string(actual) != expected { t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) } - if index, term := log.CommitInfo(); index != 3 || term != 2 { + if index, term := log.commitInfo(); index != 3 || term != 2 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } @@ -75,20 +75,20 @@ func TestLogExistingLog(t *testing.T) { log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n") - defer log.Close() + defer log.close() defer os.Remove(path) // Validate existing log entries. if len(log.entries) != 3 { t.Fatalf("Expected 3 entries, got %d", len(log.entries)) } - if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &TestCommand1{"foo", 20}) { + if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &testCommand1{"foo", 20}) { t.Fatalf("Unexpected entry[0]: %v", log.entries[0]) } - if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &TestCommand2{100}) { + if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &testCommand2{100}) { t.Fatalf("Unexpected entry[1]: %v", log.entries[1]) } - if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &TestCommand1{"bar", 0}) { + if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &testCommand1{"bar", 0}) { t.Fatalf("Unexpected entry[2]: %v", log.entries[2]) } } @@ -98,22 +98,22 @@ func TestLogContainsEntries(t *testing.T) { log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n") - defer log.Close() + defer log.close() defer os.Remove(path) - if log.ContainsEntry(0, 0) { + if log.containsEntry(0, 0) { t.Fatalf("Zero-index entry should not exist in log.") } - if log.ContainsEntry(1, 0) { + if log.containsEntry(1, 0) { t.Fatalf("Entry with mismatched term should not exist") } - if log.ContainsEntry(4, 0) { + if log.containsEntry(4, 0) { t.Fatalf("Out-of-range entry should not exist") } - if !log.ContainsEntry(2, 1) { + if !log.containsEntry(2, 1) { t.Fatalf("Entry 2/1 should exist") } - if !log.ContainsEntry(3, 2) { + if !log.containsEntry(3, 2) { t.Fatalf("Entry 2/1 should exist") } } @@ -123,17 +123,17 @@ func TestLogRecovery(t *testing.T) { path := setupLogFile(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + `6ac5807c 0000000000000003 00000000000`) - log := NewLog() + log := newLog() log.ApplyFunc = func(c Command) (interface{}, error) { return nil, nil } - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { t.Fatalf("Unable to open log: %v", err) } - defer log.Close() + defer log.close() defer os.Remove(path) - if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})); err != nil { + if err := log.appendEntry(newLogEntry(log, 3, 2, &testCommand1{"bat", -5})); err != nil { t.Fatalf("Unable to append: %v", err) } @@ -141,13 +141,13 @@ func TestLogRecovery(t *testing.T) { if len(log.entries) != 3 { t.Fatalf("Expected 2 entries, got %d", len(log.entries)) } - if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &TestCommand1{"foo", 20}) { + if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &testCommand1{"foo", 20}) { t.Fatalf("Unexpected entry[0]: %v", log.entries[0]) } - if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &TestCommand2{100}) { + if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &testCommand2{100}) { t.Fatalf("Unexpected entry[1]: %v", log.entries[1]) } - if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &TestCommand1{"bat", -5}) { + if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &testCommand1{"bat", -5}) { t.Fatalf("Unexpected entry[2]: %v", log.entries[2]) } @@ -160,7 +160,7 @@ func TestLogRecovery(t *testing.T) { } // Validate committed log contents. - if err := log.SetCommitIndex(3); err != nil { + if err := log.setCommitIndex(3); err != nil { t.Fatalf("Unable to partially commit: %v", err) } expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + @@ -179,46 +179,46 @@ func TestLogRecovery(t *testing.T) { // Ensure that we can truncate uncommitted entries in the log. func TestLogTruncate(t *testing.T) { log, path := setupLog("") - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { t.Fatalf("Unable to open log: %v", err) } - defer log.Close() + defer log.close() defer os.Remove(path) - entry1 := NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20}) - if err := log.AppendEntry(entry1); err != nil { + entry1 := newLogEntry(log, 1, 1, &testCommand1{"foo", 20}) + if err := log.appendEntry(entry1); err != nil { t.Fatalf("Unable to append: %v", err) } - entry2 := NewLogEntry(log, 2, 1, &TestCommand2{100}) - if err := log.AppendEntry(entry2); err != nil { + entry2 := newLogEntry(log, 2, 1, &testCommand2{100}) + if err := log.appendEntry(entry2); err != nil { t.Fatalf("Unable to append: %v", err) } - entry3 := NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0}) - if err := log.AppendEntry(entry3); err != nil { + entry3 := newLogEntry(log, 3, 2, &testCommand1{"bar", 0}) + if err := log.appendEntry(entry3); err != nil { t.Fatalf("Unable to append: %v", err) } - if err := log.SetCommitIndex(2); err != nil { + if err := log.setCommitIndex(2); err != nil { t.Fatalf("Unable to partially commit: %v", err) } // Truncate committed entry. - if err := log.Truncate(1, 1); err == nil || err.Error() != "raft.Log: Index is already committed (2): (IDX=1, TERM=1)" { + if err := log.truncate(1, 1); err == nil || err.Error() != "raft.Log: Index is already committed (2): (IDX=1, TERM=1)" { t.Fatalf("Truncating committed entries shouldn't work: %v", err) } // Truncate past end of log. - if err := log.Truncate(4, 2); err == nil || err.Error() != "raft.Log: Entry index does not exist (MAX=3): (IDX=4, TERM=2)" { + if err := log.truncate(4, 2); err == nil || err.Error() != "raft.Log: Entry index does not exist (MAX=3): (IDX=4, TERM=2)" { t.Fatalf("Truncating past end-of-log shouldn't work: %v", err) } // Truncate entry with mismatched term. - if err := log.Truncate(2, 2); err == nil || err.Error() != "raft.Log: Entry at index does not have matching term (1): (IDX=2, TERM=2)" { + if err := log.truncate(2, 2); err == nil || err.Error() != "raft.Log: Entry at index does not have matching term (1): (IDX=2, TERM=2)" { t.Fatalf("Truncating mismatched entries shouldn't work: %v", err) } // Truncate end of log. - if err := log.Truncate(3, 2); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2, entry3})) { + if err := log.truncate(3, 2); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2, entry3})) { t.Fatalf("Truncating end of log should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2, entry3}) } // Truncate at last commit. - if err := log.Truncate(2, 1); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2})) { + if err := log.truncate(2, 1); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2})) { t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2}) } diff --git a/peer.go b/peer.go index 7e1618874f1..cbb597dfc36 100644 --- a/peer.go +++ b/peer.go @@ -2,6 +2,7 @@ package raft import ( "errors" + "fmt" "sync" "time" ) @@ -18,10 +19,10 @@ type Peer struct { name string prevLogIndex uint64 mutex sync.Mutex - heartbeatTimer *Timer + heartbeatTimer *timer } -type FlushResponse struct { +type flushResponse struct { term uint64 success bool err error @@ -35,11 +36,11 @@ type FlushResponse struct { //------------------------------------------------------------------------------ // Creates a new peer. -func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer { +func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer { p := &Peer{ server: server, name: name, - heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout), + heartbeatTimer: newTimer(heartbeatTimeout, heartbeatTimeout), } return p @@ -56,17 +57,12 @@ func (p *Peer) Name() string { return p.name } -// Retrieves the heartbeat timeout. -func (p *Peer) HeartbeatTimeout() time.Duration { - return p.heartbeatTimer.MinDuration() -} - // Sets the heartbeat timeout. -func (p *Peer) SetHeartbeatTimeout(duration time.Duration) { - p.heartbeatTimer.SetDuration(duration) +func (p *Peer) setHeartbeatTimeout(duration time.Duration) { + p.heartbeatTimer.setDuration(duration) } -func (p *Peer) StartHeartbeat() { +func (p *Peer) startHeartbeat() { go p.heartbeat() } @@ -84,7 +80,7 @@ func (p *Peer) StartHeartbeat() { func (p *Peer) stop() { p.mutex.Lock() defer p.mutex.Unlock() - p.heartbeatTimer.Stop() + p.heartbeatTimer.stop() } //-------------------------------------- @@ -123,7 +119,7 @@ func (p *Peer) flush() (uint64, bool, error) { } // send VoteRequest Request -func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse){ +func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) { req.peer = p debugln(p.server.Name(), "Send Vote Request to ", p.Name()) if resp, _ := p.server.transporter.SendVoteRequest(p.server, p, req); resp != nil { @@ -174,17 +170,29 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) //debugln("flush to ", p.Name()) debugln("[HeartBeat] Leader ", p.server.Name(), " to ", p.Name(), " ", len(req.Entries), " ", time.Now()) - if p.server.State() != Leader { - return 0, false, errors.New("Not leader anymore") - } + respChan := make(chan *AppendEntriesResponse, 2) + + go func() { + tranResp, _ := p.server.transporter.SendAppendEntriesRequest(p.server, p, req) + respChan <- tranResp + }() - resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req) + var resp *AppendEntriesResponse - //debugln("receive flush response from ", p.Name()) + select { + // how to decide? + case <-time.After(p.server.heartbeatTimeout * 2): + resp = nil + + case resp = <-respChan: + + } if resp == nil { - return 0, false, err + debugln("receive flush timeout from ", p.Name()) + return 0, false, fmt.Errorf("AppendEntries timeout: %s", p.Name()) } + debugln("receive flush response from ", p.Name()) // If successful then update the previous log index. If it was // unsuccessful then decrement the previous log index and we'll try again @@ -192,31 +200,28 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) if resp.Success { if len(req.Entries) > 0 { p.prevLogIndex = req.Entries[len(req.Entries)-1].Index - debugln("Peer ", p.Name(), "'s' log update to ", p.prevLogIndex) } + debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex) } else { - if p.server.State() != Leader { - return 0, false, errors.New("Not leader anymore") - } - if resp.Term > p.server.currentTerm { return resp.Term, false, errors.New("Step down") } - // Decrement the previous log index down until we find a match. Don't - // let it go below where the peer's commit index is though. That's a - // problem. - if p.prevLogIndex > 0 { + + // we may miss a response from peer + if resp.CommitIndex >= p.prevLogIndex { + debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex) + p.prevLogIndex = resp.CommitIndex + } else if p.prevLogIndex > 0 { + debugln("Peer ", p.Name(), "'s' step back to ", p.prevLogIndex) + // Decrement the previous log index down until we find a match. Don't + // let it go below where the peer's commit index is though. That's a + // problem. p.prevLogIndex-- } - if resp.CommitIndex > p.prevLogIndex { - debugln("%v %v %v %v", resp.CommitIndex, p.prevLogIndex, - p.server.currentTerm, resp.Term) - panic("commitedIndex is greater than prevLogIndex") - } - } - return resp.Term, resp.Success, err + } + return resp.Term, resp.Success, nil } //-------------------------------------- @@ -230,9 +235,9 @@ func (p *Peer) heartbeat() { // (1) timeout/fire happens, flush the peer // (2) stopped, return - if p.heartbeatTimer.Start() { + if p.heartbeatTimer.start() { - var f FlushResponse + var f flushResponse f.peer = p @@ -241,7 +246,7 @@ func (p *Peer) heartbeat() { // if the peer successfully appended the log entry // we will tell the commit center if f.success { - if p.prevLogIndex > p.server.log.CommitIndex() { + if p.prevLogIndex > p.server.log.commitIndex { debugln("[Heartbeat] Peer", p.Name(), "send to commit center") p.server.response <- f debugln("[Heartbeat] Peer", p.Name(), "back from commit center") @@ -250,13 +255,20 @@ func (p *Peer) heartbeat() { } else { // shutdown the heartbeat if f.term > p.server.currentTerm { - debugln("[Heartbeat] SetpDown!") - select { - case p.server.stepDown <- f.term: - return - default: - return + p.server.stateMutex.Lock() + + if p.server.state == Leader { + p.server.state = Follower + select { + case p.server.stepDown <- f.term: + p.server.currentTerm = f.term + default: + panic("heartbeat cannot step down") + } } + + p.server.stateMutex.Unlock() + return } } diff --git a/request_vote.go b/request_vote.go index 2fd1f791e99..e198ffb4ae5 100644 --- a/request_vote.go +++ b/request_vote.go @@ -29,7 +29,7 @@ type RequestVoteResponse struct { //------------------------------------------------------------------------------ // Creates a new RequestVote request. -func NewRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest { +func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest { return &RequestVoteRequest{ Term: term, CandidateName: candidateName, @@ -39,7 +39,7 @@ func NewRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint6 } // Creates a new RequestVote response. -func NewRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse { +func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse { return &RequestVoteResponse{ Term: term, VoteGranted: voteGranted, diff --git a/server.go b/server.go index 1667b460b1c..17edbc496e7 100644 --- a/server.go +++ b/server.go @@ -56,16 +56,18 @@ type Server struct { context interface{} currentTerm uint64 - votedFor string - log *Log - leader string - peers map[string]*Peer - mutex sync.Mutex - - electionTimer *Timer + votedFor string + log *Log + leader string + peers map[string]*Peer + mutex sync.Mutex + stateMutex sync.Mutex + + electionTimer *timer heartbeatTimeout time.Duration - response chan FlushResponse + response chan flushResponse stepDown chan uint64 + stop chan bool currentSnapshot *Snapshot lastSnapshot *Snapshot @@ -95,9 +97,10 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S context: context, state: Stopped, peers: make(map[string]*Peer), - log: NewLog(), - stepDown: make(chan uint64), - electionTimer: NewTimer(DefaultElectionTimeout, DefaultElectionTimeout*2), + log: newLog(), + stepDown: make(chan uint64, 1), + stop: make(chan bool), + electionTimer: newTimer(DefaultElectionTimeout, DefaultElectionTimeout*2), heartbeatTimeout: DefaultHeartbeatTimeout, } @@ -179,9 +182,7 @@ func (s *Server) Term() uint64 { // Retrieves the current committed index of the server. func (s *Server) CommittedIndex() uint64 { - - return s.log.CommitIndex() - + return s.log.commitIndex } // Retrieves the name of the candidate this server voted for in this term. @@ -193,7 +194,7 @@ func (s *Server) VotedFor() string { func (s *Server) IsLogEmpty() bool { s.mutex.Lock() defer s.mutex.Unlock() - return s.log.IsEmpty() + return s.log.isEmpty() } // A list of all the log entries. This should only be used for debugging purposes. @@ -207,18 +208,18 @@ func (s *Server) LogEntries() []*LogEntry { } // A reference to the command name of the last entry. -func (s *Server) LastCommandName() string { +func (s *Server) lastCommandName() string { s.mutex.Lock() defer s.mutex.Unlock() if s.log != nil { - return s.log.LastCommandName() + return s.log.lastCommandName() } return "" } // Get the state of the server for debugging func (s *Server) GetState() string { - return fmt.Sprintf("State: %s, Term: %v, Index: %v ", s.state, s.currentTerm, s.CommittedIndex()) + return fmt.Sprintf("Name: %s, State: %s, Term: %v, Index: %v ", s.name, s.state, s.currentTerm, s.CommittedIndex()) } //-------------------------------------- @@ -243,22 +244,19 @@ func (s *Server) QuorumSize() int { // Retrieves the election timeout. func (s *Server) ElectionTimeout() time.Duration { - return s.electionTimer.MinDuration() + return s.electionTimer.minDuration } // Sets the election timeout. func (s *Server) SetElectionTimeout(duration time.Duration) { - s.electionTimer.SetMinDuration(duration) - s.electionTimer.SetMaxDuration(duration * 2) -} - -func (s *Server) StartElectionTimeout() { - go s.electionTimeout() + s.electionTimer.minDuration = duration + s.electionTimer.maxDuration = duration * 2 } -func (s *Server) StartHeartbeatTimeout() { +// Start heartbeat when the server promote to leader +func (s *Server) startHeartbeatTimeout() { for _, peer := range s.peers { - peer.StartHeartbeat() + peer.startHeartbeat() } } @@ -272,13 +270,13 @@ func (s *Server) HeartbeatTimeout() time.Duration { } // Sets the heartbeat timeout. -func (s *Server) SetHeartbeatTimeout(duration time.Duration) { +func (s *Server) setHeartbeatTimeout(duration time.Duration) { s.mutex.Lock() defer s.mutex.Unlock() s.heartbeatTimeout = duration for _, peer := range s.peers { - peer.SetHeartbeatTimeout(duration) + peer.setHeartbeatTimeout(duration) } } @@ -303,84 +301,347 @@ func (s *Server) Initialize() error { } // Initialize response channel - s.response = make(chan FlushResponse, 128) + s.response = make(chan flushResponse, 128) // Create snapshot directory if not exist os.Mkdir(s.path+"/snapshot", 0700) // Initialize the log and load it up. - if err := s.log.Open(s.LogPath()); err != nil { + if err := s.log.open(s.LogPath()); err != nil { debugln("log error") s.unload() return fmt.Errorf("raft.Server: %v", err) } // Update the term to the last term in the log. - s.currentTerm = s.log.CurrentTerm() + s.currentTerm = s.log.currentTerm() return nil } +// timeout +// ______ +// | | +// | | +// v | recv majority votes +// -------- timeout ----------- ----------- +// |Follower| ----------> | Candidate |--------------------> | Leader | +// -------- ----------- ----------- +// ^ stepDown | stepDown | +// |_______________________|____________________________________ | +// +// The main Loop for the server +func (s *Server) StartServerLoop(role string) { + stop := false + leader := false + + defer debugln("server stopped!") + + for { + switch role { + + case Follower: + stop = s.startFollowerLoop() + + if stop { + return + } + + role = Candidate + + case Candidate: + debugln(s.GetState() + "start Candiate") + stop, leader = s.startCandidateLoop() + + s.votedFor = "" + + if stop { + return + } + + if leader { + role = Leader + + } else { + + role = Follower + } + debugln(s.GetState() + "stop Candiate") + + case Leader: + debugln(s.GetState() + "start Leader") + stop = s.startLeaderLoop() + if stop { + return + } + + role = Follower + debugln(s.GetState() + "stop Leader") + } + } +} + // Start the sever as a follower func (s *Server) StartFollower() { - // Update the state. - s.state = Follower - - // Start the election timeout. - s.StartElectionTimeout() + go s.StartServerLoop(Follower) } // Start the sever as a leader -func (s *Server) StartLeader() error { +func (s *Server) StartLeader() { + s.state = Candidate + s.currentTerm++ + go s.StartServerLoop(Leader) +} + +// Shuts down the server. +func (s *Server) Stop() { s.mutex.Lock() - defer s.mutex.Unlock() + if s.state == Follower { + s.electionTimer.stop() + } else { + s.mutex.Unlock() + s.stop <- true + } + s.unload() +} - // Start as leader. - s.currentTerm++ - s.state = Leader - s.leader = s.name +// Unloads the server. +func (s *Server) unload() { + // Kill the election timer. + s.state = Stopped - // Leader need to collect appendLog response - go s.commitCenter() + // wait for all previous flush ends + time.Sleep(100 * time.Millisecond) + + // Close the log. + if s.log != nil { + // still some concurrency issue with stop + // need lock + s.log.close() + } - return nil } +// Checks if the server is currently running. +func (s *Server) Running() bool { + return s.state != Stopped +} -func (s *Server) collectVotes(c chan *RequestVoteResponse) (bool, bool) { +// Respond to RPCs from candidates and leaders. +// Convert to candidate if election timeout elapses without +// either: +// 1.Receiving valid AppendEntries RPC, or +// 2.Granting vote to candidate +func (s *Server) startFollowerLoop() (stop bool) { + s.state = Follower + + // (1) Timeout: promote and return + // (2) Stopped: due to receive heartbeat, continue + for { + if s.State() == Stopped { + return true + } + + if s.electionTimer.start() { + return false + + } else { + s.electionTimer.ready() + continue + } + } +} + +// Increment currentTerm, vote for self +// Reset election timeout +// Send RequestVote RPCs to all other servers, wait for either: +// Votes received from majority of servers: become leader +// AppendEntries RPC received from new leader: step +// down +// Election timeout elapses without election resolution: +// increment term, start new election +// Discover higher term: step down + +func (s *Server) startCandidateLoop() (stop bool, leader bool) { + + // the server must be a follower + if s.state != Follower && s.state != Stopped { + panic("startCandidateLoop") + } + + s.state = Candidate + s.leader = "" + s.votedFor = s.Name() + + lastLogIndex, lastLogTerm := s.log.lastInfo() + + for { + + // increase term + s.currentTerm++ + + // Request votes from each of our peers. + c := make(chan *RequestVoteResponse, len(s.peers)) + req := newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm) + + for _, peer := range s.peers { + go peer.sendVoteRequest(req, c) + } + + // collectVotes + elected, timeout, stop := s.startCandidateSelect(c) + + // If we received enough votes then promote to leader and stop this election. + if elected { + return false, true + } + + if timeout { + debugln(s.Name(), " election timeout, restart") + // restart promotion + continue + } + + if stop { + + return true, false + } + + return false, false + } +} + +// Initialize nextIndex for each to last log index + 1 +// Send initial empty AppendEntries RPCs (heartbeat) to each +// follower; repeat during idle periods to prevent election +// timeouts (§5.2) +// Accept commands from clients, append new entries to local +// log (§5.3) +// Whenever last log index ! nextIndex for a follower, send +// AppendEntries RPC with log entries starting at nextIndex, +// update nextIndex if successful (§5.3) +// If AppendEntries fails because of log inconsistency, +// decrement nextIndex and retry (§5.3) +// Mark entries committed if stored on a majority of servers +// and some entry from current term is stored on a majority of +// servers. Apply newly committed entries to state machine. +// Step down if currentTerm changes (§5.5) +func (s *Server) startLeaderLoop() bool { + + // when the server goes into this loop, + // the leader may have been stepped down to follower! + + // we cannot assume the the server is a candidate when + // get into this func + + // The request vote func may let it step down + + // That happens when we receive the majority votes, but + // another candidate start a new term and has not vote for us + // after it send vote request, the leader will stepdown before + // it enter this func + + // Move server to become a leader and begin peer heartbeats. + s.stateMutex.Lock() + + if s.state == Candidate { + s.state = Leader + s.leader = s.name + } else { + + s.stateMutex.Unlock() + return false + } + + s.stateMutex.Unlock() + + logIndex, _ := s.log.lastInfo() + + // after here we let the leader stepdown in the startLeaderSelect loop + + // Update the peers prevLogIndex to leader's lastLogIndex + // Start heartbeat + for _, peer := range s.peers { + + debugln("[Leader] Set ", peer.Name(), "Prev to", logIndex) + + peer.prevLogIndex = logIndex + peer.heartbeatTimer.ready() + peer.startHeartbeat() + } + + // Begin to collect response from followers + stop := s.startLeaderSelect() + + { + for _, peer := range s.peers { + peer.stop() + } + } + + return stop +} + +// Votes received from majority of servers: become leader +// Election timeout elapses without election resolution: +// Discover higher term: step down +func (s *Server) startCandidateSelect(c chan *RequestVoteResponse) (bool, bool, bool) { // Collect votes until we have a quorum. votesGranted := 1 + debugln(s.GetState() + "start Select") + defer debugln(s.GetState() + "end Select") + for { // If we received enough votes then stop waiting for more votes. if votesGranted >= s.QuorumSize() { - return true, false + return true, false, false } // Collect votes from peers. select { case resp := <-c: + debugln(s.GetState() + "select recv vote") if resp != nil { if resp.VoteGranted == true { votesGranted++ + } else if resp.Term > s.currentTerm { - // Step down if we discover a higher term. - s.mutex.Lock() + s.stateMutex.Lock() + + // go from internal path + // we may need to eat the stepdown + select { + case <-s.stepDown: + + default: - s.setCurrentTerm(resp.Term) + } - s.mutex.Unlock() - return false, false + s.state = Follower + s.currentTerm = resp.Term + debugln(s.GetState() + "select step down") + s.stateMutex.Unlock() + return false, false, false } } + case <-s.stepDown: + debugln(s.GetState() + "select step down") + return false, false, false + // TODO: do we calculate the overall timeout? or timeout for each vote? - // Some issue here + // Some issue here case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2): - return false, true + debugln(s.GetState() + "select timeout") + return false, true, false + + case <-s.stop: + debugln(s.GetState() + "select stop") + return false, false, true } } @@ -389,13 +650,11 @@ func (s *Server) collectVotes(c chan *RequestVoteResponse) (bool, bool) { // Collect response from followers. If more than the // majority of the followers append a log entry, the // leader will commit the log entry -func (s *Server) commitCenter() { - debugln("collecting data") - +func (s *Server) startLeaderSelect() bool { count := 1 for { - var response FlushResponse + var response flushResponse select { case response = <-s.response: @@ -405,13 +664,13 @@ func (s *Server) commitCenter() { count++ } - case term := <-s.stepDown: - s.mutex.Lock() + case <-s.stepDown: + // stepdown to follower - s.setCurrentTerm(term) + return false - s.mutex.Unlock() - return + case <-s.stop: + return true } if response.peer != nil { @@ -423,21 +682,21 @@ func (s *Server) commitCenter() { if count >= s.QuorumSize() { // Determine the committed index that a majority has. var indices []uint64 - indices = append(indices, s.log.CurrentIndex()) + indices = append(indices, s.log.currentIndex()) for _, peer := range s.peers { indices = append(indices, peer.prevLogIndex) } - sort.Sort(Uint64Slice(indices)) + sort.Sort(uint64Slice(indices)) // We can commit upto the index which the mojarity // of the members have appended. commitIndex := indices[s.QuorumSize()-1] - committedIndex := s.log.CommitIndex() + committedIndex := s.log.commitIndex if commitIndex > committedIndex { debugln(indices) debugln(s.GetState(), "[CommitCenter] Going to Commit ", commitIndex) - s.log.SetCommitIndex(commitIndex) + s.log.setCommitIndex(commitIndex) debugln("[CommitCenter] Commit ", commitIndex) for i := committedIndex; i < commitIndex; i++ { @@ -460,44 +719,6 @@ func (s *Server) commitCenter() { } -// Shuts down the server. -func (s *Server) Stop() { - s.mutex.Lock() - defer s.mutex.Unlock() - s.unload() -} - -// Unloads the server. -func (s *Server) unload() { - // Kill the election timer. - s.state = Stopped - - if s.electionTimer != nil { - s.electionTimer.Stop() - } - - // Remove peers. - for _, peer := range s.peers { - peer.stop() - } - // wait for all previous flush ends - time.Sleep(100 * time.Millisecond) - - s.peers = make(map[string]*Peer) - - // Close the log. - if s.log != nil { - s.log.Close() - s.log = nil - } - -} - -// Checks if the server is currently running. -func (s *Server) Running() bool { - return s.state != Stopped -} - //-------------------------------------- // Commands //-------------------------------------- @@ -506,16 +727,34 @@ func (s *Server) Running() bool { // when the command has been successfully committed or an error has occurred. func (s *Server) Do(command Command) (interface{}, error) { + // race here + // chance to append entry when we are not leader + // after the check, the leader may stepdown + // but log appended with the newest term + // which means the command from this follower can be commited + // which will cause a panic + + s.stateMutex.Lock() if s.state != Leader { + + s.stateMutex.Unlock() + return nil, NotLeaderError } - entry := s.log.CreateEntry(s.currentTerm, command) - if err := s.log.AppendEntry(entry); err != nil { + // we get the term of the server + // when we are sure the server is leader + term := s.currentTerm + + s.stateMutex.Unlock() + + entry := s.log.createEntry(term, command) + + if err := s.log.appendEntry(entry); err != nil { return nil, err } - s.response <- FlushResponse{s.currentTerm, true, nil, nil} + s.response <- flushResponse{term, true, nil, nil} // to speed up the response time // TODO: think about this carefully @@ -523,7 +762,7 @@ func (s *Server) Do(command Command) (interface{}, error) { // but will reduce through output // for _, peer := range s.peers { - // peer.heartbeatTimer.Fire() + // peer.heartbeatTimer.fire() // } debugln("[Do] join!") @@ -547,13 +786,13 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons defer s.mutex.Unlock() // If the server is stopped then reject it. if !s.Running() { - return NewAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped") + return newAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped") } // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term") + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), fmt.Errorf("raft.Server: Stale request term") } debugln("Peer ", s.Name(), "received heartbeat from ", req.LeaderName, @@ -566,33 +805,33 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons // Reset election timeout. if s.electionTimer != nil { - s.electionTimer.Stop() + s.electionTimer.stop() } // Reject if log doesn't contain a matching previous entry. - if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err + if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err } debugln("Peer ", s.Name(), "after truncate ") // Append entries to the log. - if err := s.log.AppendEntries(req.Entries); err != nil { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err + if err := s.log.appendEntries(req.Entries); err != nil { + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err } debugln("Peer ", s.Name(), "commit index ", req.CommitIndex, " from ", req.LeaderName) // Commit up to the commit index. - if err := s.log.SetCommitIndex(req.CommitIndex); err != nil { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err + if err := s.log.setCommitIndex(req.CommitIndex); err != nil { + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err } debugln("Peer ", s.Name(), "after commit ") debugln("Peer ", s.Name(), "reply heartbeat from ", req.LeaderName, " ", req.Term, " ", s.currentTerm, " ", time.Now()) - return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil + return newAppendEntriesResponse(s.currentTerm, true, s.log.commitIndex), nil } // Creates an AppendEntries request. Can return a nil request object if the @@ -601,143 +840,14 @@ func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesR if s.log == nil { return nil } - entries, prevLogTerm := s.log.GetEntriesAfter(prevLogIndex) + entries, prevLogTerm := s.log.getEntriesAfter(prevLogIndex) if entries != nil { - return NewAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.CommitIndex()) + return newAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.commitIndex) } else { return nil } } -//-------------------------------------- -// Promotion -//-------------------------------------- - -// Promotes the server to a candidate and then requests votes from peers. If -// enough votes are received then the server becomes the leader. If this -// server is elected then true is returned. If another server is elected then -// false is returned. -func (s *Server) promote() (bool, error) { - - for { - // Start a new election. - term, lastLogIndex, lastLogTerm, err := s.promoteToCandidate() - if err != nil { - return false, err - } - - // Request votes from each of our peers. - c := make(chan *RequestVoteResponse, len(s.peers)) - req := NewRequestVoteRequest(term, s.name, lastLogIndex, lastLogTerm) - - for _, peer := range s.peers { - go peer.sendVoteRequest(req, c) - } - - elected, timeout := s.collectVotes(c) - - // If we received enough votes then promote to leader and stop this election. - if elected { - if s.promoteToLeader(term, lastLogIndex, lastLogTerm) { - debugln(s.Name(), " became leader") - return true, nil - } - } - - if timeout { - debugln(s.Name(), " election timeout") - // restart promotion - continue - } - - return false, fmt.Errorf("raft.Server: Term changed during election, stepping down: (%v > %v)", s.currentTerm, term) - - // TODO: is this still needed? - // If we are no longer in the same term then another server must have been elected. - s.mutex.Lock() - if s.currentTerm != term { - s.mutex.Unlock() - return false, fmt.Errorf("raft.Server: Term changed during election, stepping down: (%v > %v)", s.currentTerm, term) - } - s.mutex.Unlock() - } - - // for complier - return true, nil -} - -// Promotes the server to a candidate and increases the election term. The -// term and log state are returned for use in the RPCs. -func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - - // Ignore promotion if the server is not a follower. - if s.state != Follower && s.state != Candidate { - panic("promote but not a follower") - return 0, 0, 0, fmt.Errorf("raft: Invalid promotion state: %s", s.state) - } - - // Move server to become a candidate, increase our term & vote for ourself. - s.state = Candidate - s.currentTerm++ - s.votedFor = s.name - s.leader = "" - - // Pause the election timer while we're a candidate. - - // Return server state so we can check for it during leader promotion. - lastLogIndex, lastLogTerm := s.log.LastInfo() - - debugln("[PromoteToCandidate] Follower ", s.Name(), - "promote to candidate[", lastLogIndex, ",", lastLogTerm, "]", - "currentTerm ", s.currentTerm) - - return s.currentTerm, lastLogIndex, lastLogTerm, nil -} - -// Promotes the server from a candidate to a leader. This can only occur if -// the server is in the state that it assumed when the candidate election -// began. This is because another server may have won the election and caused -// the state to change. -func (s *Server) promoteToLeader(term uint64, lastLogIndex uint64, lastLogTerm uint64) bool { - s.mutex.Lock() - defer s.mutex.Unlock() - - // Ignore promotion if we are not a candidate. - if s.state != Candidate { - panic(s.Name() + " promote to leader but not candidate " + s.state) - } - - // TODO: should panic or just a false? - - // Disallow promotion if the term or log does not match what we currently have. - logIndex, logTerm := s.log.LastInfo() - if s.currentTerm != term || logIndex != lastLogIndex || logTerm != lastLogTerm { - return false - } - - // Move server to become a leader and begin peer heartbeats. - s.state = Leader - s.leader = s.name - - // Begin to collect response from followers - go s.commitCenter() - - // Update the peers prevLogIndex to leader's lastLogIndex - // Start heartbeat - for _, peer := range s.peers { - - debugln("[Leader] Set ", peer.Name(), "Prev to", lastLogIndex) - - peer.prevLogIndex = lastLogIndex - peer.heartbeatTimer.Ready() - peer.StartHeartbeat() - } - - return true -} - //-------------------------------------- // Request Vote //-------------------------------------- @@ -753,26 +863,28 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err //debugln("[RequestVote] got the lock") // Fail if the server is not running. if !s.Running() { - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped") + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped") } // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm) + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm) } + s.setCurrentTerm(req.Term) // If we've already voted for a different candidate then don't vote for this candidate. if s.votedFor != "" && s.votedFor != req.CandidateName { debugln("already vote for ", s.votedFor, " false to ", req.CandidateName) - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor) + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor) } // If the candidate's log is not at least as up-to-date as // our last log then don't vote. - lastIndex, lastTerm := s.log.LastInfo() + lastIndex, lastTerm := s.log.lastInfo() if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm { - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm) + debugln("my log is more up-to-date") + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm) } // If we made it this far then cast a vote and reset our election time out. @@ -781,10 +893,10 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err debugln(s.Name(), "Vote for ", req.CandidateName, "at term", req.Term) if s.electionTimer != nil { - s.electionTimer.Stop() + s.electionTimer.stop() } - return NewRequestVoteResponse(s.currentTerm, true), nil + return newRequestVoteResponse(s.currentTerm, true), nil } // Updates the current term on the server if the term is greater than the @@ -794,57 +906,30 @@ func (s *Server) setCurrentTerm(term uint64) { if term > s.currentTerm { s.votedFor = "" - if s.state == Leader { - debugln(s.Name(), " step down to a follower") + s.stateMutex.Lock() + if s.state == Leader || s.state == Candidate { + debugln(s.Name(), " should step down to a follower from ", s.state) - // stop heartbeats - for _, peer := range s.peers { - peer.stop() - } + s.state = Follower select { case s.stepDown <- term: default: - + panic("cannot stepdown") } - - s.StartElectionTimeout() - - // candidate should also start timeout - } else if s.state == Candidate { - s.StartElectionTimeout() + debugln(s.Name(), " step down to a follower from ", s.state) + s.currentTerm = term + s.stateMutex.Unlock() + return } - s.state = Follower - + s.stateMutex.Unlock() // update term after stop all the peer s.currentTerm = term } } -// Listens to the election timeout and kicks off a new election. -func (s *Server) electionTimeout() { - - // (1) Timeout: promote and return - // (2) Stopped: due to receive heartbeat, continue - for { - if s.State() == Stopped { - return - } - - // TODO race condition with unload - if s.electionTimer.Start() { - go s.promote() - return - - } else { - s.electionTimer.Ready() - continue - } - } -} - //-------------------------------------- // Membership //-------------------------------------- @@ -861,13 +946,14 @@ func (s *Server) AddPeer(name string) error { // Only add the peer if it doesn't have the same name. if s.name != name { //debugln("Add peer ", name) - peer := NewPeer(s, name, s.heartbeatTimeout) + peer := newPeer(s, name, s.heartbeatTimeout) if s.state == Leader { - peer.StartHeartbeat() + peer.startHeartbeat() } s.peers[peer.name] = peer } + return nil } @@ -906,7 +992,7 @@ func (s *Server) RemovePeer(name string) error { func (s *Server) createSnapshotRequest() *SnapshotRequest { s.mutex.Lock() defer s.mutex.Unlock() - return NewSnapshotRequest(s.name, s.lastSnapshot) + return newSnapshotRequest(s.name, s.lastSnapshot) } // The background snapshot function @@ -926,7 +1012,7 @@ func (s *Server) takeSnapshot() error { return errors.New("handling snapshot") } - lastIndex, lastTerm := s.log.CommitInfo() + lastIndex, lastTerm := s.log.commitInfo() if lastIndex == 0 || lastTerm == 0 { return errors.New("No logs") @@ -959,7 +1045,7 @@ func (s *Server) takeSnapshot() error { s.saveSnapshot() - s.log.Compact(lastIndex, lastTerm) + s.log.compact(lastIndex, lastTerm) return nil } @@ -971,7 +1057,7 @@ func (s *Server) saveSnapshot() error { return errors.New("no snapshot to save") } - err := s.currentSnapshot.Save() + err := s.currentSnapshot.save() if err != nil { return err @@ -982,7 +1068,7 @@ func (s *Server) saveSnapshot() error { // delete the previous snapshot if there is any change if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) { - tmp.Remove() + tmp.remove() } s.currentSnapshot = nil return nil @@ -1008,7 +1094,7 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro //update term and index s.currentTerm = req.LastTerm - s.log.UpdateCommitIndex(req.LastIndex) + s.log.updateCommitIndex(req.LastIndex) snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm) @@ -1016,9 +1102,9 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro s.saveSnapshot() - s.log.Compact(req.LastIndex, req.LastTerm) + s.log.compact(req.LastIndex, req.LastTerm) - return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil + return newSnapshotResponse(req.LastTerm, true, req.LastIndex), nil } @@ -1097,9 +1183,9 @@ func (s *Server) LoadSnapshot() error { s.AddPeer(peerName) } - s.log.SetStartTerm(s.lastSnapshot.LastTerm) - s.log.SetStartIndex(s.lastSnapshot.LastIndex) - s.log.UpdateCommitIndex(s.lastSnapshot.LastIndex) + s.log.startTerm = s.lastSnapshot.LastTerm + s.log.startIndex = s.lastSnapshot.LastIndex + s.log.updateCommitIndex(s.lastSnapshot.LastIndex) return err } diff --git a/server_test.go b/server_test.go index c51c39aff2d..f621fe34a29 100644 --- a/server_test.go +++ b/server_test.go @@ -1,6 +1,7 @@ package raft import ( + "fmt" "reflect" "strconv" "sync" @@ -24,7 +25,7 @@ func TestServerRequestVote(t *testing.T) { server.Initialize() server.StartLeader() defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0)) if !(resp.Term == 1 && resp.VoteGranted && err == nil) { t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) } @@ -37,8 +38,7 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) { server.StartLeader() server.currentTerm = 2 defer server.Stop() - - resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Stale term: 1 < 2") { t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) } @@ -54,11 +54,11 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) { server.StartLeader() server.currentTerm = 2 defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0)) if !(resp.Term == 2 && resp.VoteGranted && err == nil) { t.Fatalf("First vote should not have been denied (%v)", err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "bar", 0, 0)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "bar", 0, 0)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Already voted for foo") { t.Fatalf("Second vote should have been denied (%v)", err) } @@ -71,11 +71,15 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { server.StartLeader() server.currentTerm = 2 defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0)) if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo" && err == nil) { t.Fatalf("First vote should not have been denied (%v)", err) } - resp, err = server.RequestVote(NewRequestVoteRequest(3, "bar", 0, 0)) + resp, err = server.RequestVote(newRequestVoteRequest(3, "bar", 0, 0)) + + // now stepdown is done by channel, need time + time.Sleep(5 * time.Millisecond) + if !(resp.Term == 3 && resp.VoteGranted && server.VotedFor() == "bar" && err == nil) { t.Fatalf("Second vote should have been approved (%v)", err) } @@ -93,19 +97,19 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 2, 2)) + resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 2)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [2/2]") { t.Fatalf("Stale index vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 1)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 1)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [3/1]") { t.Fatalf("Stale term vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 2)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2)) if !(resp.Term == 2 && resp.VoteGranted && err == nil) { t.Fatalf("Matching log vote should have been granted (%v)", err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 4, 3)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 4, 3)) if !(resp.Term == 2 && resp.VoteGranted && err == nil) { t.Fatalf("Ahead-of-log vote should have been granted (%v)", err) } @@ -123,8 +127,10 @@ func TestServerPromoteSelf(t *testing.T) { server.StartFollower() defer server.Stop() - if success, err := server.promote(); !(success && err == nil && server.state == Leader) { - t.Fatalf("Server self-promotion failed: %v (%v)", server.state, err) + time.Sleep(300 * time.Millisecond) + + if server.state != Leader { + t.Fatalf("Server self-promotion failed: %v", server.state) } } @@ -146,10 +152,13 @@ func TestServerPromote(t *testing.T) { leader := servers[0] - if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) { - t.Fatalf("Server self-promotion failed: %v (%v)", leader.state, err) - } + leader.StartFollower() + time.Sleep(200 * time.Millisecond) + + if leader.state != Leader { + t.Fatalf("Server promotion failed: %v", leader.state) + } for _, server := range servers { server.Stop() } @@ -177,9 +186,8 @@ func TestServerPromoteDoubleElection(t *testing.T) { leader := servers[0] - if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) { - t.Fatalf("Server self-promotion failed: %v (%v)", leader.state, err) - } + leader.StartFollower() + time.Sleep(400 * time.Millisecond) if lookup["2"].votedFor != "1" { t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].votedFor) @@ -205,31 +213,31 @@ func TestServerAppendEntries(t *testing.T) { defer server.Stop() // Append single entry. - entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) + entries := []*LogEntry{newLogEntry(nil, 1, 1, &testCommand1{"foo", 10})} + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) if !(resp.Term == 1 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { + if index, term := server.log.commitInfo(); !(index == 0 && term == 0) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } // Append multiple entries + commit the last one. - entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20}), NewLogEntry(nil, 3, 1, &TestCommand1{"baz", 30})} - resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 1, 1, entries, 1)) + entries = []*LogEntry{newLogEntry(nil, 2, 1, &testCommand1{"bar", 20}), newLogEntry(nil, 3, 1, &testCommand1{"baz", 30})} + resp, err = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 1, 1, entries, 1)) if !(resp.Term == 1 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 1 && term == 1) { + if index, term := server.log.commitInfo(); !(index == 1 && term == 1) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } // Send zero entries and commit everything. - resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3)) + resp, err = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3)) if !(resp.Term == 2 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 3 && term == 1) { + if index, term := server.log.commitInfo(); !(index == 3 && term == 1) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } @@ -239,16 +247,17 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { server := newTestServer("1", &testTransporter{}) server.Initialize() server.StartLeader() + defer server.Stop() server.currentTerm = 2 // Append single entry. - entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) + entries := []*LogEntry{newLogEntry(nil, 1, 1, &testCommand1{"foo", 10})} + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) if !(resp.Term == 2 && !resp.Success && err != nil && err.Error() == "raft.Server: Stale request term") { t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { + if index, term := server.log.commitInfo(); !(index == 0 && term == 0) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } @@ -258,21 +267,22 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { server := newTestServer("1", &testTransporter{}) server.Initialize() server.StartLeader() + defer server.Stop() // Append single entry + commit. entries := []*LogEntry{ - NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}), - NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}), + newLogEntry(nil, 1, 1, &testCommand1{"foo", 10}), + newLogEntry(nil, 2, 1, &testCommand1{"foo", 15}), } - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 2)) + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 2)) if !(resp.Term == 1 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } // Append entry again (post-commit). - entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20})} - resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 2, 1, entries, 1)) + entries = []*LogEntry{newLogEntry(nil, 2, 1, &testCommand1{"bar", 20})} + resp, err = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 2, 1, entries, 1)) if !(resp.Term == 1 && !resp.Success && err != nil && err.Error() == "raft.Log: Cannot append entry with earlier index in the same term (1:2 <= 1:2)") { t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) } @@ -285,21 +295,21 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { server.StartLeader() defer server.Stop() - entry1 := NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}) - entry2 := NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}) - entry3 := NewLogEntry(nil, 2, 2, &TestCommand1{"bar", 20}) + entry1 := newLogEntry(nil, 1, 1, &testCommand1{"foo", 10}) + entry2 := newLogEntry(nil, 2, 1, &testCommand1{"foo", 15}) + entry3 := newLogEntry(nil, 2, 2, &testCommand1{"bar", 20}) // Append single entry + commit. entries := []*LogEntry{entry1, entry2} - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 1)) - if !(resp.Term == 1 && resp.Success && err == nil && server.log.CommitIndex() == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) { + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 1)) + if !(resp.Term == 1 && resp.Success && err == nil && server.log.commitIndex == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } // Append entry that overwrites the second (uncommitted) entry. entries = []*LogEntry{entry3} - resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 1, 1, entries, 2)) - if !(resp.Term == 2 && resp.Success && err == nil && server.log.CommitIndex() == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) { + resp, err = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 1, 1, entries, 2)) + if !(resp.Term == 2 && resp.Success && err == nil && server.log.commitIndex == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) { t.Fatalf("AppendEntries should have succeeded: %v/%v : %v", resp.Term, resp.Success, err) } } @@ -315,7 +325,7 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { server.StartFollower() defer server.Stop() var err error - if _, err = server.Do(&TestCommand1{"foo", 10}); err != NotLeaderError { + if _, err = server.Do(&testCommand1{"foo", 10}); err != NotLeaderError { t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err) } } @@ -332,14 +342,13 @@ func TestServerSingleNode(t *testing.T) { } server.Initialize() - defer server.Stop() if server.state != Stopped { t.Fatalf("Unexpected server state: %v", server.state) } server.StartLeader() - time.Sleep(time.Second) + time.Sleep(200 * time.Millisecond) // Join the server to itself. if _, err := server.Do(&joinCommand{Name: "1"}); err != nil { @@ -351,8 +360,8 @@ func TestServerSingleNode(t *testing.T) { t.Fatalf("Unexpected server state: %v", server.state) } - // Stop the server. server.Stop() + if server.state != Stopped { t.Fatalf("Unexpected server state: %v", server.state) } @@ -404,12 +413,14 @@ func TestServerMultiNode(t *testing.T) { if name == "1" { leader = server - server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.setHeartbeatTimeout(testHeartbeatTimeout) server.StartLeader() + time.Sleep(100 * time.Millisecond) } else { server.SetElectionTimeout(testElectionTimeout) - server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.setHeartbeatTimeout(testHeartbeatTimeout) server.StartFollower() + time.Sleep(10 * time.Millisecond) } if _, err := leader.Do(&joinCommand{Name: name}); err != nil { t.Fatalf("Unable to join server[%s]: %v", name, err) @@ -428,17 +439,21 @@ func TestServerMultiNode(t *testing.T) { } mutex.Unlock() - for i := 0; i < 20; i++ { - i++ - debugln("Round ", i) + for i := 0; i < 20000000; i++ { + retry := 0 + fmt.Println("Round ", i) num := strconv.Itoa(i%(len(servers)) + 1) + num_1 := strconv.Itoa((i+3)%(len(servers)) + 1) toStop := servers[num] + toStop_1 := servers[num_1] // Stop the first server and wait for a re-election. time.Sleep(100 * time.Millisecond) debugln("Disconnect ", toStop.Name()) + debugln("disconnect ", num, " ", num_1) toStop.SetTransporter(disTransporter) + toStop_1.SetTransporter(disTransporter) time.Sleep(200 * time.Millisecond) // Check that either server 2 or 3 is the leader now. //mutex.Lock() @@ -447,31 +462,62 @@ func TestServerMultiNode(t *testing.T) { for key, value := range servers { debugln("Play begin") - if key != num { + if key != num && key != num_1 { if value.State() == Leader { debugln("Found leader") for i := 0; i < 10; i++ { debugln("[Test] do ", value.Name()) - if _, err := value.Do(&TestCommand2{X: 1}); err != nil { - t.Fatalf("Unable to do command") + if _, err := value.Do(&testCommand2{X: 1}); err != nil { + break } debugln("[Test] Done") } - - leader++ debugln("Leader is ", value.Name(), " Index ", value.log.commitIndex) } debugln("Not Found leader") } } + for { + for key, value := range servers { + if key != num && key != num_1 { + if value.State() == Leader { + leader++ + } + debugln(value.Name(), " ", value.currentTerm, " ", value.state) + } + } - if leader != 1 { - t.Fatalf("wrong leader number %v", leader) + if leader > 1 { + if retry < 300 { + debugln("retry") + retry++ + leader = 0 + Debug = true + time.Sleep(100 * time.Millisecond) + continue + } + t.Fatalf("wrong leader number %v", leader) + } + if leader == 0 { + if retry < 300 { + retry++ + fmt.Println("retry 0") + leader = 0 + time.Sleep(100 * time.Millisecond) + continue + } + t.Fatalf("wrong leader number %v", leader) + } + if leader == 1 { + Debug = false + break + } } //mutex.Unlock() toStop.SetTransporter(transporter) + toStop_1.SetTransporter(transporter) } } diff --git a/snapshot.go b/snapshot.go index e5e52e74704..d35474f8a43 100644 --- a/snapshot.go +++ b/snapshot.go @@ -27,7 +27,7 @@ type Snapshot struct { } // Save the snapshot to a file -func (ss *Snapshot) Save() error { +func (ss *Snapshot) save() error { // Write machine state to temporary buffer. // open file @@ -59,7 +59,7 @@ func (ss *Snapshot) Save() error { } // remove the file of the snapshot -func (ss *Snapshot) Remove() error { +func (ss *Snapshot) remove() error { err := os.Remove(ss.Path) return err } diff --git a/snapshot_request.go b/snapshot_request.go index 18d39ccc4a2..8ee197d6eeb 100644 --- a/snapshot_request.go +++ b/snapshot_request.go @@ -23,7 +23,7 @@ type SnapshotResponse struct { //------------------------------------------------------------------------------ // Creates a new Snapshot request. -func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest { +func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest { return &SnapshotRequest{ LeaderName: leaderName, LastIndex: snapshot.LastIndex, @@ -34,7 +34,7 @@ func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest } // Creates a new Snapshot response. -func NewSnapshotResponse(term uint64, success bool, commitIndex uint64) *SnapshotResponse { +func newSnapshotResponse(term uint64, success bool, commitIndex uint64) *SnapshotResponse { return &SnapshotResponse{ Term: term, Success: success, diff --git a/sort.go b/sort.go index 85deb59defc..bf4c303af35 100644 --- a/sort.go +++ b/sort.go @@ -6,7 +6,7 @@ package raft // //------------------------------------------------------------------------------ -type Uint64Slice []uint64 +type uint64Slice []uint64 //------------------------------------------------------------------------------ // @@ -18,6 +18,6 @@ type Uint64Slice []uint64 // uint64 //-------------------------------------- -func (p Uint64Slice) Len() int { return len(p) } -func (p Uint64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p Uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p uint64Slice) Len() int { return len(p) } +func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/test.go b/test.go index a466cc18c04..b1b29752e97 100644 --- a/test.go +++ b/test.go @@ -8,14 +8,14 @@ import ( ) const ( - testHeartbeatTimeout = 10 * time.Millisecond - testElectionTimeout = 100 * time.Millisecond + testHeartbeatTimeout = 5 * time.Millisecond + testElectionTimeout = 20 * time.Millisecond ) func init() { RegisterCommand(&joinCommand{}) - RegisterCommand(&TestCommand1{}) - RegisterCommand(&TestCommand2{}) + RegisterCommand(&testCommand1{}) + RegisterCommand(&testCommand2{}) } //------------------------------------------------------------------------------ @@ -44,11 +44,11 @@ func setupLogFile(content string) string { func setupLog(content string) (*Log, string) { path := setupLogFile(content) - log := NewLog() + log := newLog() log.ApplyFunc = func(c Command) (interface{}, error) { return nil, nil } - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { panic("Unable to open log") } return log, path @@ -82,7 +82,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]* lookup[name] = server } for _, server := range servers { - server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.setHeartbeatTimeout(testHeartbeatTimeout) for _, peer := range servers { server.AddPeer(peer.Name()) } @@ -147,16 +147,16 @@ func (c *joinCommand) Apply(server *Server) (interface{}, error) { // Command1 //-------------------------------------- -type TestCommand1 struct { +type testCommand1 struct { Val string `json:"val"` I int `json:"i"` } -func (c *TestCommand1) CommandName() string { +func (c *testCommand1) CommandName() string { return "cmd_1" } -func (c *TestCommand1) Apply(server *Server) (interface{}, error) { +func (c *testCommand1) Apply(server *Server) (interface{}, error) { return nil, nil } @@ -164,14 +164,14 @@ func (c *TestCommand1) Apply(server *Server) (interface{}, error) { // Command2 //-------------------------------------- -type TestCommand2 struct { +type testCommand2 struct { X int `json:"x"` } -func (c *TestCommand2) CommandName() string { +func (c *testCommand2) CommandName() string { return "cmd_2" } -func (c *TestCommand2) Apply(server *Server) (interface{}, error) { +func (c *testCommand2) Apply(server *Server) (interface{}, error) { return nil, nil } diff --git a/timer.go b/timer.go index 231e63bbe12..c7eb734118e 100644 --- a/timer.go +++ b/timer.go @@ -12,9 +12,9 @@ import ( // //------------------------------------------------------------------------------ -type Timer struct { - fire chan time.Time - stop chan bool +type timer struct { + fireChan chan time.Time + stopChan chan bool state int rand *rand.Rand @@ -38,7 +38,7 @@ const ( //------------------------------------------------------------------------------ // Creates a new timer. Panics if a non-positive duration is used. -func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer { +func newTimer(minDuration time.Duration, maxDuration time.Duration) *timer { if minDuration <= 0 { panic("raft: Non-positive minimum duration not allowed") } else if maxDuration <= 0 { @@ -47,13 +47,13 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer { panic("raft: Minimum duration cannot be greater than maximum duration") } - return &Timer{ - rand: rand.New(rand.NewSource(time.Now().UnixNano())), + return &timer{ minDuration: minDuration, maxDuration: maxDuration, state: READY, - stop: make(chan bool, 1), - fire: make(chan time.Time), + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + stopChan: make(chan bool, 1), + fireChan: make(chan time.Time), } } @@ -63,28 +63,8 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer { // //------------------------------------------------------------------------------ -// Retrieves the minimum duration of the timer. -func (t *Timer) MinDuration() time.Duration { - return t.minDuration -} - -// Sets the minimum duration of the timer. -func (t *Timer) SetMinDuration(duration time.Duration) { - t.minDuration = duration -} - -// Retrieves the maximum duration of the timer. -func (t *Timer) MaxDuration() time.Duration { - return t.maxDuration -} - -// Sets the maximum duration of the timer. -func (t *Timer) SetMaxDuration(duration time.Duration) { - t.maxDuration = duration -} - // Sets the minimum and maximum duration of the timer. -func (t *Timer) SetDuration(duration time.Duration) { +func (t *timer) setDuration(duration time.Duration) { t.minDuration = duration t.maxDuration = duration } @@ -96,12 +76,12 @@ func (t *Timer) SetDuration(duration time.Duration) { //------------------------------------------------------------------------------ // Checks if the timer is currently running. -func (t *Timer) Running() bool { +func (t *timer) running() bool { return t.state == RUNNING } // Stops the timer and closes the channel. -func (t *Timer) Stop() { +func (t *timer) stop() { t.mutex.Lock() defer t.mutex.Unlock() @@ -113,12 +93,12 @@ func (t *Timer) Stop() { t.state = STOPPED // non-blocking buffer - t.stop <- true + t.stopChan <- true } } // Change the state of timer to ready -func (t *Timer) Ready() { +func (t *timer) ready() { t.mutex.Lock() defer t.mutex.Unlock() @@ -126,14 +106,14 @@ func (t *Timer) Ready() { panic("Timer is already running") } t.state = READY - t.stop = make(chan bool, 1) - t.fire = make(chan time.Time) + t.stopChan = make(chan bool, 1) + t.fireChan = make(chan time.Time) } // Fire at the timer -func (t *Timer) Fire() { +func (t *timer) fire() { select { - case t.fire <- time.Now(): + case t.fireChan <- time.Now(): return default: return @@ -146,7 +126,7 @@ func (t *Timer) Fire() { // (3) fired // Return false if stopped. // Make sure the start func will not restart the stopped timer. -func (t *Timer) Start() bool { +func (t *timer) start() bool { t.mutex.Lock() if t.state != READY { @@ -170,8 +150,8 @@ func (t *Timer) Start() bool { stopped := false select { case <-internalTimer.C: - case <-t.fire: - case <-t.stop: + case <-t.fireChan: + case <-t.stopChan: stopped = true } diff --git a/timer_test.go b/timer_test.go index caff0f51d52..60cd746b255 100644 --- a/timer_test.go +++ b/timer_test.go @@ -13,12 +13,12 @@ import ( // Ensure that we can start an election timer and it will go off in the specified duration. func TestTimer(t *testing.T) { - timer := NewTimer(5*time.Millisecond, 10*time.Millisecond) + timer := newTimer(5*time.Millisecond, 10*time.Millisecond) // test timer start for i := 0; i < 10; i++ { start := time.Now() - timer.Start() + timer.start() duration := time.Now().Sub(start) if duration > 12*time.Millisecond || duration < 5*time.Millisecond { @@ -30,7 +30,7 @@ func TestTimer(t *testing.T) { for i := 0; i < 100; i++ { start := time.Now() go stop(timer) - timer.Start() + timer.start() duration := time.Now().Sub(start) if duration > 3*time.Millisecond { @@ -38,14 +38,14 @@ func TestTimer(t *testing.T) { } // ready the timer after stop it - timer.Ready() + timer.ready() } // test timer fire for i := 0; i < 100; i++ { start := time.Now() go fire(timer) - timer.Start() + timer.start() duration := time.Now().Sub(start) if duration > 3*time.Millisecond { @@ -65,22 +65,22 @@ func TestTimer(t *testing.T) { if ret != false { t.Fatal("cannot stop timer!") } - timer.Ready() + timer.ready() } } -func stop(t *Timer) { +func stop(t *timer) { time.Sleep(time.Millisecond) - t.Stop() + t.stop() } -func start(t *Timer, resp chan bool) { +func start(t *timer, resp chan bool) { time.Sleep(time.Millisecond) - resp <- t.Start() + resp <- t.start() } -func fire(t *Timer) { +func fire(t *timer) { time.Sleep(time.Millisecond) - t.Fire() + t.fire() }