Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes several mutation tracking issues with leaf nodes. #13

Merged
merged 4 commits into from
Feb 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 42 additions & 13 deletions iradix.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Txn struct {
// trackOverflow flag, which will cause us to use a more expensive
// algorithm to perform the notifications. Mutation tracking is only
// performed if trackMutate is true.
trackChannels map[*chan struct{}]struct{}
trackChannels map[chan struct{}]struct{}
trackOverflow bool
trackMutate bool
}
Expand All @@ -97,7 +97,7 @@ func (t *Txn) TrackMutate(track bool) {
// overflow flag if we can no longer track any more. This limits the amount of
// state that will accumulate during a transaction and we have a slower algorithm
// to switch to if we overflow.
func (t *Txn) trackChannel(ch *chan struct{}) {
func (t *Txn) trackChannel(ch chan struct{}) {
// In overflow, make sure we don't store any more objects.
if t.trackOverflow {
return
Expand All @@ -118,7 +118,7 @@ func (t *Txn) trackChannel(ch *chan struct{}) {

// Create the map on the fly when we need it.
if t.trackChannels == nil {
t.trackChannels = make(map[*chan struct{}]struct{})
t.trackChannels = make(map[chan struct{}]struct{})
}

// Otherwise we are good to track it.
Expand All @@ -140,25 +140,31 @@ func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
}

// If this node has already been modified, we can continue to use it
// during this transaction. If a node gets kicked out of cache then we
// *may* notify for its mutation if we end up copying the node again,
// but we don't make any guarantees about notifying for intermediate
// mutations that were never exposed outside of a transaction.
// during this transaction. We know that we don't need to track it for
// a node update since the node is writable, but if this is for a leaf
// update we track it, in case the initial write to this node didn't
// update the leaf.
if _, ok := t.writable.Get(n); ok {
if t.trackMutate && forLeafUpdate && n.leaf != nil {
t.trackChannel(n.leaf.mutateCh)
}
return n
}

// Mark this node as being mutated.
if t.trackMutate {
t.trackChannel(&(n.mutateCh))
t.trackChannel(n.mutateCh)
}

// Mark its leaf as being mutated, if appropriate.
if t.trackMutate && forLeafUpdate && n.leaf != nil {
t.trackChannel(&(n.leaf.mutateCh))
t.trackChannel(n.leaf.mutateCh)
}

// Copy the existing node.
// Copy the existing node. If you have set forLeafUpdate it will be
// safe to replace this leaf with another after you get your node for
// writing. You MUST replace it, because the channel associated with
// this leaf will be closed when this transaction is committed.
nc := &Node{
mutateCh: make(chan struct{}),
leaf: n.leaf,
Expand All @@ -177,6 +183,29 @@ func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
return nc
}

// mergeChild is called to collapse the given node with its child. This is only
// called when the given node is not a leaf and has a single edge.
func (t *Txn) mergeChild(n *Node) {
// Mark the child node as being mutated since we are about to abandon
// it. We don't need to mark the leaf since we are retaining it if it
// is there.
e := n.edges[0]
child := e.node
if t.trackMutate {
t.trackChannel(child.mutateCh)
}

// Merge the nodes.
n.prefix = concat(n.prefix, child.prefix)
n.leaf = child.leaf
if len(child.edges) != 0 {
n.edges = make([]edge, len(child.edges))
copy(n.edges, child.edges)
} else {
n.edges = nil
}
}

// insert does a recursive insertion
func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) {
// Handle key exhaustion
Expand Down Expand Up @@ -291,7 +320,7 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {

// Check if this node should be merged
if n != t.root && len(nc.edges) == 1 {
nc.mergeChild()
t.mergeChild(nc)
}
return nc, n.leaf
}
Expand Down Expand Up @@ -320,7 +349,7 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
if newChild.leaf == nil && len(newChild.edges) == 0 {
nc.delEdge(label)
if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
nc.mergeChild()
t.mergeChild(nc)
}
} else {
nc.edges[idx].node = newChild
Expand Down Expand Up @@ -469,7 +498,7 @@ func (t *Txn) Notify() {
t.slowNotify()
} else {
for ch := range t.trackChannels {
close(*ch)
close(ch)
}
}

Expand Down
186 changes: 186 additions & 0 deletions iradix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,31 @@ func TestMergeChildVisibility(t *testing.T) {
}
}

// isClosed returns true if the given channel is closed.
func isClosed(ch chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}

// hasAnyClosedMutateCh scans the given tree and returns true if there are any
// closed mutate channels on any nodes or leaves.
func hasAnyClosedMutateCh(r *Tree) bool {
for iter := r.root.rawIterator(); iter.Front() != nil; iter.Next() {
n := iter.Front()
if isClosed(n.mutateCh) {
return true
}
if n.isLeaf() && isClosed(n.leaf.mutateCh) {
return true
}
}
return false
}

func TestTrackMutate_SeekPrefixWatch(t *testing.T) {
for i := 0; i < 3; i++ {
r := New()
Expand Down Expand Up @@ -652,6 +677,9 @@ func TestTrackMutate_SeekPrefixWatch(t *testing.T) {
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

// Verify root and parent triggered, and leaf affected
select {
Expand Down Expand Up @@ -706,6 +734,9 @@ func TestTrackMutate_SeekPrefixWatch(t *testing.T) {
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

// Verify root and parent triggered, and leaf affected
select {
Expand Down Expand Up @@ -791,6 +822,9 @@ func TestTrackMutate_GetWatch(t *testing.T) {
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

// Verify root and parent triggered, not leaf affected
select {
Expand Down Expand Up @@ -839,6 +873,9 @@ func TestTrackMutate_GetWatch(t *testing.T) {
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

select {
case <-rootWatch:
Expand Down Expand Up @@ -894,6 +931,9 @@ func TestTrackMutate_GetWatch(t *testing.T) {
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

// Verify root and parent triggered, not leaf affected
select {
Expand Down Expand Up @@ -942,6 +982,9 @@ func TestTrackMutate_GetWatch(t *testing.T) {
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

select {
case <-rootWatch:
Expand Down Expand Up @@ -1058,6 +1101,11 @@ func TestTrackMutate_HugeTxn(t *testing.T) {
// Now do the trigger.
txn.Notify()

// Make sure no closed channels escaped the transaction.
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

// Verify the watches fired as expected.
select {
case <-rootWatch:
Expand Down Expand Up @@ -1090,3 +1138,141 @@ func TestTrackMutate_HugeTxn(t *testing.T) {
t.Fatalf("bad")
}
}

func TestTrackMutate_mergeChild(t *testing.T) {
// This case does a delete of the "acb" leaf, which causes the "aca"
// leaf to get merged with the old "ac" node:
//
// [root] [root]
// |a |a
// [node] [node]
// b/ \c b/ \c
// (ab) [node] (ab) (aca)
// a/ \b
// (aca) (acb)
//
for i := 0; i < 3; i++ {
r := New()
r, _, _ = r.Insert([]byte("ab"), nil)
r, _, _ = r.Insert([]byte("aca"), nil)
r, _, _ = r.Insert([]byte("acb"), nil)
snapIter := r.root.rawIterator()

// Run through all notification methods as there were bugs in
// both that affected these operations. The slowNotify path
// would detect copied but otherwise identical leaves as changed
// and wrongly close channels. The normal path would fail to
// notify on a child node that had been merged.
txn := r.Txn()
txn.TrackMutate(true)
txn.Delete([]byte("acb"))
switch i {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great idea!

case 0:
r = txn.Commit()
case 1:
r = txn.CommitOnly()
txn.Notify()
default:
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

// Run through the old tree and make sure the exact channels we
// expected were closed.
for ; snapIter.Front() != nil; snapIter.Next() {
n := snapIter.Front()
path := snapIter.Path()
switch path {
case "", "a", "ac": // parent nodes all change
if !isClosed(n.mutateCh) || n.leaf != nil {
t.Fatalf("bad")
}
case "ab": // unrelated node / sees no change
if isClosed(n.mutateCh) || isClosed(n.leaf.mutateCh) {
t.Fatalf("bad")
}
case "aca": // this node gets merged, but the leaf doesn't change
if !isClosed(n.mutateCh) || isClosed(n.leaf.mutateCh) {
t.Fatalf("bad")
}
case "acb": // this node / leaf gets deleted
if !isClosed(n.mutateCh) || !isClosed(n.leaf.mutateCh) {
t.Fatalf("bad")
}
default:
t.Fatalf("bad: %s", path)
}
}
}
}

func TestTrackMutate_cachedNodeChange(t *testing.T) {
// This case does a delete of the "acb" leaf, which causes the "aca"
// leaf to get merged with the old "ac" node:
//
// [root] [root]
// |a |a
// [node] [node]
// b/ \c b/ \c
// (ab) [node] (ab) (aca) <- then this leaf gets modified
// a/ \b
// (aca) (acb)
//
// Then it makes a modification to the "aca" leaf on a node that will
// be in the cache, so this makes sure that the leaf watch fires.
for i := 0; i < 3; i++ {
r := New()
r, _, _ = r.Insert([]byte("ab"), nil)
r, _, _ = r.Insert([]byte("aca"), nil)
r, _, _ = r.Insert([]byte("acb"), nil)
snapIter := r.root.rawIterator()

txn := r.Txn()
txn.TrackMutate(true)
txn.Delete([]byte("acb"))
txn.Insert([]byte("aca"), nil)
switch i {
case 0:
r = txn.Commit()
case 1:
r = txn.CommitOnly()
txn.Notify()
default:
r = txn.CommitOnly()
txn.slowNotify()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}

// Run through the old tree and make sure the exact channels we
// expected were closed.
for ; snapIter.Front() != nil; snapIter.Next() {
n := snapIter.Front()
path := snapIter.Path()
switch path {
case "", "a", "ac": // parent nodes all change
if !isClosed(n.mutateCh) || n.leaf != nil {
t.Fatalf("bad")
}
case "ab": // unrelated node / sees no change
if isClosed(n.mutateCh) || isClosed(n.leaf.mutateCh) {
t.Fatalf("bad")
}
case "aca": // merge changes the node, then we update the leaf
if !isClosed(n.mutateCh) || !isClosed(n.leaf.mutateCh) {
t.Fatalf("bad")
}
case "acb": // this node / leaf gets deleted
if !isClosed(n.mutateCh) || !isClosed(n.leaf.mutateCh) {
t.Fatalf("bad")
}
default:
t.Fatalf("bad: %s", path)
}
}
}
}
18 changes: 0 additions & 18 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,6 @@ func (n *Node) delEdge(label byte) {
}
}

func (n *Node) mergeChild() {
e := n.edges[0]
child := e.node
n.prefix = concat(n.prefix, child.prefix)
if child.leaf != nil {
n.leaf = new(leafNode)
*n.leaf = *child.leaf
} else {
n.leaf = nil
}
if len(child.edges) != 0 {
n.edges = make([]edge, len(child.edges))
copy(n.edges, child.edges)
} else {
n.edges = nil
}
}

func (n *Node) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) {
search := k
watch := n.mutateCh
Expand Down