Skip to content

Commit

Permalink
Merge pull request #48 from benbjohnson/47-external-interface-cleanup
Browse files Browse the repository at this point in the history
[Fix #47] Clean up external interface.
  • Loading branch information
benbjohnson committed Jul 6, 2013
2 parents 72d2d09 + 44f3ef6 commit 0dd663e
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 300 deletions.
4 changes: 2 additions & 2 deletions append_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type AppendEntriesResponse struct {
//------------------------------------------------------------------------------

// Creates a new AppendEntries request.
func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest {
func newAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest {
return &AppendEntriesRequest{
Term: term,
LeaderName: leaderName,
Expand All @@ -42,7 +42,7 @@ func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64
}

// Creates a new AppendEntries response.
func NewAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse {
func newAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse {
return &AppendEntriesResponse{
Term: term,
Success: success,
Expand Down
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Command interface {
//--------------------------------------

// Creates a new instance of a command by name.
func NewCommand(name string) (Command, error) {
func newCommand(name string) (Command, error) {
// Find the registered command.
command := commandTypes[name]
if command == nil {
Expand Down
80 changes: 28 additions & 52 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Log struct {
//------------------------------------------------------------------------------

// Creates a new log.
func NewLog() *Log {
func newLog() *Log {
return &Log{
entries: make([]*LogEntry, 0),
}
Expand All @@ -47,24 +47,12 @@ func NewLog() *Log {
//
//------------------------------------------------------------------------------

func (l *Log) SetStartIndex(i uint64) {
l.startIndex = i
}

func (l *Log) StartIndex() uint64 {
return l.startIndex
}

func (l *Log) SetStartTerm(t uint64) {
l.startTerm = t
}

//--------------------------------------
// Log Indices
//--------------------------------------

// The current index in the log.
func (l *Log) CurrentIndex() uint64 {
func (l *Log) currentIndex() uint64 {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -83,24 +71,19 @@ func (l *Log) internalCurrentIndex() uint64 {
}

// The next index in the log.
func (l *Log) NextIndex() uint64 {
return l.CurrentIndex() + 1
}

// The last committed index in the log.
func (l *Log) CommitIndex() uint64 {
return l.commitIndex
func (l *Log) nextIndex() uint64 {
return l.currentIndex() + 1
}

// Determines if the log contains zero entries.
func (l *Log) IsEmpty() bool {
func (l *Log) isEmpty() bool {
l.mutex.Lock()
defer l.mutex.Unlock()
return (len(l.entries) == 0) && (l.startIndex == 0)
}

// The name of the last command in the log.
func (l *Log) LastCommandName() string {
func (l *Log) lastCommandName() string {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(l.entries) > 0 {
Expand All @@ -116,7 +99,7 @@ func (l *Log) LastCommandName() string {
//--------------------------------------

// The current term in the log.
func (l *Log) CurrentTerm() uint64 {
func (l *Log) currentTerm() uint64 {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -138,7 +121,7 @@ func (l *Log) CurrentTerm() uint64 {

// Opens the log file and reads existing entries. The log can remain open and
// continue to append entries to the end of the log.
func (l *Log) Open(path string) error {
func (l *Log) open(path string) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -160,8 +143,8 @@ func (l *Log) Open(path string) error {
}

// Instantiate log entry and decode into it.
entry := NewLogEntry(l, 0, 0, nil)
n, err := entry.Decode(reader)
entry := newLogEntry(l, 0, 0, nil)
n, err := entry.decode(reader)
if err != nil {
file.Close()
if err = os.Truncate(path, int64(lastIndex)); err != nil {
Expand Down Expand Up @@ -196,7 +179,7 @@ func (l *Log) Open(path string) error {
}

// Closes the log file.
func (l *Log) Close() {
func (l *Log) close() {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -213,12 +196,12 @@ func (l *Log) Close() {
//--------------------------------------

// Creates a log entry associated with this log.
func (l *Log) CreateEntry(term uint64, command Command) *LogEntry {
return NewLogEntry(l, l.NextIndex(), term, command)
func (l *Log) createEntry(term uint64, command Command) *LogEntry {
return newLogEntry(l, l.nextIndex(), term, command)
}

// Checks if the log contains a given index/term combination.
func (l *Log) ContainsEntry(index uint64, term uint64) bool {
func (l *Log) containsEntry(index uint64, term uint64) bool {
if index <= l.startIndex || index > (l.startIndex+uint64(len(l.entries))) {
return false
}
Expand All @@ -228,7 +211,7 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool {
// Retrieves a list of entries after a given index as well as the term of the
// index provided. A nil list of entries is returned if the index no longer
// exists because a snapshot was made.
func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {
func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand Down Expand Up @@ -259,7 +242,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) {

// Retrieves the error returned from an entry. The error can only exist after
// the entry has been committed.
func (l *Log) GetEntryError(entry *LogEntry) error {
func (l *Log) getEntryError(entry *LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -278,7 +261,7 @@ func (l *Log) GetEntryError(entry *LogEntry) error {
//--------------------------------------

// Retrieves the last index and term that has been committed to the log.
func (l *Log) CommitInfo() (index uint64, term uint64) {
func (l *Log) commitInfo() (index uint64, term uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -298,7 +281,7 @@ func (l *Log) CommitInfo() (index uint64, term uint64) {
}

// Retrieves the last index and term that has been committed to the log.
func (l *Log) LastInfo() (index uint64, term uint64) {
func (l *Log) lastInfo() (index uint64, term uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -313,14 +296,14 @@ func (l *Log) LastInfo() (index uint64, term uint64) {
}

// Updates the commit index
func (l *Log) UpdateCommitIndex(index uint64) {
func (l *Log) updateCommitIndex(index uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.commitIndex = index
}

// Updates the commit index and writes entries after that index to the stable storage.
func (l *Log) SetCommitIndex(index uint64) error {
func (l *Log) setCommitIndex(index uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand Down Expand Up @@ -353,7 +336,7 @@ func (l *Log) SetCommitIndex(index uint64) error {
entry := l.entries[entryIndex]

// Write to storage.
if err := entry.Encode(l.file); err != nil {
if err := entry.encode(l.file); err != nil {
return err
}

Expand All @@ -373,14 +356,14 @@ func (l *Log) SetCommitIndex(index uint64) error {

// Truncates the log to the given index and term. This only works if the log
// at the index has not been committed.
func (l *Log) Truncate(index uint64, term uint64) error {
func (l *Log) truncate(index uint64, term uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()
debugln("[Truncate] truncate to ", index)
// Do not allow committed entries to be truncated.
if index < l.CommitIndex() {
if index < l.commitIndex {
debugln("[Truncate] error 1")
return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.CommitIndex(), index, term)
return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.commitIndex, index, term)
}

// Do not truncate past end of entries.
Expand Down Expand Up @@ -415,8 +398,8 @@ func (l *Log) Truncate(index uint64, term uint64) error {
//--------------------------------------

// Appends a series of entries to the log. These entries are not written to
// disk until SetCommitIndex() is called.
func (l *Log) AppendEntries(entries []*LogEntry) error {
// disk until setCommitIndex() is called.
func (l *Log) appendEntries(entries []*LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()

Expand All @@ -430,13 +413,6 @@ func (l *Log) AppendEntries(entries []*LogEntry) error {
return nil
}

// Appends a single entry to the log.
func (l *Log) AppendEntry(entry *LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()
return l.appendEntry(entry)
}

// Writes a single log entry to the end of the log. This function does not
// obtain a lock and should only be used internally. Use AppendEntries() and
// AppendEntry() to use it externally.
Expand Down Expand Up @@ -467,7 +443,7 @@ func (l *Log) appendEntry(entry *LogEntry) error {
//--------------------------------------

// compaction the log before index
func (l *Log) Compact(index uint64, term uint64) error {
func (l *Log) compact(index uint64, term uint64) error {
var entries []*LogEntry

l.mutex.Lock()
Expand All @@ -490,7 +466,7 @@ func (l *Log) Compact(index uint64, term uint64) error {
return err
}
for _, entry := range entries {
err = entry.Encode(file)
err = entry.encode(file)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type logEntryRawMessage struct {
//------------------------------------------------------------------------------

// Creates a new log entry associated with a log.
func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry {
func newLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry {
return &LogEntry{
log: log,
Index: index,
Expand All @@ -62,7 +62,7 @@ func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry
//--------------------------------------

// Encodes the log entry to a buffer.
func (e *LogEntry) Encode(w io.Writer) error {
func (e *LogEntry) encode(w io.Writer) error {
if w == nil {
return errors.New("raft.LogEntry: Writer required to encode")
}
Expand All @@ -87,7 +87,7 @@ func (e *LogEntry) Encode(w io.Writer) error {
}

// Decodes the log entry from a buffer. Returns the number of bytes read.
func (e *LogEntry) Decode(r io.Reader) (pos int, err error) {
func (e *LogEntry) decode(r io.Reader) (pos int, err error) {
pos = 0

if r == nil {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (e *LogEntry) Decode(r io.Reader) (pos int, err error) {
}

// Instantiate command by name.
command, err := NewCommand(commandName)
command, err := newCommand(commandName)
if err != nil {
err = fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err)
return
Expand Down Expand Up @@ -187,7 +187,7 @@ func (e *LogEntry) UnmarshalJSON(data []byte) error {

// Create a command based on the name.
var err error
if e.Command, err = NewCommand(obj.Name); err != nil {
if e.Command, err = newCommand(obj.Name); err != nil {
return err
}
json.Unmarshal(obj.Command, e.Command)
Expand Down
4 changes: 2 additions & 2 deletions log_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// Ensure that we can encode a log entry to JSON.
func TestLogEntryMarshal(t *testing.T) {
e := NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})
e := newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})
if b, err := json.Marshal(e); !(string(b) == `{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}` && err == nil) {
t.Fatalf("Unexpected log entry marshalling: %v (%v)", string(b), err)
}
Expand All @@ -32,6 +32,6 @@ func TestLogEntryUnmarshal(t *testing.T) {
t.Fatalf("Log entry unmarshalling error: %v", err)
}
if !(e.Index == 1 && e.Term == 2 && reflect.DeepEqual(e.Command, &joinCommand{Name: "localhost:1000"})) {
t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}))
t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}))
}
}
Loading

0 comments on commit 0dd663e

Please sign in to comment.