storage: batch command application and coalesce applied state per batch
This commit batches raft command application where possible. The basic approach
is to notice that many commands only "trivially" update the replica state
machine. Trivial commands can be processed in a single batch by acting on a
copy of the replica state. Non-trivial commands share the same logic but always
commit alone because, for one reason or another, they rely on having a view of
the replica or storage engine as of a specific log index.
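
Roughly, the application loop now looks like the following sketch. This is
illustrative only: the names (command, isTrivial, applyBatch) are stand-ins and
not the actual identifiers introduced by this change.

```
// command, isTrivial, and applyBatch are hypothetical stand-ins.
type command struct{ trivial bool }

func (c command) isTrivial() bool { return c.trivial }

// applyBatch stands in for staging a group of commands in one storage-engine
// write batch and committing it.
func applyBatch(cmds []command) {}

// applyCommittedEntries groups consecutive trivial commands into a single
// engine batch and commits each non-trivial command in a batch of its own.
func applyCommittedEntries(cmds []command) {
	var batch []command
	flush := func() {
		if len(batch) > 0 {
			applyBatch(batch)
			batch = batch[:0]
		}
	}
	for _, cmd := range cmds {
		if cmd.isTrivial() {
			// Trivial commands only update a copy of the replica state, so
			// they can share one batch.
			batch = append(batch, cmd)
			continue
		}
		// Non-trivial commands need the replica and engine as of their own
		// log index, so flush the pending batch and commit this one alone.
		flush()
		applyBatch([]command{cmd})
	}
	flush()
}
```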

This commit also sneaks in another optimization that batching enables. Each
command mutates a portion of replica state called the applied state, which
tracks the log index that has been applied together with the MVCC stats of the
range as of that application. Before this change, each entry would update this
applied state, and each of those writes would end up in the WAL and mem-table
just to be compacted away in L1. Now that commands are applied to the storage
engine in a single batch, it is easy to update the applied state only for the
last entry in the batch.
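
The coalescing roughly amounts to the following sketch, where entry,
engineBatch, and putAppliedState are hypothetical stand-ins for the real
storage types and the real applied state key:

```
// Hypothetical stand-ins; not the actual storage package types.
type mvccDelta struct{ liveBytes, keyCount int64 }

type entry struct {
	index uint64
	delta mvccDelta
}

type appliedState struct {
	index uint64
	stats mvccDelta
}

type engineBatch struct{ appliedStateWrites int }

func (b *engineBatch) putAppliedState(s appliedState) { b.appliedStateWrites++ }

// applyEntries stages every entry's writes but persists the applied state
// once, for the final entry, rather than once per entry.
func applyEntries(b *engineBatch, entries []entry, prev appliedState) appliedState {
	cur := prev
	for _, e := range entries {
		// ... stage the command's own writes into b ...
		cur.index = e.index
		cur.stats.liveBytes += e.delta.liveBytes
		cur.stats.keyCount += e.delta.keyCount
	}
	if len(entries) > 0 {
		b.putAppliedState(cur) // single coalesced write for the whole batch
	}
	return cur
}
```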

For sequential writes, this patch shows a considerable performance win. The
benchmark below was run on a 3-node c5d.4xlarge (16 vCPU) cluster with
concurrency 128.

```
name            old ops/s   new ops/s   delta
KV0-throughput  22.1k ± 1%  32.8k ± 1%  +48.59%  (p=0.029 n=4+4)

name            old ms      new ms      delta
KV0-P50          7.15 ± 2%   6.00 ± 0%  -16.08%  (p=0.029 n=4+4)
KV0-Avg          5.80 ± 0%   3.80 ± 0%  -34.48%  (p=0.029 n=4+4)
```

Due to the reorganization of logic in this change, Replica.mu does not need to
be acquired as many times during the application of a batch. In the common case
it is now acquired exactly twice per batch, whereas before it was acquired more
than twice per entry. This should improve performance on large machines that
experience mutex contention on a single range.
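
A rough sketch of the locking pattern, assuming the snapshot-then-publish
structure described above (replica, replicaState, and applyToCopy are
illustrative, not the real types):

```
import "sync"

type replicaState struct{ appliedIndex uint64 }

type replica struct {
	mu    sync.Mutex
	state replicaState
}

// applyToCopy applies one entry to an in-memory copy of the replica state.
func applyToCopy(s *replicaState, index uint64) { s.appliedIndex = index }

// applyBatch acquires Replica.mu twice per batch: once to snapshot the state
// and once to publish the result, instead of several times per entry.
func (r *replica) applyBatch(indexes []uint64) {
	r.mu.Lock()
	cpy := r.state
	r.mu.Unlock()

	for _, idx := range indexes {
		applyToCopy(&cpy, idx) // no Replica.mu held while applying each entry
	}

	r.mu.Lock()
	r.state = cpy
	r.mu.Unlock()
}
```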

This effect is visible on large machines. Below are results from running a
normal KV0 workload on c5d.18xlarge (72 vCPU) machines with concurrency 1024
and 16 initial splits.

```
name            old ops/s   new ops/s    delta
KV0-throughput  78.1k ± 1%  116.8k ± 5%  +49.42%  (p=0.029 n=4+4)

name            old ms      new ms       delta
KV0-P50          24.4 ± 3%    19.7 ± 7%  -19.28%  (p=0.029 n=4+4)
KV0-Avg          12.6 ± 0%     7.5 ± 9%  -40.87%  (p=0.029 n=4+4)
```

Fixes cockroachdb#37426.

Release note (performance improvement): Batch raft entry application and
coalesce writes to applied state for the batch.
ajwerner committed Jul 6, 2019
1 parent 9322e07 commit b65d72c
Showing 7 changed files with 1,621 additions and 1,070 deletions.
166 changes: 166 additions & 0 deletions pkg/storage/entry_application_state_buf.go
@@ -0,0 +1,166 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
	"fmt"
	"sync"
)

// entryApplicationStateBufNodeSize is the size of the arrays in an
// entryApplicationStateBufNode.
// TODO(ajwerner): justify this number.
const entryApplicationStateBufNodeSize = 8

// entryApplicationStateBuf is an allocation-efficient buffer used during the
// application of raft entries. Initialization occurs lazily upon the first
// call to allocate. Used entryApplicationStateBuf objects should be released
// explicitly with the destroy() method, which returns the allocated nodes to
// the pool.
type entryApplicationStateBuf struct {
	len        int32
	head, tail *entryApplicationStateBufNode
}

var entryApplicationStateBufNodeSyncPool = sync.Pool{
	New: func() interface{} { return new(entryApplicationStateBufNode) },
}

type entryApplicationStateBufNode struct {
	entryApplicationStateRingBuf
	next *entryApplicationStateBufNode
}

// last returns the most recently allocated element in the buffer.
func (buf *entryApplicationStateBuf) last() *entryApplicationState {
	return buf.tail.at(buf.tail.len - 1)
}

// allocate extends the length of buf by one and returns the newly
// added element. If this is the first call to allocate it will initialize buf.
// After a buf is initialized it should be explicitly destroyed.
func (buf *entryApplicationStateBuf) allocate() *entryApplicationState {
	if buf.tail == nil {
		n := entryApplicationStateBufNodeSyncPool.Get().(*entryApplicationStateBufNode)
		buf.head, buf.tail = n, n
	}
	buf.len++
	if buf.tail.len == entryApplicationStateBufNodeSize {
		newTail := entryApplicationStateBufNodeSyncPool.Get().(*entryApplicationStateBufNode)
		buf.tail.next = newTail
		buf.tail = newTail
	}
	return buf.tail.allocate()
}

// destroy releases allocated nodes back into the sync pool.
// It is illegal to use buf after a call to destroy.
func (buf *entryApplicationStateBuf) destroy() {
	for cur := buf.head; cur != nil; {
		next := cur.next
		*cur = entryApplicationStateBufNode{}
		entryApplicationStateBufNodeSyncPool.Put(cur)
		cur, buf.head = next, next
	}
	*buf = entryApplicationStateBuf{}
}

// truncate clears all of the entries currently in a buffer.
func (buf *entryApplicationStateBuf) truncate() {
	for buf.head != buf.tail {
		buf.len -= buf.head.len
		buf.head.truncate(buf.head.len)
		oldHead := buf.head
		newHead := oldHead.next
		buf.head = newHead
		*oldHead = entryApplicationStateBufNode{}
		entryApplicationStateBufNodeSyncPool.Put(oldHead)
	}
	buf.head.truncate(buf.len)
	buf.len = 0
}

// entryApplicationStateRingBuf is a ring-buffer of entryApplicationState.
// It offers indexing and truncation from the front.
type entryApplicationStateRingBuf struct {
	len  int32
	head int32
	buf  [entryApplicationStateBufNodeSize]entryApplicationState
}

// at returns the application state at the requested idx.
func (rb *entryApplicationStateRingBuf) at(idx int32) *entryApplicationState {
	if idx >= rb.len {
		panic(fmt.Sprintf("index out of range %v, %v", idx, rb.len))
	}
	return &rb.buf[(rb.head+idx)%entryApplicationStateBufNodeSize]
}

// allocate extends the length of the ring buffer by one and returns the newly
// added element. It is illegal to call allocate on a full ring buf.
func (rb *entryApplicationStateRingBuf) allocate() *entryApplicationState {
	if rb.len == entryApplicationStateBufNodeSize {
		panic("cannot push onto a full entryApplicationStateRingBuf")
	}
	ret := &rb.buf[(rb.head+rb.len)%entryApplicationStateBufNodeSize]
	rb.len++
	return ret
}

// truncate removes the first n elements from the buffer.
// It is illegal to pass a number greater than the current len.
func (rb *entryApplicationStateRingBuf) truncate(n int32) {
	if n > rb.len {
		panic("cannot truncate more elements than the buffer contains")
	}
	// Zero out the removed elements so stale state is not retained.
	// TODO(ajwerner): consider removing this as an optimization.
	for i := int32(0); i < n; i++ {
		*rb.at(i) = entryApplicationState{}
	}
	rb.len -= n
	rb.head += n
	rb.head %= entryApplicationStateBufNodeSize
}

// entryApplicationStateBufIterator iterates over the elements of an
// entryApplicationStateBuf in order, from head to tail.
type entryApplicationStateBufIterator struct {
	idx    int32
	offset int32
	buf    *entryApplicationStateBuf
	node   *entryApplicationStateBufNode
}

// init positions the iterator at the first element of buf and reports
// whether the buffer contains any elements.
func (it *entryApplicationStateBufIterator) init(buf *entryApplicationStateBuf) bool {
	*it = entryApplicationStateBufIterator{
		buf:  buf,
		node: buf.head,
	}
	return it.buf.len > 0
}

// state returns the element at the iterator's current position.
func (it *entryApplicationStateBufIterator) state() *entryApplicationState {
	return it.node.at(it.offset)
}

// isLast returns true if the iterator is positioned at the last element.
func (it *entryApplicationStateBufIterator) isLast() bool {
	return it.idx+1 == it.buf.len
}

// next advances the iterator to the following element and reports whether
// such an element exists.
func (it *entryApplicationStateBufIterator) next() bool {
	if it.idx+1 == it.buf.len {
		return false
	}
	it.idx++
	it.offset++
	if it.offset == entryApplicationStateBufNodeSize {
		it.node = it.node.next
		it.offset = 0
	}
	return true
}
44 changes: 44 additions & 0 deletions pkg/storage/entry_application_state_buf_test.go
@@ -0,0 +1,44 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/stretchr/testify/assert"
)

// TestApplicationStateBuf is an overly simplistic test of the
// entryApplicationStateBuf behavior.
func TestApplicationStateBuf(t *testing.T) {
	defer leaktest.AfterTest(t)()
	var buf entryApplicationStateBuf
	var states []*entryApplicationState
	for i := 0; i < 5*entryApplicationStateBufNodeSize+1; i++ {
		assert.Equal(t, i, int(buf.len))
		states = append(states, buf.allocate())
		assert.Equal(t, i+1, int(buf.len))
	}
	last := states[len(states)-1]
	assert.Equal(t, last, buf.last())
	var it entryApplicationStateBufIterator
	i := 0
	for ok := it.init(&buf); ok; ok = it.next() {
		assert.Equal(t, states[i], it.state())
		i++
	}
	buf.truncate()
	assert.Equal(t, 0, int(buf.len))
	assert.Equal(t, last, buf.last())
	buf.destroy()
	assert.EqualValues(t, buf, entryApplicationStateBuf{})
}