Skip to content

Commit

Permalink
storage: batch command application and coalesce applied state per batch
Browse files Browse the repository at this point in the history
This commit batches raft command application where possible. The basic
approach is to notice that many commands only "trivially" update the replica
state machine. Trivial commands can be processed in a single batch by acting
on a copy of the replica state. Non-trivial commands share the same logic but
always commit alone as they for one reason or another rely on having a view of
the replica or storage engine as of a specific log index.

This commit also sneaks in another optimization which batching enables. Each
command mutates a portion of replica state called the applied state which
tracks a combination of the log index which has been applied and the MVCC
stats of the range as of that application. Before this change each entry would
update this applied state and each of those writes will end up in the WAL,
mem-table, and L0 just the be compacted away in L1. Now that commands are
being applied to the storage engine in a single batch it is easy to only
update the applied state for the last entry in the batch.

For sequential writes this patch shows a considerable performance win. The
below benchmark was run on a 3-node c5d.4xlarge (16 vCPU) cluster with
concurrency 64.

```
name            old ops/s   new ops/s   delta
KV0-throughput  20.2k ± 1%  25.2k ± 1%  +24.94%  (p=0.029 n=4+4)

name            old ms/s    new ms/s    delta
KV0-P50          4.45 ± 8%   3.70 ± 5%  -16.85%  (p=0.029 n=4+4)
KV0-Avg          3.10 ± 0%   2.50 ± 0%  -19.35%  (p=0.029 n=4+4)
```

Due to the re-organization of logic in the change, the Replica.mu does not need
to be acquired as many times during the application of a batch. In the common
case it is now acquired exactly twice in the process of applying a batch whereas
before it was acquired more than twice per entry. This should hopefully improve
performance on large machines which experience mutex contention for a single
range.

This effect is visible on large machines. Below are results from running
a normal KV0 workload on c5d.18xlarge machines (72 vCPU machines) with
concurrency 1024 and 16 initial splits.

```
name            old ops/s   new ops/s    delta
KV0-throughput  78.1k ± 1%  116.8k ± 5%  +49.42%  (p=0.029 n=4+4)

name            old ms/s    new ms/s     delta
KV0-P50          24.4 ± 3%    19.7 ± 7%  -19.28%  (p=0.029 n=4+4)
KV0-Avg          12.6 ± 0%     7.5 ± 9%  -40.87%  (p=0.029 n=4+4)
```

Fixes cockroachdb#37426.

Release note (performance improvement): Batch raft entry application and
coalesce writes to applied state for the batch.
  • Loading branch information
ajwerner committed Jul 1, 2019
1 parent d668aa9 commit 006f72e
Show file tree
Hide file tree
Showing 7 changed files with 1,597 additions and 1,069 deletions.
184 changes: 184 additions & 0 deletions pkg/storage/application_state_buf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"fmt"
"sync"
)

// applicationStateBufSize is the size of the arrays in an
// applicationStateBufNode.
// TODO(ajwerner): justify this number.
const applicationStateBufSize = 8

// applicationStateBuf is an allocation-efficient buffer used during the
// application of raft entries. Initialization occurs lazily upon the first
// call to pushBack but used applicationStateBuf objects should be released
// explicitly with the destroy() method to release the allocated buffers back
// to the pool.
type applicationStateBuf struct {
len int32
unpushed bool
head, tail *applicationStateBufNode
}

var applicationStateBufNodeSyncPool = sync.Pool{
New: func() interface{} { return new(applicationStateBufNode) },
}

type applicationStateBufNode struct {
applicationStateRingBuf
next *applicationStateBufNode
}

// unpush allows the caller to keep the tail element of the buffer allocated but
// remove it from iteration. After calling unpush, the next call to pushBack
// will return the value from the previous call to pushBack.
// This function may not be called more than once between calls to pushBack and
// may not be called on an empty buf.
func (buf *applicationStateBuf) unpush() {
if buf.unpushed {
panic("already unpushed")
} else if buf.len == 0 {
panic("cannot unpush empty buffer")
}
buf.len--
buf.unpushed = true
}

func (buf *applicationStateBuf) first() *applicationState {
return buf.head.at(0)
}

// pushBack extends the length of buf by one and returns the newly
// added element. If this is the fist call to pushBack following a call to
// unpush, the element returned from this call will be the same value returned
// from the previous call to pushBack.
func (buf *applicationStateBuf) pushBack() *applicationState {
if buf.tail == nil {
n := applicationStateBufNodeSyncPool.Get().(*applicationStateBufNode)
buf.head, buf.tail = n, n
}
buf.len++
if buf.unpushed {
buf.unpushed = false
return buf.tail.at(buf.tail.len - 1)
}
if buf.tail.len == applicationStateBufSize {
newTail := applicationStateBufNodeSyncPool.Get().(*applicationStateBufNode)
buf.tail.next = newTail
buf.tail = newTail
}
return buf.tail.pushBack()
}

// destroy releases allocated nodes back into the sync pool.
// It is illegal to use buf after a call to destroy.
func (buf *applicationStateBuf) destroy() {
for cur := buf.head; cur != nil; {
next := cur.next
*cur = applicationStateBufNode{}
applicationStateBufNodeSyncPool.Put(cur)
cur, buf.head = next, next
}
*buf = applicationStateBuf{}
}

// truncate clears all of the entries currently in a buffer though will never
// release an entry which has been allocated and unpushed.
func (buf *applicationStateBuf) truncate() {
for buf.head != buf.tail {
buf.len -= buf.head.len
buf.head.truncate(buf.head.len)
oldHead := buf.head
newHead := oldHead.next
buf.head = newHead
*oldHead = applicationStateBufNode{}
applicationStateBufNodeSyncPool.Put(oldHead)
}
buf.head.truncate(buf.len)
buf.len = 0
}

// applicationStateRingBuf is a ring-buffer of applicationState.
// It offers indexing and truncation from the front.
type applicationStateRingBuf struct {
len int32
head int32
buf [applicationStateBufSize]applicationState
}

// at returns the application state
func (rb *applicationStateRingBuf) at(idx int32) *applicationState {
if idx >= rb.len {
panic(fmt.Sprintf("index out of range %v, %v", idx, rb.len))
}
return &rb.buf[(rb.head+idx)%applicationStateBufSize]
}

// pushBack extends the length of the ring buffer by one and returns the newly
// added element. It is illegal to call pushBack on a full ring buf.
func (rb *applicationStateRingBuf) pushBack() *applicationState {
if rb.len == applicationStateBufSize {
panic("cannot push onto a full applicationStateRingBuf")
}
ret := &rb.buf[(rb.head+rb.len)%applicationStateBufSize]
rb.len++
return ret
}

// truncate removes the first n elements from the buffer.
// It is illegal to pass a number greater than the current len.
func (rb *applicationStateRingBuf) truncate(n int32) {
if n > rb.len {
panic("cannot truncate more than have")
}
// TODO(ajwerner): consider removing this as an optimization.
for i := int32(0); i < n; i++ {
*rb.at(i) = applicationState{}
}
rb.len -= n
rb.head += n
rb.head %= applicationStateBufSize
}

type applicationStateBufIterator struct {
idx int32
offset int32
buf *applicationStateBuf
node *applicationStateBufNode
}

func (it *applicationStateBufIterator) init(buf *applicationStateBuf) bool {
*it = applicationStateBufIterator{
buf: buf,
node: buf.head,
}
return it.buf.len > 0
}

func (it *applicationStateBufIterator) next() bool {
if it.idx+1 == it.buf.len {
return false
}
it.idx++
it.offset++
if it.offset == applicationStateBufSize {
it.node = it.node.next
it.offset = 0
}
return true
}

func (it *applicationStateBufIterator) state() *applicationState {
return it.node.at(it.offset)
}
48 changes: 48 additions & 0 deletions pkg/storage/application_state_buf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
)

// TestApplicationStateBuf is an overly simplistic test of the
// applicationStateBuf behavior.
func TestApplicationStateBuf(t *testing.T) {
defer leaktest.AfterTest(t)()
var buf applicationStateBuf
assert.Panics(t, buf.unpush)
var states []*applicationState
for i := 0; i < 5*applicationStateBufSize+1; i++ {
assert.Equal(t, i, int(buf.len))
states = append(states, buf.pushBack())
assert.Equal(t, i+1, int(buf.len))
}
last := states[len(states)-1]
buf.unpush()
assert.Panics(t, buf.unpush)
assert.Equal(t, last, buf.pushBack())
var it applicationStateBufIterator
i := 0
for ok := it.init(&buf); ok; ok = it.next() {
assert.Equal(t, states[i], it.state())
i++
}
buf.unpush()
buf.truncate()
assert.Equal(t, 0, int(buf.len))
assert.Equal(t, last, buf.pushBack())
buf.destroy()
assert.EqualValues(t, buf, applicationStateBuf{})
}
Loading

0 comments on commit 006f72e

Please sign in to comment.