From 1352897623447e58da386fa8f1fb986c807a428e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 28 Jul 2013 20:19:02 -0700 Subject: [PATCH 1/5] change debug prefix from raft to [raft] --- debug.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debug.go b/debug.go index 865617e62c9..97e2bc7728f 100644 --- a/debug.go +++ b/debug.go @@ -20,7 +20,7 @@ var logLevel int = 0 var logger *log.Logger func init() { - logger = log.New(os.Stdout, "raft", log.Lmicroseconds) + logger = log.New(os.Stdout, "[raft]", log.Lmicroseconds) } //------------------------------------------------------------------------------ From 1ff290c4a6f5fc03cbe5d596ef37dc1589cf4de9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 1 Aug 2013 13:21:05 -0700 Subject: [PATCH 2/5] encode entry should reuse memory rather than alloc each time --- log_entry.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/log_entry.go b/log_entry.go index cefdc3ece99..2ed732a0b64 100644 --- a/log_entry.go +++ b/log_entry.go @@ -9,6 +9,9 @@ import ( "io" ) +var p = proto.NewBuffer(nil) +var pb = &protobuf.ProtoLogEntry{} + // A log entry stores a single item in the log. type LogEntry struct { log *Log @@ -50,15 +53,12 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr // Encodes the log entry to a buffer. Returns the number of bytes // written and any error that may have occurred. func (e *LogEntry) encode(w io.Writer) (int, error) { + defer p.Reset() - p := proto.NewBuffer(nil) - - pb := &protobuf.ProtoLogEntry{ - Index: proto.Uint64(e.Index), - Term: proto.Uint64(e.Term), - CommandName: proto.String(e.CommandName), - Command: e.Command, - } + pb.Index = proto.Uint64(e.Index) + pb.Term = proto.Uint64(e.Term) + pb.CommandName = proto.String(e.CommandName) + pb.Command = e.Command err := p.Marshal(pb) From 78cb651d93fd5c9f5406db13b6b6e5ee0c25db2a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 1 Aug 2013 17:58:03 -0700 Subject: [PATCH 3/5] move buf to log struct --- http_transporter.go | 3 +++ http_transporter_test.go | 30 ++++++++++++++++++++++++++++++ log.go | 9 ++++++++- log_entry.go | 19 ++++++++----------- log_test.go | 19 +++++++++++-------- server.go | 2 +- server_test.go | 9 +++++---- test.go | 6 +++--- 8 files changed, 69 insertions(+), 28 deletions(-) diff --git a/http_transporter.go b/http_transporter.go index ca0e2ec92c4..539fb458658 100644 --- a/http_transporter.go +++ b/http_transporter.go @@ -7,6 +7,8 @@ import ( "net/http" ) +import _ "net/http/pprof" + // Parts from this transporter were heavily influenced by Peter Bougon's // raft implementation: https://github.com/peterbourgon/raft @@ -38,6 +40,7 @@ type HTTPMuxer interface { // Creates a new HTTP transporter with the given path prefix. func NewHTTPTransporter(prefix string) *HTTPTransporter { return &HTTPTransporter{ + DisableKeepAlives: false, prefix: prefix, appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"), requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"), diff --git a/http_transporter_test.go b/http_transporter_test.go index 1334a910609..aaf4796a1e8 100644 --- a/http_transporter_test.go +++ b/http_transporter_test.go @@ -4,6 +4,8 @@ import ( "fmt" "net" "net/http" + "os" + "runtime/pprof" "sync" "testing" "time" @@ -76,6 +78,27 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans // Wait for configuration to propagate. time.Sleep(testHeartbeatTimeout * 2) + f, _ := os.Create("raftprof") + + pprof.StartCPUProfile(f) + + c := make(chan bool) + start := time.Now() + + for i := 0; i < 1000; i++ { + go send(c, (*servers)[0]) + } + + for i := 0; i < 1000; i++ { + <-c + } + end := time.Now() + fmt.Println(end.Sub(start), "commands ", 1000*20) + pprof.StopCPUProfile() + + // Wait for configuration to propagate. + time.Sleep(testHeartbeatTimeout * 2) + // Execute all the callbacks at the same time. for _i, _f := range callbacks { i, f := _i, _f @@ -88,3 +111,10 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans // Wait until everything is done. wg.Wait() } + +func send(c chan bool, s *Server) { + for i := 0; i < 20; i++ { + s.Do(&NOPCommand{}) + } + c <- true +} diff --git a/log.go b/log.go index 472aa59604c..4033e92f929 100644 --- a/log.go +++ b/log.go @@ -2,8 +2,10 @@ package raft import ( "bufio" + "code.google.com/p/goprotobuf/proto" "errors" "fmt" + "github.com/benbjohnson/go-raft/protobuf" "io" "os" "sync" @@ -26,6 +28,8 @@ type Log struct { mutex sync.RWMutex startIndex uint64 // the index before the first entry in the Log entries startTerm uint64 + pBuffer *proto.Buffer + pLogEntry *protobuf.ProtoLogEntry } // The results of the applying a log entry. @@ -43,7 +47,9 @@ type logResult struct { // Creates a new log. func newLog() *Log { return &Log{ - entries: make([]*LogEntry, 0), + entries: make([]*LogEntry, 0), + pBuffer: proto.NewBuffer(nil), + pLogEntry: &protobuf.ProtoLogEntry{}, } } @@ -470,6 +476,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error { var err error // Append each entry but exit if we hit an error. for _, entry := range entries { + entry.log = l if size, err = l.writeEntry(entry, w); err != nil { return err } diff --git a/log_entry.go b/log_entry.go index 2ed732a0b64..45f34f700cf 100644 --- a/log_entry.go +++ b/log_entry.go @@ -9,9 +9,6 @@ import ( "io" ) -var p = proto.NewBuffer(nil) -var pb = &protobuf.ProtoLogEntry{} - // A log entry stores a single item in the log. type LogEntry struct { log *Log @@ -53,26 +50,26 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr // Encodes the log entry to a buffer. Returns the number of bytes // written and any error that may have occurred. func (e *LogEntry) encode(w io.Writer) (int, error) { - defer p.Reset() + defer e.log.pBuffer.Reset() - pb.Index = proto.Uint64(e.Index) - pb.Term = proto.Uint64(e.Term) - pb.CommandName = proto.String(e.CommandName) - pb.Command = e.Command + e.log.pLogEntry.Index = proto.Uint64(e.Index) + e.log.pLogEntry.Term = proto.Uint64(e.Term) + e.log.pLogEntry.CommandName = proto.String(e.CommandName) + e.log.pLogEntry.Command = e.Command - err := p.Marshal(pb) + err := e.log.pBuffer.Marshal(e.log.pLogEntry) if err != nil { return -1, err } - _, err = fmt.Fprintf(w, "%8x\n", len(p.Bytes())) + _, err = fmt.Fprintf(w, "%8x\n", len(e.log.pBuffer.Bytes())) if err != nil { return -1, err } - return w.Write(p.Bytes()) + return w.Write(e.log.pBuffer.Bytes()) } // Decodes the log entry from a buffer. Returns the number of bytes read and diff --git a/log_test.go b/log_test.go index a8723bb958e..e890090c356 100644 --- a/log_test.go +++ b/log_test.go @@ -62,9 +62,10 @@ func TestLogNewLog(t *testing.T) { // Ensure that we can decode and encode to an existing log. func TestLogExistingLog(t *testing.T) { - e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0}) + tmpLog := newLog() + e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) log, path := setupLog([]*LogEntry{e0, e1, e2}) defer log.close() defer os.Remove(path) @@ -86,9 +87,10 @@ func TestLogExistingLog(t *testing.T) { // Ensure that we can check the contents of the log by index/term. func TestLogContainsEntries(t *testing.T) { - e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0}) + tmpLog := newLog() + e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) log, path := setupLog([]*LogEntry{e0, e1, e2}) defer log.close() defer os.Remove(path) @@ -112,8 +114,9 @@ func TestLogContainsEntries(t *testing.T) { // Ensure that we can recover from an incomplete/corrupt log and continue logging. func TestLogRecovery(t *testing.T) { - e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) + tmpLog := newLog() + e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) f, _ := ioutil.TempFile("", "raft-log-") e0.encode(f) diff --git a/server.go b/server.go index 4f33ee7c7d9..d1b6baa4b7e 100644 --- a/server.go +++ b/server.go @@ -29,7 +29,7 @@ const ( ) const ( - MaxLogEntriesPerRequest = 200 + MaxLogEntriesPerRequest = 2000 NumberOfLogEntriesAfterSnapshot = 200 ) diff --git a/server_test.go b/server_test.go index 829b5a30306..96e2632048f 100644 --- a/server_test.go +++ b/server_test.go @@ -102,9 +102,10 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { // Ensure that a vote request is denied if the log is out of date. func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { - e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0}) + tmpLog := newLog() + e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2}) // start as a follower with term 2 and index 3 @@ -143,7 +144,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader. func TestServerPromoteSelf(t *testing.T) { - e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0}) // start as a follower diff --git a/test.go b/test.go index fb98e89bc2b..606594bf7a4 100644 --- a/test.go +++ b/test.go @@ -8,8 +8,8 @@ import ( ) const ( - testHeartbeatTimeout = 5 * time.Millisecond - testElectionTimeout = 20 * time.Millisecond + testHeartbeatTimeout = 50 * time.Millisecond + testElectionTimeout = 200 * time.Millisecond ) func init() { @@ -85,7 +85,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server { servers := []*Server{} - e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) for _, name := range names { if lookup[name] != nil { From ce95c8ba67d8241a506f395c681cb15e182671f5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 1 Aug 2013 18:00:10 -0700 Subject: [PATCH 4/5] remove unwanted codes --- http_transporter.go | 2 -- http_transporter_test.go | 7 ------- 2 files changed, 9 deletions(-) diff --git a/http_transporter.go b/http_transporter.go index 539fb458658..f156815de61 100644 --- a/http_transporter.go +++ b/http_transporter.go @@ -7,8 +7,6 @@ import ( "net/http" ) -import _ "net/http/pprof" - // Parts from this transporter were heavily influenced by Peter Bougon's // raft implementation: https://github.com/peterbourgon/raft diff --git a/http_transporter_test.go b/http_transporter_test.go index aaf4796a1e8..361d6ce78d9 100644 --- a/http_transporter_test.go +++ b/http_transporter_test.go @@ -4,8 +4,6 @@ import ( "fmt" "net" "net/http" - "os" - "runtime/pprof" "sync" "testing" "time" @@ -78,10 +76,6 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans // Wait for configuration to propagate. time.Sleep(testHeartbeatTimeout * 2) - f, _ := os.Create("raftprof") - - pprof.StartCPUProfile(f) - c := make(chan bool) start := time.Now() @@ -94,7 +88,6 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans } end := time.Now() fmt.Println(end.Sub(start), "commands ", 1000*20) - pprof.StopCPUProfile() // Wait for configuration to propagate. time.Sleep(testHeartbeatTimeout * 2) From f5d89d0b7630f525daa06d9c0539cf3f7650188b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 1 Aug 2013 18:57:08 -0700 Subject: [PATCH 5/5] bench --- http_transporter_test.go | 81 ++++++++++++++++++++++++++++++---------- 1 file changed, 61 insertions(+), 20 deletions(-) diff --git a/http_transporter_test.go b/http_transporter_test.go index 361d6ce78d9..f55e353b50f 100644 --- a/http_transporter_test.go +++ b/http_transporter_test.go @@ -76,22 +76,6 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans // Wait for configuration to propagate. time.Sleep(testHeartbeatTimeout * 2) - c := make(chan bool) - start := time.Now() - - for i := 0; i < 1000; i++ { - go send(c, (*servers)[0]) - } - - for i := 0; i < 1000; i++ { - <-c - } - end := time.Now() - fmt.Println(end.Sub(start), "commands ", 1000*20) - - // Wait for configuration to propagate. - time.Sleep(testHeartbeatTimeout * 2) - // Execute all the callbacks at the same time. for _i, _f := range callbacks { i, f := _i, _f @@ -105,9 +89,66 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans wg.Wait() } -func send(c chan bool, s *Server) { - for i := 0; i < 20; i++ { - s.Do(&NOPCommand{}) +func BenchmarkSpeed(b *testing.B) { + + transporter := NewHTTPTransporter("/raft") + transporter.DisableKeepAlives = true + + servers := []*Server{} + + for i:= 0; i < 3; i++ { + port := 9000 + i + + // Create raft server. + server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter) + server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.SetElectionTimeout(testElectionTimeout) + server.Start() + + defer server.Stop() + servers = append(servers, server) + + // Create listener for HTTP server and start it. + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + panic(err) + } + defer listener.Close() + + // Create wrapping HTTP server. + mux := http.NewServeMux() + transporter.Install(server, mux) + httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux} + + go func() { httpServer.Serve(listener) }() } - c <- true + + // Setup configuration. + for _, server := range servers { + (servers)[0].Do(&DefaultJoinCommand{Name: server.Name()}) + } + + c := make(chan bool) + + // Wait for configuration to propagate. + time.Sleep(2 * time.Second) + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < 1000; i++ { + go send(c, servers[0]) + } + + for i := 0; i < 1000; i++ { + <-c + } + } +} + +func send(c chan bool, s *Server) { + for i := 0; i < 20; i++ { + s.Do(&NOPCommand{}) + } + c <- true } +