Skip to content

Commit

Permalink
Merge pull request #88 from coreos/master
Browse files Browse the repository at this point in the history
encode entry should reuse memory rather than alloc each time
  • Loading branch information
benbjohnson committed Aug 2, 2013
2 parents 59333b0 + f5d89d0 commit 0be5a48
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 29 deletions.
2 changes: 1 addition & 1 deletion debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var logLevel int = 0
var logger *log.Logger

func init() {
logger = log.New(os.Stdout, "raft", log.Lmicroseconds)
logger = log.New(os.Stdout, "[raft]", log.Lmicroseconds)
}

//------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions http_transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type HTTPMuxer interface {
// Creates a new HTTP transporter with the given path prefix.
func NewHTTPTransporter(prefix string) *HTTPTransporter {
return &HTTPTransporter{
DisableKeepAlives: false,
prefix: prefix,
appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),
Expand Down
64 changes: 64 additions & 0 deletions http_transporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,67 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans
// Wait until everything is done.
wg.Wait()
}

func BenchmarkSpeed(b *testing.B) {

transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true

servers := []*Server{}

for i:= 0; i < 3; i++ {
port := 9000 + i

// Create raft server.
server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.SetElectionTimeout(testElectionTimeout)
server.Start()

defer server.Stop()
servers = append(servers, server)

// Create listener for HTTP server and start it.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
defer listener.Close()

// Create wrapping HTTP server.
mux := http.NewServeMux()
transporter.Install(server, mux)
httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}

go func() { httpServer.Serve(listener) }()
}

// Setup configuration.
for _, server := range servers {
(servers)[0].Do(&DefaultJoinCommand{Name: server.Name()})
}

c := make(chan bool)

// Wait for configuration to propagate.
time.Sleep(2 * time.Second)

b.ResetTimer()
for n := 0; n < b.N; n++ {
for i := 0; i < 1000; i++ {
go send(c, servers[0])
}

for i := 0; i < 1000; i++ {
<-c
}
}
}

func send(c chan bool, s *Server) {
for i := 0; i < 20; i++ {
s.Do(&NOPCommand{})
}
c <- true
}

9 changes: 8 additions & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package raft

import (
"bufio"
"code.google.com/p/goprotobuf/proto"
"errors"
"fmt"
"github.com/benbjohnson/go-raft/protobuf"
"io"
"os"
"sync"
Expand All @@ -26,6 +28,8 @@ type Log struct {
mutex sync.RWMutex
startIndex uint64 // the index before the first entry in the Log entries
startTerm uint64
pBuffer *proto.Buffer
pLogEntry *protobuf.ProtoLogEntry
}

// The results of the applying a log entry.
Expand All @@ -43,7 +47,9 @@ type logResult struct {
// Creates a new log.
func newLog() *Log {
return &Log{
entries: make([]*LogEntry, 0),
entries: make([]*LogEntry, 0),
pBuffer: proto.NewBuffer(nil),
pLogEntry: &protobuf.ProtoLogEntry{},
}
}

Expand Down Expand Up @@ -470,6 +476,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
var err error
// Append each entry but exit if we hit an error.
for _, entry := range entries {
entry.log = l
if size, err = l.writeEntry(entry, w); err != nil {
return err
}
Expand Down
19 changes: 8 additions & 11 deletions log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,26 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr
// Encodes the log entry to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (e *LogEntry) encode(w io.Writer) (int, error) {
defer e.log.pBuffer.Reset()

p := proto.NewBuffer(nil)
e.log.pLogEntry.Index = proto.Uint64(e.Index)
e.log.pLogEntry.Term = proto.Uint64(e.Term)
e.log.pLogEntry.CommandName = proto.String(e.CommandName)
e.log.pLogEntry.Command = e.Command

pb := &protobuf.ProtoLogEntry{
Index: proto.Uint64(e.Index),
Term: proto.Uint64(e.Term),
CommandName: proto.String(e.CommandName),
Command: e.Command,
}

err := p.Marshal(pb)
err := e.log.pBuffer.Marshal(e.log.pLogEntry)

if err != nil {
return -1, err
}

_, err = fmt.Fprintf(w, "%8x\n", len(p.Bytes()))
_, err = fmt.Fprintf(w, "%8x\n", len(e.log.pBuffer.Bytes()))

if err != nil {
return -1, err
}

return w.Write(p.Bytes())
return w.Write(e.log.pBuffer.Bytes())
}

// Decodes the log entry from a buffer. Returns the number of bytes read and
Expand Down
19 changes: 11 additions & 8 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ func TestLogNewLog(t *testing.T) {

// Ensure that we can decode and encode to an existing log.
func TestLogExistingLog(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close()
defer os.Remove(path)
Expand All @@ -86,9 +87,10 @@ func TestLogExistingLog(t *testing.T) {

// Ensure that we can check the contents of the log by index/term.
func TestLogContainsEntries(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close()
defer os.Remove(path)
Expand All @@ -112,8 +114,9 @@ func TestLogContainsEntries(t *testing.T) {

// Ensure that we can recover from an incomplete/corrupt log and continue logging.
func TestLogRecovery(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
f, _ := ioutil.TempFile("", "raft-log-")

e0.encode(f)
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
)

const (
MaxLogEntriesPerRequest = 200
MaxLogEntriesPerRequest = 2000
NumberOfLogEntriesAfterSnapshot = 200
)

Expand Down
9 changes: 5 additions & 4 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {

// Ensure that a vote request is denied if the log is out of date.
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})

// start as a follower with term 2 and index 3
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {

// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})

// start as a follower
Expand Down
6 changes: 3 additions & 3 deletions test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

const (
testHeartbeatTimeout = 5 * time.Millisecond
testElectionTimeout = 20 * time.Millisecond
testHeartbeatTimeout = 50 * time.Millisecond
testElectionTimeout = 200 * time.Millisecond
)

func init() {
Expand Down Expand Up @@ -85,7 +85,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn

func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server {
servers := []*Server{}
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})

for _, name := range names {
if lookup[name] != nil {
Expand Down

0 comments on commit 0be5a48

Please sign in to comment.