Skip to content

storage/cmdq: O(1) copy-on-write btree clones and atomic refcount GC policy #32251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 148 additions & 24 deletions pkg/storage/cmdq/interval_btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"bytes"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -107,21 +109,126 @@ func upperBound(c *cmd) keyBound {
}

// leafNode is the storage common to every btree node. Leaf nodes are
// allocated as a bare leafNode, while interior nodes embed it as their first
// field (see node); that shared prefix is what lets leafToNode and nodeToLeaf
// convert between the two pointer types.
type leafNode struct {
	// ref is the node's reference count. incRef, decRef and mut access it
	// through sync/atomic.
	ref int32
	// count is the number of slots of cmds currently in use.
	count int16
	// leaf is true when the node is a leaf, i.e. when there is no children
	// array behind this struct.
	leaf bool
	// max is the upper key bound maintained for the node (see
	// findUpperBound and the adjustUpperBound* helpers).
	max keyBound
	// cmds holds the node's commands; only the first count entries are
	// meaningful.
	cmds [maxCmds]*cmd
}

// newLeafNode allocates a fresh leafNode with its leaf flag set and returns
// it viewed as a *node. The allocation is only leafNode-sized, so the
// children array must never be accessed through the returned pointer.
func newLeafNode() *node {
	ln := &leafNode{leaf: true}
	return (*node)(unsafe.Pointer(ln))
}

// node is an interior btree node: a leafNode followed by child pointers.
// leafNode must remain the first (embedded) field, because leafToNode and
// nodeToLeaf reinterpret pointers between the two types via unsafe.Pointer.
type node struct {
leafNode
// children has one more slot than cmds. Slots 0 through count (inclusive)
// are the ones walked when child refcounts are adjusted.
children [maxCmds + 1]*node
}

// leafToNode reinterprets ln as a *node without copying. The result shares
// ln's memory, which covers only the leafNode prefix of a node; callers must
// not touch the children array through it.
func leafToNode(ln *leafNode) *node {
	raw := unsafe.Pointer(ln)
	return (*node)(raw)
}

// nodeToLeaf reinterprets n as a pointer to its leading leafNode without
// copying. It is the inverse of leafToNode.
func nodeToLeaf(n *node) *leafNode {
	raw := unsafe.Pointer(n)
	return (*leafNode)(raw)
}

// leafPool recycles leafNode allocations. decRef returns zeroed leaf nodes
// here and newLeafNode draws from it.
var leafPool = sync.Pool{
	New: func() interface{} { return &leafNode{} },
}

// nodePool recycles interior node allocations. decRef returns zeroed
// interior nodes here and newNode draws from it.
var nodePool = sync.Pool{
	New: func() interface{} { return &node{} },
}

// newLeafNode takes a leafNode from leafPool, marks it as a leaf, gives it a
// reference count of one and returns it viewed as a *node. The backing memory
// is only leafNode-sized, so the children array must not be accessed through
// the result.
func newLeafNode() *node {
	ln := leafPool.Get().(*leafNode)
	ln.leaf = true
	ln.ref = 1
	return leafToNode(ln)
}

// newNode takes an interior node from nodePool and hands it back with a
// reference count of one. No other field is modified here.
func newNode() *node {
	fresh := nodePool.Get().(*node)
	fresh.ref = 1
	return fresh
}

// mut returns a version of *n that the caller is allowed to modify.
//
// A node whose reference count is exactly one is owned by a single tree and
// is returned as is. Any other node is shared, so it is cloned first: the
// clone starts with a reference count of one, the caller's reference to the
// shared original is released, and *n is redirected to the clone. Nodes are
// therefore copied only the first time they are written to after having been
// shared, which gives the tree its copy-on-write behavior.
func mut(n **node) *node {
	cur := *n
	if atomic.LoadInt32(&cur.ref) != 1 {
		// Shared with at least one other owner: take a private copy and
		// give up our reference to the shared node.
		private := cur.clone()
		cur.decRef(true /* recursive */)
		*n = private
		return private
	}
	// Sole owner. Safe to mutate in place.
	return cur
}

// incRef atomically adds one to the node's reference count, recording one
// more owner of the node.
func (n *node) incRef() {
	refs := &n.ref
	atomic.AddInt32(refs, 1)
}

// decRef atomically drops one reference to the node. While other references
// remain nothing else happens. Once the count reaches zero the node is
// zeroed and returned to its memory pool (leafPool for leaves, nodePool for
// interior nodes).
//
// For an interior node, recursive controls whether the references it holds
// on children[0..count] are released before the node is recycled. Callers
// pass false when those child references have been handed over to another
// node and must stay alive.
func (n *node) decRef(recursive bool) {
	if remaining := atomic.AddInt32(&n.ref, -1); remaining > 0 {
		// Still referenced elsewhere. Can't free.
		return
	}
	if n.leaf {
		// Leaves live in leafNode-sized memory; clear and recycle as such.
		ln := nodeToLeaf(n)
		*ln = leafNode{}
		leafPool.Put(ln)
		return
	}
	if recursive {
		// Children must be released before the node is wiped below.
		for i := 0; i <= int(n.count); i++ {
			n.children[i].decRef(true /* recursive */)
		}
	}
	*n = node{}
	nodePool.Put(n)
}

// clone returns a copy of the receiver that has a reference count of one.
// The copy is shallow: for an interior node the child pointers are shared
// with the receiver and each child's reference count is incremented.
func (n *node) clone() *node {
	var dup *node
	switch {
	case n.leaf:
		dup = newLeafNode()
	default:
		dup = newNode()
	}
	// NB: fields are copied one at a time so that n.ref is never read
	// non-atomically, which the race detector would flag as a data race.
	dup.count = n.count
	dup.max = n.max
	dup.cmds = n.cmds
	if dup.leaf {
		// Leaves have no children array to copy.
		return dup
	}
	// Share the children and take a reference on each of them.
	dup.children = n.children
	for i := 0; i <= int(dup.count); i++ {
		dup.children[i].incRef()
	}
	return dup
}

func (n *node) insertAt(index int, c *cmd, nd *node) {
if index < int(n.count) {
copy(n.cmds[index+1:n.count+1], n.cmds[index:n.count])
Expand Down Expand Up @@ -247,7 +354,7 @@ func (n *node) split(i int) (*cmd, *node) {
if n.leaf {
next = newLeafNode()
} else {
next = &node{}
next = newNode()
}
next.count = n.count - int16(i+1)
copy(next.cmds[:], n.cmds[i+1:n.count])
Expand Down Expand Up @@ -287,7 +394,7 @@ func (n *node) insert(c *cmd) (replaced, newBound bool) {
return false, n.adjustUpperBoundOnInsertion(c, nil)
}
if n.children[i].count >= maxCmds {
splitcmd, splitNode := n.children[i].split(maxCmds / 2)
splitcmd, splitNode := mut(&n.children[i]).split(maxCmds / 2)
n.insertAt(i, splitcmd, splitNode)

switch cmp := cmp(c, n.cmds[i]); {
Expand All @@ -300,7 +407,7 @@ func (n *node) insert(c *cmd) (replaced, newBound bool) {
return true, false
}
}
replaced, newBound = n.children[i].insert(c)
replaced, newBound = mut(&n.children[i]).insert(c)
if newBound {
newBound = n.adjustUpperBoundOnInsertion(c, nil)
}
Expand All @@ -317,7 +424,7 @@ func (n *node) removeMax() *cmd {
n.adjustUpperBoundOnRemoval(out, nil)
return out
}
child := n.children[n.count]
child := mut(&n.children[n.count])
if child.count <= minCmds {
n.rebalanceOrMerge(int(n.count))
return n.removeMax()
Expand All @@ -337,12 +444,12 @@ func (n *node) remove(c *cmd) (out *cmd, newBound bool) {
}
return nil, false
}
child := n.children[i]
if child.count <= minCmds {
if n.children[i].count <= minCmds {
// Child not large enough to remove from.
n.rebalanceOrMerge(i)
return n.remove(c)
}
child := mut(&n.children[i])
if found {
// Replace the cmd being removed with the max cmd in our left child.
out = n.cmds[i]
Expand Down Expand Up @@ -390,8 +497,8 @@ func (n *node) rebalanceOrMerge(i int) {
// v
// a
//
left := n.children[i-1]
child := n.children[i]
left := mut(&n.children[i-1])
child := mut(&n.children[i])
xCmd, grandChild := left.popBack()
yCmd := n.cmds[i-1]
child.pushFront(yCmd, grandChild)
Expand Down Expand Up @@ -429,8 +536,8 @@ func (n *node) rebalanceOrMerge(i int) {
// v
// a
//
right := n.children[i+1]
child := n.children[i]
right := mut(&n.children[i+1])
child := mut(&n.children[i])
xCmd, grandChild := right.popFront()
yCmd := n.cmds[i]
child.pushBack(yCmd, grandChild)
Expand Down Expand Up @@ -465,7 +572,9 @@ func (n *node) rebalanceOrMerge(i int) {
if i >= int(n.count) {
i = int(n.count - 1)
}
child := n.children[i]
child := mut(&n.children[i])
// Make mergeChild mutable, bumping the refcounts on its children if necessary.
_ = mut(&n.children[i+1])
mergeCmd, mergeChild := n.removeAt(i)
child.cmds[child.count] = mergeCmd
copy(child.cmds[child.count+1:], mergeChild.cmds[:mergeChild.count])
Expand All @@ -475,6 +584,7 @@ func (n *node) rebalanceOrMerge(i int) {
child.count += mergeChild.count + 1

child.adjustUpperBoundOnInsertion(mergeCmd, mergeChild)
mergeChild.decRef(false /* recursive */)
}
}

Expand Down Expand Up @@ -548,25 +658,39 @@ type btree struct {
length int
}

// Reset removes all cmds from the btree. In doing so, it allows memory
// held by the btree to be recycled. Failure to call this method before
// letting a btree be GCed is safe in that it won't cause a memory leak,
// but it will prevent btree nodes from being efficiently re-used.
func (t *btree) Reset() {
	if t.root != nil {
		// Release this tree's reference on the root before dropping the
		// pointer. The recursive decRef is what hands nodes that are no
		// longer referenced back to the node pools.
		t.root.decRef(true /* recursive */)
		t.root = nil
	}
	t.length = 0
}

// Silent unused warning.
var _ = (*btree).Reset
// Clone returns a copy of the btree that shares all of its nodes with the
// receiver. Only the root's reference count is bumped here; nodes are copied
// later, on demand, when either tree modifies them through mut.
func (t *btree) Clone() btree {
	if t.root != nil {
		t.root.incRef()
	}
	return *t
}

// Delete removes a cmd equal to the passed in cmd from the tree.
func (t *btree) Delete(c *cmd) {
if t.root == nil || t.root.count == 0 {
return
}
if out, _ := t.root.remove(c); out != nil {
if out, _ := mut(&t.root).remove(c); out != nil {
t.length--
}
if t.root.count == 0 && !t.root.leaf {
old := t.root
t.root = t.root.children[0]
old.decRef(false /* recursive */)
}
}

Expand All @@ -576,16 +700,16 @@ func (t *btree) Set(c *cmd) {
if t.root == nil {
t.root = newLeafNode()
} else if t.root.count >= maxCmds {
splitcmd, splitNode := t.root.split(maxCmds / 2)
newRoot := &node{}
splitcmd, splitNode := mut(&t.root).split(maxCmds / 2)
newRoot := newNode()
newRoot.count = 1
newRoot.cmds[0] = splitcmd
newRoot.children[0] = t.root
newRoot.children[1] = splitNode
newRoot.max = newRoot.findUpperBound()
t.root = newRoot
}
if replaced, _ := t.root.insert(c); !replaced {
if replaced, _ := mut(&t.root).insert(c); !replaced {
t.length++
}
}
Expand Down
Loading