Skip to content

Use the Connection Buffer for Writing #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 24, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Changes:
- Made closing rows and connections error tolerant. This allows for example deferring rows.Close() without checking for errors
- New Logo
- Changed the copyright header to include all contributors
- Optimized the read buffer
- Optimized the buffer for reading
- Use the buffer also for writing. This results in zero allocations (by the driver) for most queries
- Improved the LOAD INFILE documentation
- The driver struct is now exported to make the driver directly accessible
- Refactored the driver tests
Expand Down
7 changes: 5 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,26 @@ func BenchmarkQuery(b *testing.B) {

stmt := tb.checkStmt(db.Prepare("SELECT val FROM foo WHERE id=?"))
defer stmt.Close()
b.StartTimer()

remain := int64(b.N)
var wg sync.WaitGroup
wg.Add(concurrencyLevel)
defer wg.Wait()
b.StartTimer()

for i := 0; i < concurrencyLevel; i++ {
go func() {
defer wg.Done()
for {
if atomic.AddInt64(&remain, -1) < 0 {
wg.Done()
return
}

var got string
tb.check(stmt.QueryRow(1).Scan(&got))
if got != "one" {
b.Errorf("query = %q; want one", got)
wg.Done()
return
}
}
Expand Down
51 changes: 49 additions & 2 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import "io"

const defaultBufSize = 4096

// A read buffer similar to bufio.Reader but zero-copy-ish
// A buffer which is used for both reading and writing.
// This is possible since communication on each connection is synchronous.
// In other words, we can't write and read simultaneously on the same connection.
// The buffer is similar to bufio.Reader / Writer but zero-copy-ish
// Also highly optimized for this particular use case.
type buffer struct {
buf []byte
Expand All @@ -37,8 +40,11 @@ func (b *buffer) fill(need int) (err error) {
}

// grow buffer if necessary
// TODO: let the buffer shrink again at some point
// Maybe keep the org buf slice and swap back?
if need > len(b.buf) {
newBuf := make([]byte, need)
// Round up to the next multiple of the default size
newBuf := make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
copy(newBuf, b.buf)
b.buf = newBuf
}
Expand Down Expand Up @@ -74,3 +80,44 @@ func (b *buffer) readNext(need int) (p []byte, err error) {
b.length -= need
return
}

// returns a buffer with the requested size.
// If possible, a slice from the existing buffer is returned.
// Otherwise a bigger buffer is made.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeBuffer(length int) []byte {
if b.length > 0 {
return nil
}

// test (cheap) general case first
if length <= defaultBufSize || length <= cap(b.buf) {
return b.buf[:length]
}

if length < maxPacketSize {
b.buf = make([]byte, length)
return b.buf
}
return make([]byte, length)
}

// shortcut which can be used if the requested buffer is guaranteed to be
// smaller than defaultBufSize
// Only one buffer (total) can be used at a time.
func (b *buffer) takeSmallBuffer(length int) []byte {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everywhere this is used in packets.go, it's immediately followed by if data == nil based error handling, so even if it's inlined by the compiler, it always branches twice. b.length != 0 is the only way this can return nil, so I say manually inline all occurrences (and add a comment), use the buffer directly and drop the superflouus if for all 4 occurences.
That's bad from a code style perspective, but the comment should suffice. It will probably be a little faster and not more complex than it is now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Branches are not generally evil. In this case 1 branch miss is to be expected. I think keeping the buffer logic separate is worth it.

if b.length == 0 {
return b.buf[:length]
}
return nil
}

// takeCompleteBuffer returns the complete existing buffer.
// This can be used if the necessary buffer size is unknown.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeCompleteBuffer() []byte {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as for takeSmallBuffer, but this one only has one caller (in writeExecutePacket).

if b.length == 0 {
return b.buf
}
return nil
}
57 changes: 27 additions & 30 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) {
columnCount, err := stmt.readPrepareResultPacket()
if err == nil {
if stmt.paramCount > 0 {
stmt.params, err = stmt.mc.readColumns(stmt.paramCount)
stmt.params, err = mc.readColumns(stmt.paramCount)
if err != nil {
return nil, err
}
}

if columnCount > 0 {
err = stmt.mc.readUntilEOF()
err = mc.readUntilEOF()
}
}

Expand Down Expand Up @@ -171,26 +171,24 @@ func (mc *mysqlConn) Exec(query string, args []driver.Value) (driver.Result, err
}

// Internal function to execute commands
func (mc *mysqlConn) exec(query string) (err error) {
func (mc *mysqlConn) exec(query string) error {
// Send command
err = mc.writeCommandPacketStr(comQuery, query)
err := mc.writeCommandPacketStr(comQuery, query)
if err != nil {
return
return err
}

// Read Result
var resLen int
resLen, err = mc.readResultSetHeaderPacket()
resLen, err := mc.readResultSetHeaderPacket()
if err == nil && resLen > 0 {
err = mc.readUntilEOF()
if err != nil {
return
if err = mc.readUntilEOF(); err != nil {
return err
}

err = mc.readUntilEOF()
}

return
return err
}

func (mc *mysqlConn) Query(query string, args []driver.Value) (driver.Rows, error) {
Expand All @@ -211,7 +209,6 @@ func (mc *mysqlConn) Query(query string, args []driver.Value) (driver.Rows, erro
return rows, err
}
}

return nil, err
}

Expand All @@ -221,29 +218,29 @@ func (mc *mysqlConn) Query(query string, args []driver.Value) (driver.Rows, erro

// Gets the value of the given MySQL System Variable
// The returned byte slice is only valid until the next read
func (mc *mysqlConn) getSystemVar(name string) (val []byte, err error) {
func (mc *mysqlConn) getSystemVar(name string) ([]byte, error) {
// Send command
err = mc.writeCommandPacketStr(comQuery, "SELECT @@"+name)
if err := mc.writeCommandPacketStr(comQuery, "SELECT @@"+name); err != nil {
return nil, err
}

// Read Result
resLen, err := mc.readResultSetHeaderPacket()
if err == nil {
// Read Result
var resLen int
resLen, err = mc.readResultSetHeaderPacket()
if err == nil {
rows := &mysqlRows{mc, false, nil, false}
rows := &mysqlRows{mc, false, nil, false}

if resLen > 0 {
// Columns
rows.columns, err = mc.readColumns(resLen)
if resLen > 0 {
// Columns
rows.columns, err = mc.readColumns(resLen)
if err != nil {
return nil, err
}
}

dest := make([]driver.Value, resLen)
err = rows.readRow(dest)
if err == nil {
val = dest[0].([]byte)
err = mc.readUntilEOF()
}
dest := make([]driver.Value, resLen)
if err = rows.readRow(dest); err == nil {
return dest[0].([]byte), mc.readUntilEOF()
}
}

return
return nil, err
}
Loading