Skip to content

Commit

Permalink
EventLoop: store Timers in min Pairing Heap [fixup #14996] (#15206)
Browse files Browse the repository at this point in the history
Related to [RFC #12](crystal-lang/rfcs#12).

Replaces the `Deque` used in #14996 for a min [Pairing Heap] which is a kind of [Mergeable Heap] and is one of the best performing heap in practical tests when arbitrary deletions are required (think cancelling a timeout), otherwise a D-ary Heap (e.g. 4-heap) will usually perform better. See the [A Nearly-Tight Analysis of Multipass Pairing Heaps](https://epubs.siam.org/doi/epdf/10.1137/1.9781611973068.52) paper or the Wikipedia page for more details.

The implementation itself is based on the [Pairing Heaps: Experiments and Analysis](https://dl.acm.org/doi/pdf/10.1145/214748.214759) paper, and merely implements a recursive twopass algorithm (the auxiliary twopass might perform even better). The `Crystal::PointerPairingList(T)` type is generic and relies on intrusive nodes (the links are into `T`) to avoid extra allocations for the nodes (same as `Crystal::PointerLinkedList(T)`). It also requires a `T#heap_compare` method, so we can use the same type for a min or max heap, or to build a more complex comparison.

Note: I also tried a 4-heap, and while it performs very well and only needs a flat array, the arbitrary deletion (e.g. cancelling timeout) needs a linear scan and its performance quickly plummets, even at low occupancy, and becomes painfully slow at higher occupancy (tens of microseconds on _each_ delete, while the pairing heap does it in tens of nanoseconds).

Follow up to #14996 

[Mergeable Heap]: https://en.wikipedia.org/wiki/Mergeable_heap
[Pairing Heap]: https://en.wikipedia.org/wiki/Pairing_heap
[D-ary Heap]: https://en.wikipedia.org/wiki/D-ary_heap

Co-authored-by: Linus Sellberg <linus.sellberg@nj.se>
Co-authored-by: Johannes Müller <straightshoota@gmail.com>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent ec11b2d commit 6928ca7
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 59 deletions.
28 changes: 15 additions & 13 deletions spec/std/crystal/evented/timers_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ describe Crystal::Evented::Timers do
event = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 7.seconds)
timers.add(pointerof(event))
timers.empty?.should be_false

timers.delete(pointerof(event))
timers.empty?.should be_true
end

it "#next_ready?" do
Expand All @@ -18,9 +21,18 @@ describe Crystal::Evented::Timers do
timers.next_ready?.should be_nil

# with events
event = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 5.seconds)
timers.add(pointerof(event))
timers.next_ready?.should eq(event.wake_at?)
event1s = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 1.second)
event3m = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 3.minutes)
event5m = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 5.minutes)

timers.add(pointerof(event5m))
timers.next_ready?.should eq(event5m.wake_at?)

timers.add(pointerof(event1s))
timers.next_ready?.should eq(event1s.wake_at?)

timers.add(pointerof(event3m))
timers.next_ready?.should eq(event1s.wake_at?)
end

it "#dequeue_ready" do
Expand Down Expand Up @@ -66,16 +78,6 @@ describe Crystal::Evented::Timers do

event0.wake_at = -1.minute
timers.add(pointerof(event0)).should be_true # added new head (next ready)

events = [] of Crystal::Evented::Event*
timers.each { |event| events << event }
events.should eq([
pointerof(event0),
pointerof(event1),
pointerof(event3),
pointerof(event2),
])
timers.empty?.should be_false
end

it "#delete" do
Expand Down
150 changes: 150 additions & 0 deletions spec/std/crystal/pointer_pairing_heap_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require "spec"
require "../../../src/crystal/pointer_pairing_heap"

private struct Node
getter key : Int32

include Crystal::PointerPairingHeap::Node

def initialize(@key : Int32)
end

def heap_compare(other : Pointer(self)) : Bool
key < other.value.key
end

def inspect(io : IO, indent = 0) : Nil
prv = @heap_previous
nxt = @heap_next
chd = @heap_child

indent.times { io << ' ' }
io << "Node value=" << key
io << " prv=" << prv.try(&.value.key)
io << " nxt=" << nxt.try(&.value.key)
io << " chd=" << chd.try(&.value.key)
io.puts

node = heap_child?
while node
node.value.inspect(io, indent + 2)
node = node.value.heap_next?
end
end
end

describe Crystal::PointerPairingHeap do
it "#add" do
heap = Crystal::PointerPairingHeap(Node).new
node1 = Node.new(1)
node2 = Node.new(2)
node2b = Node.new(2)
node3 = Node.new(3)

# can add distinct nodes
heap.add(pointerof(node3))
heap.add(pointerof(node1))
heap.add(pointerof(node2))

# can add duplicate key (different nodes)
heap.add(pointerof(node2b))

# can't add same node twice
expect_raises(ArgumentError) { heap.add(pointerof(node1)) }

# can re-add removed nodes
heap.delete(pointerof(node3))
heap.add(pointerof(node3))

heap.shift?.should eq(pointerof(node1))
heap.add(pointerof(node1))
end

it "#shift?" do
heap = Crystal::PointerPairingHeap(Node).new
nodes = StaticArray(Node, 10).new { |i| Node.new(i) }

# insert in random order
(0..9).to_a.shuffle.each do |i|
heap.add nodes.to_unsafe + i
end

# removes in ascending order
10.times do |i|
node = heap.shift?
node.should eq(nodes.to_unsafe + i)
end
end

it "#delete" do
heap = Crystal::PointerPairingHeap(Node).new
nodes = StaticArray(Node, 10).new { |i| Node.new(i) }

# insert in random order
(0..9).to_a.shuffle.each do |i|
heap.add nodes.to_unsafe + i
end

# remove some values
heap.delete(nodes.to_unsafe + 3)
heap.delete(nodes.to_unsafe + 7)
heap.delete(nodes.to_unsafe + 1)

# remove tail
heap.delete(nodes.to_unsafe + 9)

# remove head
heap.delete(nodes.to_unsafe + 0)

# repeatedly delete min
[2, 4, 5, 6, 8].each do |i|
heap.shift?.should eq(nodes.to_unsafe + i)
end
heap.shift?.should be_nil
end

it "adds 1000 nodes then shifts them in order" do
heap = Crystal::PointerPairingHeap(Node).new

nodes = StaticArray(Node, 1000).new { |i| Node.new(i) }
(0..999).to_a.shuffle.each { |i| heap.add(nodes.to_unsafe + i) }

i = 0
while node = heap.shift?
node.value.key.should eq(i)
i += 1
end
i.should eq(1000)

heap.shift?.should be_nil
end

it "randomly shift while we add nodes" do
heap = Crystal::PointerPairingHeap(Node).new

nodes = uninitialized StaticArray(Node, 1000)
(0..999).to_a.shuffle.each_with_index { |i, j| nodes[j] = Node.new(i) }

i = 0
removed = 0

# regularly calls delete-min while we insert
loop do
if rand(0..5) == 0
removed += 1 if heap.shift?
else
heap.add(nodes.to_unsafe + i)
break if (i += 1) == 1000
end
end

# exhaust the heap
while heap.shift?
removed += 1
end

# we must have added and removed all nodes _once_
i.should eq(1000)
removed.should eq(1000)
end
end
158 changes: 158 additions & 0 deletions src/crystal/pointer_pairing_heap.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# :nodoc:
#
# Tree of `T` structs referenced as pointers.
# `T` must include `Crystal::PointerPairingHeap::Node`.
class Crystal::PointerPairingHeap(T)
module Node
macro included
property? heap_previous : Pointer(self)?
property? heap_next : Pointer(self)?
property? heap_child : Pointer(self)?
end

# Compare self with other. For example:
#
# Use `<` to create a min heap.
# Use `>` to create a max heap.
abstract def heap_compare(other : Pointer(self)) : Bool
end

@head : Pointer(T)?

private def head=(head)
@head = head
head.value.heap_previous = nil if head
head
end

def empty?
@head.nil?
end

def first? : Pointer(T)?
@head
end

def shift? : Pointer(T)?
if node = @head
self.head = merge_pairs(node.value.heap_child?)
node.value.heap_child = nil
node
end
end

def add(node : Pointer(T)) : Nil
if node.value.heap_previous? || node.value.heap_next? || node.value.heap_child?
raise ArgumentError.new("The node is already in a Pairing Heap tree")
end
self.head = meld(@head, node)
end

def delete(node : Pointer(T)) : Nil
if previous_node = node.value.heap_previous?
next_sibling = node.value.heap_next?

if previous_node.value.heap_next? == node
previous_node.value.heap_next = next_sibling
else
previous_node.value.heap_child = next_sibling
end

if next_sibling
next_sibling.value.heap_previous = previous_node
end

subtree = merge_pairs(node.value.heap_child?)
clear(node)
self.head = meld(@head, subtree)
else
# removing head
self.head = merge_pairs(node.value.heap_child?)
node.value.heap_child = nil
end
end

def clear : Nil
if node = @head
clear_recursive(node)
@head = nil
end
end

private def clear_recursive(node)
child = node.value.heap_child?
while child
clear_recursive(child)
child = child.value.heap_next?
end
clear(node)
end

private def meld(a : Pointer(T), b : Pointer(T)) : Pointer(T)
if a.value.heap_compare(b)
add_child(a, b)
else
add_child(b, a)
end
end

private def meld(a : Pointer(T), b : Nil) : Pointer(T)
a
end

private def meld(a : Nil, b : Pointer(T)) : Pointer(T)
b
end

private def meld(a : Nil, b : Nil) : Nil
end

private def add_child(parent : Pointer(T), node : Pointer(T)) : Pointer(T)
first_child = parent.value.heap_child?
parent.value.heap_child = node

first_child.value.heap_previous = node if first_child
node.value.heap_previous = parent
node.value.heap_next = first_child

parent
end

private def merge_pairs(node : Pointer(T)?) : Pointer(T)?
return unless node

# 1st pass: meld children into pairs (left to right)
tail = nil

while a = node
if b = a.value.heap_next?
node = b.value.heap_next?
root = meld(a, b)
root.value.heap_previous = tail
tail = root
else
a.value.heap_previous = tail
tail = a
break
end
end

# 2nd pass: meld the pairs back into a single tree (right to left)
root = nil

while tail
node = tail.value.heap_previous?
root = meld(root, tail)
tail = node
end

root.value.heap_next = nil if root
root
end

private def clear(node) : Nil
node.value.heap_previous = nil
node.value.heap_next = nil
node.value.heap_child = nil
end
end
8 changes: 8 additions & 0 deletions src/crystal/system/unix/evented/event.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "crystal/pointer_linked_list"
require "crystal/pointer_pairing_heap"

# Information about the event that a `Fiber` is waiting on.
#
Expand Down Expand Up @@ -35,6 +36,9 @@ struct Crystal::Evented::Event
# The event can be added to `Waiters` lists.
include PointerLinkedList::Node

# The event can be added to the `Timers` list.
include PointerPairingHeap::Node

def initialize(@type : Type, @fiber, @index = nil, timeout : Time::Span? = nil)
if timeout
seconds, nanoseconds = System::Time.monotonic
Expand All @@ -55,4 +59,8 @@ struct Crystal::Evented::Event
# NOTE: musn't be changed after registering the event into `Timers`!
def wake_at=(@wake_at)
end

def heap_compare(other : Pointer(self)) : Bool
wake_at < other.value.wake_at
end
end
Loading

0 comments on commit 6928ca7

Please sign in to comment.