storage/engine: batch concurrent commits in Go
Batch concurrent commits of write-only batches (i.e. most batches) in
Go. This gives a 10% performance boost on a write-only workload on my
laptop and a 50% performance boost on a write-only workload on a
single-node cluster running on Azure.

See #13974
petermattis committed Mar 16, 2017
1 parent 7448677 commit f99285a
Showing 2 changed files with 140 additions and 2 deletions.
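The heart of the change is a group-commit (leader/follower) pattern. Here is a minimal, self-contained sketch of that pattern, distilled from the rocksdb.go diff below: the first committer to find the queue empty becomes the leader, waits out any in-flight commit, applies every queued batch in one engine write, and wakes the followers, which simply wait for their group's sequence number to commit. All names here (groupCommitter, pendingBatch, applyAll) are illustrative stand-ins rather than the CockroachDB API; the per-batch sync flags are omitted; and pendingSeq is assumed to start above commitSeq so that the first group's followers actually wait.

// group_commit_sketch.go: a minimal sketch of the leader/follower
// group-commit pattern implemented by this change.
package main

import (
	"fmt"
	"sync"
)

// pendingBatch is a queued commit plus the slot the leader uses to hand
// back the result, mirroring rocksDBBatch.commitErr in the diff below.
type pendingBatch struct {
	repr      []byte
	commitErr error
}

type groupCommitter struct {
	mu         sync.Mutex
	cond       *sync.Cond
	committing bool   // a leader is currently applying a group
	commitSeq  uint64 // highest sequence number that has committed
	pendingSeq uint64 // sequence number of the currently queued group
	pending    []*pendingBatch
}

func newGroupCommitter() *groupCommitter {
	g := &groupCommitter{pendingSeq: 1} // assumed to start above commitSeq (0)
	g.cond = sync.NewCond(&g.mu)
	return g
}

// applyAll stands in for the single expensive engine write: in the patch,
// the leader replays every follower's Repr() into its own batch and then
// performs one RocksDB commit.
func applyAll(batches []*pendingBatch) error {
	fmt.Printf("committed %d batches in one engine write\n", len(batches))
	return nil
}

func (g *groupCommitter) commit(repr []byte) error {
	b := &pendingBatch{repr: repr}

	g.mu.Lock()
	leader := len(g.pending) == 0 // first into the queue leads the group
	g.pending = append(g.pending, b)
	seq := g.pendingSeq

	if leader {
		// Wait out any in-flight commit, then take ownership of the
		// queued group. (The patch also sanity-checks that seq still
		// equals pendingSeq here.)
		for g.committing {
			g.cond.Wait()
		}
		pending := g.pending
		g.pending = nil
		g.pendingSeq++
		g.committing = true
		g.mu.Unlock()

		err := applyAll(pending)
		// Hand the result to every batch in the group, ours included.
		for _, p := range pending {
			p.commitErr = err
		}

		g.mu.Lock()
		g.committing = false
		g.commitSeq = seq
		g.cond.Broadcast()
	} else {
		// Follower: sleep until the leader has committed our group.
		for g.commitSeq < seq {
			g.cond.Wait()
		}
	}
	g.mu.Unlock()
	return b.commitErr
}

func main() {
	g := newGroupCommitter()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := g.commit([]byte(fmt.Sprintf("batch-%d", i))); err != nil {
				fmt.Println("commit failed:", err)
			}
		}(i)
	}
	wg.Wait()
}

In the actual change the leader also ORs together the group's sync flags (c.pendingSync = c.pendingSync || syncCommit), so the combined write syncs if any member asked for it, which is stronger than, and therefore safe for, what the rest requested.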
pkg/storage/engine/batch_test.go (55 additions, 0 deletions)
@@ -21,9 +21,11 @@ import (
"encoding/binary"
"fmt"
"reflect"
"sync/atomic"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
@@ -1004,3 +1006,56 @@ func TestBatchIteration(t *testing.T) {
t.Fatalf("expected 'Prev() not supported', got %s", iter.Error())
}
}

// Test combining of concurrent commits of write-only batches, verifying that
// all of the keys written by the individual batches are subsequently readable.
func TestBatchCombine(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper := stop.NewStopper()
defer stopper.Stop()
e := NewInMem(roachpb.Attributes{}, 1<<20)
stopper.AddCloser(e)

var n uint32
const count = 10000

errs := make(chan error, 10)
for i := 0; i < cap(errs); i++ {
go func() {
for {
v := atomic.AddUint32(&n, 1) - 1
if v >= count {
break
}
k := fmt.Sprint(v)

b := e.NewWriteOnlyBatch()
if err := b.Put(mvccKey(k), []byte(k)); err != nil {
errs <- errors.Wrap(err, "put failed")
return
}
if err := b.Commit(false); err != nil {
errs <- errors.Wrap(err, "commit failed")
return
}

// Verify we can read the key we just wrote immediately.
if v, err := e.Get(mvccKey(k)); err != nil {
errs <- errors.Wrap(err, "get failed")
return
} else if string(v) != k {
errs <- errors.Errorf("read %q from engine, expected %q", v, k)
return
}
}
errs <- nil
}()
}

for i := 0; i < cap(errs); i++ {
if err := <-errs; err != nil {
t.Error(err)
}
}
}
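The 10% and 50% numbers in the commit message come from write-only workloads. A benchmark in the same style as the test above could measure the effect locally; the sketch below is hypothetical, not part of this commit, and assumes it lives in the same package, reusing only identifiers visible in this diff plus the engine's Close method.

func BenchmarkWriteOnlyBatchCommit(b *testing.B) {
	e := NewInMem(roachpb.Attributes{}, 1<<20)
	defer e.Close()

	var n uint32
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// Claim a unique key, mirroring the test's atomic counter.
			k := fmt.Sprint(atomic.AddUint32(&n, 1))
			batch := e.NewWriteOnlyBatch()
			if err := batch.Put(mvccKey(k), []byte(k)); err != nil {
				b.Error(err)
				return
			}
			if err := batch.Commit(false); err != nil {
				b.Error(err)
				return
			}
		}
	})
}

Running it with different -cpu values should show the batching's largest win at higher parallelism, where more commits queue up behind each leader.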
pkg/storage/engine/rocksdb.go (85 additions, 2 deletions)
@@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

@@ -290,6 +291,16 @@ type RocksDB struct {
maxSize int64 // Used for calculating rebalancing and free space.
maxOpenFiles int // The maximum number of open files this instance will use.
deallocated chan struct{} // Closed when the underlying handle is deallocated.

commit struct {
syncutil.Mutex
cond *sync.Cond
committing bool
commitSeq uint64
pendingSeq uint64
pendingSync bool
pending []*rocksDBBatch
}
}

var _ Engine = &RocksDB{}
@@ -388,6 +399,8 @@ func (r *RocksDB) open() error {
}
}

r.commit.cond = sync.NewCond(&r.commit.Mutex)

// Start a goroutine that will finish when the underlying handle
// is deallocated. This is used to check for leaks in tests.
go func() {
@@ -930,6 +943,7 @@ type rocksDBBatch struct {
distinctOpen bool
distinctNeedsFlush bool
writeOnly bool
commitErr error
}

func newRocksDBBatch(parent *RocksDB, writeOnly bool) *rocksDBBatch {
@@ -1081,15 +1095,84 @@ func (r *rocksDBBatch) NewIterator(prefix bool) Iterator {
return iter
}

-func (r *rocksDBBatch) Commit(sync bool) error {
+func (r *rocksDBBatch) Commit(syncCommit bool) error {
if r.closed() {
panic("this batch was already committed")
}
r.distinctOpen = false

// Combine multiple write-only batch commits into a single call to
// RocksDB. RocksDB is supposed to perform such batching internally, but
// whether due to Cgo overhead or something else, it isn't achieving the
// same degree of batching: instrumentation shows that RocksDB almost never
// batches commits together internally, while the batching below often
// combines 20 or 30 concurrent commits.
if r.writeOnly {
// The leader for the commit is the first batch to be added to the pending
// slice. Each commit has an associated sequence number. For a given
// sequence number, there can be only a single leader.
c := &r.parent.commit
c.Lock()
leader := len(c.pending) == 0
// Perform a sync if any of the commits require a sync.
c.pendingSync = c.pendingSync || syncCommit
c.pending = append(c.pending, r)
seq := c.pendingSeq

if leader {
// We're the leader. Wait for any running commit to finish.
for c.committing {
c.cond.Wait()
}
if seq != c.pendingSeq {
log.Fatalf(context.TODO(), "expected commit sequence %d, but found %d", seq, c.pendingSeq)
}
pending := c.pending
syncCommit = c.pendingSync
c.pending = nil
c.pendingSeq++
c.pendingSync = false
c.committing = true
c.Unlock()

// Bundle all of the batches together.
var err error
for _, b := range pending[1:] {
if err = r.ApplyBatchRepr(b.Repr(), false /* sync */); err != nil {
break
}
}

if err == nil {
err = r.commitInternal(syncCommit)
}

// Propagate the error to all of the batches involved in the commit.
for _, b := range pending {
b.commitErr = err
}

c.Lock()
c.committing = false
c.commitSeq = seq
c.cond.Broadcast()
} else {
// We're a follower. Wait for the commit to finish.
for c.commitSeq < seq {
c.cond.Wait()
}
}
c.Unlock()
return r.commitErr
}

return r.commitInternal(syncCommit)
}

func (r *rocksDBBatch) commitInternal(sync bool) error {
start := timeutil.Now()
var count, size int

r.distinctOpen = false
if r.flushes > 0 {
// We've previously flushed mutations to the C++ batch, so we have to flush
// any remaining mutations as well and then commit the batch.
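One step of the Commit path above is worth isolating: the leader's loop over pending[1:] replays each follower's Repr(), a self-contained record of that batch's writes, into the leader's own batch via ApplyBatchRepr, so a single engine write covers the whole group. The toy below illustrates only that shape; the real Repr() is RocksDB's WriteBatch wire format, not a string slice, and toyBatch is invented for illustration.

// Toy illustration of the leader's bundling step.
package main

import "fmt"

type toyBatch struct{ ops []string }

func (b *toyBatch) Put(k, v string)           { b.ops = append(b.ops, k+"="+v) }
func (b *toyBatch) Repr() []string            { return b.ops }
func (b *toyBatch) ApplyBatchRepr(r []string) { b.ops = append(b.ops, r...) }

func main() {
	leader := &toyBatch{}
	leader.Put("a", "1")

	f1, f2 := &toyBatch{}, &toyBatch{}
	f1.Put("b", "2")
	f2.Put("c", "3")

	// Bundle the followers' writes, as the patch does with pending[1:].
	for _, follower := range []*toyBatch{f1, f2} {
		leader.ApplyBatchRepr(follower.Repr())
	}
	fmt.Println(leader.ops) // [a=1 b=2 c=3]: one commit now covers all three
}

Bundling on the Go side lets one engine write replace N of them, which is exactly the batching the comment in Commit observes RocksDB was not achieving internally.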