diff --git a/spec/std/crystal/evented/arena_spec.cr b/spec/std/crystal/evented/arena_spec.cr
index c25bb9ec1adc..edf5fd90e11b 100644
--- a/spec/std/crystal/evented/arena_spec.cr
+++ b/spec/std/crystal/evented/arena_spec.cr
@@ -5,7 +5,7 @@ require "spec"
 describe Crystal::Evented::Arena do
   describe "#allocate_at?" do
     it "yields block when not allocated" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       pointer = nil
       index = nil
       called = 0
@@ -31,7 +31,7 @@ describe Crystal::Evented::Arena do
     end

     it "allocates up to capacity" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       indexes = [] of Crystal::Evented::Arena::Index

       indexes = 32.times.map do |i|
@@ -49,7 +49,7 @@ describe Crystal::Evented::Arena do
     end

     it "checks bounds" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       expect_raises(IndexError) { arena.allocate_at?(-1) { } }
       expect_raises(IndexError) { arena.allocate_at?(33) { } }
     end
@@ -57,7 +57,7 @@ describe Crystal::Evented::Arena do

   describe "#get" do
     it "returns previously allocated object" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       pointer = nil

       index = arena.allocate_at(30) do |ptr|
@@ -77,7 +77,7 @@ describe Crystal::Evented::Arena do
     end

     it "can't access unallocated object" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)

       expect_raises(RuntimeError) do
         arena.get(Crystal::Evented::Arena::Index.new(10, 0)) { }
@@ -85,7 +85,7 @@ describe Crystal::Evented::Arena do
     end

     it "checks generation" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       called = 0

       index1 = arena.allocate_at(2) { called += 1 }
@@ -102,7 +102,7 @@ describe Crystal::Evented::Arena do
     end

     it "checks out of bounds" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(-1, 0)) { } }
       expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(33, 0)) { } }
     end
@@ -110,7 +110,7 @@ describe Crystal::Evented::Arena do

   describe "#get?" do
     it "returns previously allocated object" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       pointer = nil

       index = arena.allocate_at(30) do |ptr|
@@ -131,7 +131,7 @@ describe Crystal::Evented::Arena do
     end

     it "can't access unallocated index" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       called = 0

       ret = arena.get?(Crystal::Evented::Arena::Index.new(10, 0)) { called += 1 }
@@ -140,7 +140,7 @@ describe Crystal::Evented::Arena do
     end

     it "checks generation" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       called = 0

       old_index = arena.allocate_at(2) { }
@@ -166,7 +166,7 @@ describe Crystal::Evented::Arena do
     end

     it "checks out of bounds" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       called = 0

       arena.get?(Crystal::Evented::Arena::Index.new(-1, 0)) { called += 1 }.should be_false
@@ -178,7 +178,7 @@ describe Crystal::Evented::Arena do

   describe "#free" do
     it "deallocates the object" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       index1 = arena.allocate_at(3) { |ptr| ptr.value = 123 }

       arena.free(index1) { }
@@ -192,7 +192,7 @@ describe Crystal::Evented::Arena do
     end

     it "checks generation" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       called = 0

       old_index = arena.allocate_at(1) { }
@@ -214,7 +214,7 @@ describe Crystal::Evented::Arena do
     end

     it "checks out of bounds" do
-      arena = Crystal::Evented::Arena(Int32).new(32)
+      arena = Crystal::Evented::Arena(Int32, 96).new(32)
       called = 0

       arena.free(Crystal::Evented::Arena::Index.new(-1, 0)) { called += 1 }
@@ -223,4 +223,32 @@ describe Crystal::Evented::Arena do
       called.should eq(0)
     end
   end
+
+  it "#each_index" do
+    arena = Crystal::Evented::Arena(Int32, 96).new(32)
+    indices = [] of {Int32, Crystal::Evented::Arena::Index}
+
+    arena.each_index { |i, index| indices << {i, index} }
+    indices.should be_empty
+
+    index5 = arena.allocate_at(5) { }
+
+    arena.each_index { |i, index| indices << {i, index} }
+    indices.should eq([{5, index5}])
+
+    index3 = arena.allocate_at(3) { }
+    index11 = arena.allocate_at(11) { }
+    index10 = arena.allocate_at(10) { }
+    index30 = arena.allocate_at(30) { }
+
+    indices.clear
+    arena.each_index { |i, index| indices << {i, index} }
+    indices.should eq([
+      {3, index3},
+      {5, index5},
+      {10, index10},
+      {11, index11},
+      {30, index30},
+    ])
+  end
 end
diff --git a/spec/std/crystal/evented/timers_spec.cr b/spec/std/crystal/evented/timers_spec.cr
index d40917910d1d..372c3f5213e9 100644
--- a/spec/std/crystal/evented/timers_spec.cr
+++ b/spec/std/crystal/evented/timers_spec.cr
@@ -67,14 +67,16 @@ describe Crystal::Evented::Timers do
    event0.wake_at = -1.minute
    timers.add(pointerof(event0)).should be_true # added new head (next ready)

-    events = [] of Crystal::Evented::Event*
-    timers.each { |event| events << event }
-    events.should eq([
-      pointerof(event0),
-      pointerof(event1),
-      pointerof(event3),
-      pointerof(event2),
-    ])
+    # TODO: forcefully dequeue the next ready event, then check the order
+
+    # events = [] of Crystal::Evented::Event*
+    # timers.each { |event| events << event }
+    # events.should eq([
+    #   pointerof(event0),
+    #   pointerof(event1),
+    #   pointerof(event3),
+    #   pointerof(event2),
+    # ])

    timers.empty?.should be_false
  end
diff --git a/spec/std/crystal/pairing_heap_spec.cr b/spec/std/crystal/pairing_heap_spec.cr
new file mode 100644
index 000000000000..f72a45c13fd6
--- /dev/null
+++ b/spec/std/crystal/pairing_heap_spec.cr
@@ -0,0 +1,137 @@
+require "spec"
+require "../../../src/crystal/pointer_pairing_heap"
+
+private struct Node
+  getter key : Int32
+
+  include Crystal::PointerPairingHeap::Node
+
+  def initialize(@key : Int32)
+  end
+
+  def heap_compare(other : Pointer(self)) : Bool
+    key < other.value.key
+  end
+end
+
+describe Crystal::PointerPairingHeap do
+  it "#add" do
+    heap = Crystal::PointerPairingHeap(Node).new
+    node1 = Node.new(1)
+    node2 = Node.new(2)
+    node2b = Node.new(2)
+    node3 = Node.new(3)
+
+    # can add distinct nodes
+    heap.add(pointerof(node3))
+    heap.add(pointerof(node1))
+    heap.add(pointerof(node2))
+
+    # can add duplicate key (different nodes)
+    heap.add(pointerof(node2b))
+
+    # can't add same node twice
+    expect_raises(ArgumentError) { heap.add(pointerof(node1)) }
+
+    # can re-add removed nodes
+    heap.delete(pointerof(node3))
+    heap.add(pointerof(node3))
+
+    heap.shift?.should eq(pointerof(node1))
+    heap.add(pointerof(node1))
+  end
+
+  it "#shift?" do
+    heap = Crystal::PointerPairingHeap(Node).new
+    nodes = StaticArray(Node, 10).new { |i| Node.new(i) }
+
+    # insert in random order
+    (0..9).to_a.shuffle.each do |i|
+      heap.add nodes.to_unsafe + i
+    end
+
+    # removes in ascending order
+    10.times do |i|
+      heap.shift?.should eq(nodes.to_unsafe + i)
+    end
+  end
+
+  it "#delete" do
+    heap = Crystal::PointerPairingHeap(Node).new
+    nodes = StaticArray(Node, 10).new { |i| Node.new(i) }
+
+    # noop: empty
+    heap.delete(nodes.to_unsafe + 0).should eq({false, false})
+
+    # insert in random order
+    (0..9).to_a.shuffle.each do |i|
+      heap.add nodes.to_unsafe + i
+    end
+
+    # noop: unknown node
+    node11 = Node.new(11)
+    heap.delete(pointerof(node11)).should eq({false, false})
+
+    # remove some values
+    heap.delete(nodes.to_unsafe + 3).should eq({true, false})
+    heap.delete(nodes.to_unsafe + 7).should eq({true, false})
+    heap.delete(nodes.to_unsafe + 1).should eq({true, false})
+
+    # remove tail
+    heap.delete(nodes.to_unsafe + 9).should eq({true, false})
+
+    # remove head
+    heap.delete(nodes.to_unsafe + 0).should eq({true, true})
+
+    # repeatedly delete min
+    [2, 4, 5, 6, 8].each do |i|
+      heap.shift?.should eq(nodes.to_unsafe + i)
+    end
+    heap.shift?.should be_nil
+  end
+
+  it "adds 1000 nodes then shifts them in order" do
+    heap = Crystal::PointerPairingHeap(Node).new
+
+    nodes = StaticArray(Node, 1000).new { |i| Node.new(i) }
+    (0..999).to_a.shuffle.each { |i| heap.add(nodes.to_unsafe + i) }
+
+    i = 0
+    while node = heap.shift?
+      node.value.key.should eq(i)
+      i += 1
+    end
+    i.should eq(1000)
+
+    heap.shift?.should be_nil
+  end
+
+  it "randomly adds while we shift nodes" do
+    heap = Crystal::PointerPairingHeap(Node).new
+
+    nodes = uninitialized StaticArray(Node, 1000)
+    (0..999).to_a.shuffle.each_with_index { |i, j| nodes[j] = Node.new(i) }
+
+    i = 0
+    removed = 0
+
+    # regularly calls delete-min while we insert
+    loop do
+      if rand(0..5) == 0
+        removed += 1 if heap.shift?
+      else
+        heap.add(nodes.to_unsafe + i)
+        break if (i += 1) == 1000
+      end
+    end
+
+    # exhaust the heap
+    while heap.shift?
+      removed += 1
+    end
+
+    # we must have added and removed all nodes _once_
+    i.should eq(1000)
+    removed.should eq(1000)
+  end
+end
diff --git a/src/crystal/pointer_pairing_heap.cr b/src/crystal/pointer_pairing_heap.cr
new file mode 100644
index 000000000000..9625230ef4f9
--- /dev/null
+++ b/src/crystal/pointer_pairing_heap.cr
@@ -0,0 +1,161 @@
+# :nodoc:
+#
+# Tree of `T` structs referenced as pointers.
+# `T` must include `Crystal::PointerPairingHeap::Node`.
+class Crystal::PointerPairingHeap(T)
+  module Node
+    macro included
+      property? heap_previous : Pointer(self)?
+      property? heap_next : Pointer(self)?
+      property? heap_child : Pointer(self)?
+    end
+
+    # Compare self with other. For example:
+    #
+    # Use `<` to create a min heap.
+    # Use `>` to create a max heap.
+    abstract def heap_compare(other : Pointer(self)) : Bool
+  end
+
+  @head : T* | Nil
+
+  private def head=(head)
+    @head = head
+    head.value.heap_previous = nil if head
+    head
+  end
+
+  def empty?
+    @head.nil?
+  end
+
+  def first? : T* | Nil
+    @head
+  end
+
+  def shift? : T* | Nil
+    if node = @head
+      self.head = merge_pairs(node.value.heap_child?)
+      node.value.heap_child = nil
+      node
+    end
+  end
+
+  def add(node : T*) : Bool
+    if node.value.heap_previous? || node.value.heap_next? || node.value.heap_child?
+      raise ArgumentError.new("The node is already in a Pairing Heap tree")
+    end
+    self.head = meld(@head, node)
+    node == @head
+  end
+
+  def delete(node : T*) : {Bool, Bool}
+    if node == @head
+      self.head = merge_pairs(node.value.heap_child?)
+      node.value.heap_child = nil
+      return {true, true}
+    end
+
+    if remove?(node)
+      subtree = merge_pairs(node.value.heap_child?)
+      self.head = meld(@head, subtree)
+      unlink(node)
+      return {true, false}
+    end
+
+    {false, false}
+  end
+
+  private def remove?(node)
+    if previous_node = node.value.heap_previous?
+      next_sibling = node.value.heap_next?
+
+      if previous_node.value.heap_next? == node
+        previous_node.value.heap_next = next_sibling
+      else
+        previous_node.value.heap_child = next_sibling
+      end
+
+      if next_sibling
+        next_sibling.value.heap_previous = previous_node
+      end
+
+      true
+    else
+      false
+    end
+  end
+
+  def clear : Nil
+    if node = @head
+      clear_recursive(node)
+      @head = nil
+    end
+  end
+
+  private def clear_recursive(node)
+    child = node.value.heap_child?
+    while child
+      clear_recursive(child)
+      child = child.value.heap_next?
+    end
+    unlink(node)
+  end
+
+  private def meld(a : T*, b : T*) : T*
+    if a.value.heap_compare(b)
+      add_child(a, b)
+    else
+      add_child(b, a)
+    end
+  end
+
+  private def meld(a : T*, b : Nil) : T*
+    a
+  end
+
+  private def meld(a : Nil, b : T*) : T*
+    b
+  end
+
+  private def meld(a : Nil, b : Nil) : Nil
+  end
+
+  private def add_child(parent : T*, node : T*) : T*
+    first_child = parent.value.heap_child?
+    parent.value.heap_child = node
+
+    first_child.value.heap_previous = node if first_child
+    node.value.heap_previous = parent
+    node.value.heap_next = first_child
+
+    parent
+  end
+
+  # Two-pass merge of the children of *node* into pairs of two.
+  private def merge_pairs(a : T*) : T* | Nil
+    a.value.heap_previous = nil
+
+    if b = a.value.heap_next?
+      a.value.heap_next = nil
+      b.value.heap_previous = nil
+    else
+      return a
+    end
+
+    rest = merge_pairs(b.value.heap_next?)
+    b.value.heap_next = nil
+
+    pair = meld(a, b)
+    meld(pair, rest)
+  end
+
+  private def merge_pairs(node : Nil) : Nil
+  end
+
+  private def unlink(node) : Nil
+    node.value.heap_previous = nil
+    node.value.heap_next = nil
+    node.value.heap_child = nil
+  end
+end
diff --git a/src/crystal/system/unix/epoll/event_loop.cr b/src/crystal/system/unix/epoll/event_loop.cr
index dc2f2052dfa2..f638a34b2ea2 100644
--- a/src/crystal/system/unix/epoll/event_loop.cr
+++ b/src/crystal/system/unix/epoll/event_loop.cr
@@ -50,7 +50,7 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop
      system_set_timer(@timers.next_ready?)

      # re-add all registered fds
-      Evented.arena.each { |fd, index| system_add(fd, index) }
+      Evented.arena.each_index { |fd, index| system_add(fd, index) }
    end
  {% end %}

diff --git a/src/crystal/system/unix/evented/arena.cr b/src/crystal/system/unix/evented/arena.cr
index 818b80b83c41..57e408183679 100644
--- a/src/crystal/system/unix/evented/arena.cr
+++ b/src/crystal/system/unix/evented/arena.cr
@@ -1,14 +1,11 @@
 # Generational Arena.
 #
-# Allocates a `Slice` of `T` through `mmap`. `T` is supposed to be a struct, so
-# it can be embedded right into the memory region.
-#
 # The arena allocates objects `T` at a predefined index. The object iself is
 # uninitialized (outside of having its memory initialized to zero). The object
-# can be allocated and later retrieved using the generation index
-# (Arena::Index) that contains both the actual index (Int32) and the generation
-# number (UInt32). Deallocating the object increases the generation number,
-# which allows the object to be reallocated later on. Trying to retrieve the
+# can be allocated and later retrieved using the generation index (Arena::Index)
+# that contains both the actual index (Int32) and the generation number
+# (UInt32). Deallocating the object increases the generation number, which
+# allows the object to be reallocated later on. Trying to retrieve the
 # allocation using the generation index will fail if the generation number
 # changed (it's a new allocation).
 #
@@ -21,24 +18,15 @@
 # They're unique to the process and the OS always reuses the lowest fd numbers
 # before growing.
 #
-# Thread safety: the memory region is pre-allocated (up to capacity) using mmap
-# (virtual allocation) and pointers are never invalidated. Individual
-# allocation, deallocation and regular accesses are protected by a fine grained
-# lock over each object: parallel accesses to the memory region are prohibited,
-# and pointers are expected to not outlive the block that yielded them (don't
-# capture them).
+# Thread safety: the memory region is divided into blocks of size BLOCK_BYTESIZE
+# allocated in the GC. Pointers are thus never invalidated. Mutating the blocks
+# is protected by a mutual exclusion lock. Individual (de)allocations of objects
+# are protected with a fine grained lock.
 #
-# Guarantees: `mmap` initializes the memory to zero, which means `T` objects are
+# Guarantees: blocks' memory is initialized to zero, which means `T` objects are
 # initialized to zero by default, then `#free` will also clear the memory, so
 # the next allocation shall be initialized to zero, too.
-#
-# TODO: instead of the mmap that must preallocate a fixed chunk of virtual
-# memory, we could allocate individual blocks of memory, then access the actual
-# block at `index % size`. Pointers would still be valid (as long as the block
-# isn't collected). We wouldn't have to worry about maximum capacity, we could
-# still allocate blocks discontinuously & collect unused blocks during GC
-# collections.
-class Crystal::Evented::Arena(T)
+class Crystal::Evented::Arena(T, BLOCK_BYTESIZE)
   INVALID_INDEX = Index.new(-1, 0)

   struct Index
@@ -93,41 +81,12 @@ class Crystal::Evented::Arena(T)
     end
   end

-  @buffer : Slice(Entry(T))
-
-  {% unless flag?(:preview_mt) %}
-    # Remember the maximum allocated fd ever;
-    #
-    # This is specific to `EventLoop#after_fork` that needs to iterate the arena
-    # for registered fds in epoll/kqueue to re-add them to the new epoll/kqueue
-    # instances. Without this upper limit we'd iterate the whole arena which
-    # would lead the kernel to try and allocate the whole mmap in physical
-    # memory (instead of virtual memory) which would at best be a waste, and a
-    # worst fill the memory (e.g. unlimited open files).
-    @maximum = 0
-  {% end %}
-
-  def initialize(capacity : Int32)
-    pointer = self.class.mmap(LibC::SizeT.new(sizeof(Entry(T))) * capacity)
-    @buffer = Slice.new(pointer.as(Pointer(Entry(T))), capacity)
-  end
-
-  protected def self.mmap(bytesize)
-    flags = LibC::MAP_PRIVATE | LibC::MAP_ANON
-    prot = LibC::PROT_READ | LibC::PROT_WRITE
-
-    pointer = LibC.mmap(nil, bytesize, prot, flags, -1, 0)
-    System.panic("mmap", Errno.value) if pointer == LibC::MAP_FAILED
-
-    {% if flag?(:linux) %}
-      LibC.madvise(pointer, bytesize, LibC::MADV_NOHUGEPAGE)
-    {% end %}
+  @blocks : Slice(Pointer(Entry(T)))
+  @capacity : Int32

-    pointer
-  end
-
-  def finalize
-    LibC.munmap(@buffer.to_unsafe, @buffer.bytesize)
+  def initialize(@capacity : Int32)
+    @blocks = Slice(Pointer(Entry(T))).new(1) { allocate_block }
+    @mutex = Thread::Mutex.new
   end

   # Allocates the object at *index* unless already allocated, then yields a
@@ -140,14 +99,11 @@ class Crystal::Evented::Arena(T)
   # There are no generational checks.
   # Raises if *index* is out of bounds.
   def allocate_at?(index : Int32, & : (Pointer(T), Index) ->) : Index?
-    entry = at(index)
+    entry = at(index, grow: true)

     entry.value.@lock.sync do
       return if entry.value.allocated?

-      {% unless flag?(:preview_mt) %}
-        @maximum = index if index > @maximum
-      {% end %}
       entry.value.allocated = true

       gen_index = Index.new(index, entry.value.generation)
@@ -165,9 +121,8 @@ class Crystal::Evented::Arena(T)

   # Yields a pointer to the object previously allocated at *index*.
   #
-  # Raises if the object isn't allocated.
-  # Raises if the generation has changed (i.e. the object has been freed then reallocated).
-  # Raises if *index* is negative.
+  # Raises if the object isn't allocated, the generation has changed (i.e. the
+  # object has been freed then reallocated) or *index* is out of bounds.
   def get(index : Index, &) : Nil
     at(index) do |entry|
       yield entry.value.pointer
@@ -176,10 +131,9 @@ class Crystal::Evented::Arena(T)

   # Yields a pointer to the object previously allocated at *index* and returns
   # true.
-  # Does nothing if the object isn't allocated or the generation has changed,
-  # and returns false.
   #
-  # Raises if *index* is negative.
+  # Does nothing if the object isn't allocated, the generation has changed or
+  # *index* is out of bounds.
   def get?(index : Index, &) : Bool
     at?(index) do |entry|
       yield entry.value.pointer
@@ -189,9 +143,9 @@ class Crystal::Evented::Arena(T)

   # Yields the object previously allocated at *index* then releases it.
-  # Does nothing if the object isn't allocated or the generation has changed.
   #
-  # Raises if *index* is negative.
+  # Does nothing if the object isn't allocated, the generation has changed or
+  # *index* is out of bounds.
   def free(index : Index, &) : Nil
     at?(index) do |entry|
       begin
@@ -203,7 +157,7 @@ class Crystal::Evented::Arena(T)
   end

   private def at(index : Index, &) : Nil
-    entry = at(index.index)
+    entry = at(index.index, grow: false)
     entry.value.@lock.lock

     unless entry.value.allocated? && entry.value.generation == index.generation
@@ -229,29 +183,65 @@ class Crystal::Evented::Arena(T)
     end
   end

-  private def at(index : Int32) : Pointer(Entry(T))
-    (@buffer + index).to_unsafe
+  private def at(index : Int32, grow : Bool) : Pointer(Entry(T))
+    raise IndexError.new unless 0 <= index < @capacity
+
+    n, j = index.divmod(entries_per_block)
+
+    if n >= @blocks.size
+      raise RuntimeError.new("#{self.class.name}: not allocated index=#{index}") unless grow
+      @mutex.synchronize { unsafe_grow(n) if n >= @blocks.size }
+    end
+
+    @blocks.to_unsafe[n] + j
   end

   private def at?(index : Int32) : Pointer(Entry(T))?
-    if 0 <= index < @buffer.size
-      @buffer.to_unsafe + index
+    return unless 0 <= index < @capacity
+
+    n, j = index.divmod(entries_per_block)
+
+    if block = @blocks[n]?
+      block + j
     end
   end

-  {% unless flag?(:preview_mt) %}
-    # Iterates all allocated objects, yields the actual index as well as the
-    # generation index.
-    def each(&) : Nil
-      pointer = @buffer.to_unsafe
+  private def unsafe_grow(n)
+    # we manually dup instead of using realloc to avoid parallelism issues, for
+    # example fork or another thread trying to iterate after realloc but before
+    # we got the time to set @blocks or to allocate the new blocks
+    new_size = n + 1
+    new_pointer = GC.malloc(new_size * sizeof(Pointer(Entry(T)))).as(Pointer(Pointer(Entry(T))))
+    @blocks.to_unsafe.copy_to(new_pointer, @blocks.size)
+    @blocks.size.upto(n) { |j| new_pointer[j] = allocate_block }
+
+    @blocks = Slice.new(new_pointer, new_size)
+  end
+
+  private def allocate_block
+    GC.malloc(BLOCK_BYTESIZE).as(Pointer(Entry(T)))
+  end

-      0.upto(@maximum) do |index|
-        entry = pointer + index
+  # Iterates all allocated objects, yields the actual index as well as the
+  # generation index.
+  def each_index(&) : Nil
+    index = 0
+
+    @blocks.each do |block|
+      entries_per_block.times do |j|
+        entry = block + j

         if entry.value.allocated?
           yield index, Index.new(index, entry.value.generation)
         end
+
+        index += 1
       end
     end
-  {% end %}
+  end
+
+  private def entries_per_block
+    # can't be a constant: can't access a generic when assigning a constant
+    BLOCK_BYTESIZE // sizeof(Entry(T))
+  end
 end
diff --git a/src/crystal/system/unix/evented/event.cr b/src/crystal/system/unix/evented/event.cr
index b33130df53c2..e6937cf4d044 100644
--- a/src/crystal/system/unix/evented/event.cr
+++ b/src/crystal/system/unix/evented/event.cr
@@ -1,4 +1,5 @@
 require "crystal/pointer_linked_list"
+require "crystal/pointer_pairing_heap"

 # Information about the event that a `Fiber` is waiting on.
 #
@@ -35,6 +36,9 @@ struct Crystal::Evented::Event
   # The event can be added to `Waiters` lists.
   include PointerLinkedList::Node

+  # The event can be added to the `Timers` list.
+  include PointerPairingHeap::Node
+
   def initialize(@type : Type, @fiber, @index = nil, timeout : Time::Span? = nil)
     if timeout
       seconds, nanoseconds = System::Time.monotonic
@@ -55,4 +59,8 @@ struct Crystal::Evented::Event

   # NOTE: musn't be changed after registering the event into `Timers`!
   def wake_at=(@wake_at)
   end
+
+  def heap_compare(other : Pointer(self)) : Bool
+    wake_at < other.value.wake_at
+  end
 end
diff --git a/src/crystal/system/unix/evented/event_loop.cr b/src/crystal/system/unix/evented/event_loop.cr
index 65b9e746b9b2..e05025aeae98 100644
--- a/src/crystal/system/unix/evented/event_loop.cr
+++ b/src/crystal/system/unix/evented/event_loop.cr
@@ -38,13 +38,18 @@ module Crystal::Evented
   # allows optimizations to the OS (it can reuse already allocated resources),
   # and either the man page explicitly says so (Linux), or they don't (BSD) and
   # they must follow the POSIX definition.
-  protected class_getter arena = Arena(PollDescriptor).new(max_fds)
+  #
+  # The block size is set to 64KB because it's a multiple of:
+  # - 4KB (usual page size)
+  # - 1024 (common soft limit for open files)
+  # - sizeof(Arena::Entry(PollDescriptor))
+  protected class_getter arena = Arena(PollDescriptor, 65536).new(max_fds)

   private def self.max_fds : Int32
     if LibC.getrlimit(LibC::RLIMIT_NOFILE, out rlimit) == -1
       raise RuntimeError.from_errno("getrlimit(RLIMIT_NOFILE)")
     end
-    rlimit.rlim_cur.clamp(..Int32::MAX).to_i32!
+    rlimit.rlim_max.clamp(..Int32::MAX).to_i32!
   end
 end
diff --git a/src/crystal/system/unix/evented/timers.cr b/src/crystal/system/unix/evented/timers.cr
index ace4fefcf09b..b30404f4427a 100644
--- a/src/crystal/system/unix/evented/timers.cr
+++ b/src/crystal/system/unix/evented/timers.cr
@@ -1,86 +1,49 @@
+require "crystal/pointer_pairing_heap"
+
 # List of `Event` ordered by `Event#wake_at` ascending. Optimized for fast
 # dequeue and determining when is the next timer event.
 #
-# Thread unsafe: parallel accesses much be protected.
+# Thread unsafe: parallel accesses must be protected!
 #
-# NOTE: this is a struct because it only wraps a const pointer to a deque
+# NOTE: this is a struct because it only wraps a const pointer to an object
 # allocated in the heap.
-#
-# OPTIMIZE: consider a skiplist for faster lookups (add/delete).
-#
-# OPTIMIZE: we could avoid memmove on add/delete by allocating a buffer, putting
-# entries at whatever available index in the buffer, and linking entries in
-# order (using indices so we can realloc the buffer); we'd have to keep a list
-# of free indexes, too. It could be a good combo of unbounded linked list while
-# retaining some memory locality. It should even be compatible with a skiplist
-# (e.g. make entries a fixed height tower instead of prev/next node).
 struct Crystal::Evented::Timers
   def initialize
-    @list = Deque(Evented::Event*).new
+    @heap = PointerPairingHeap(Evented::Event).new
   end

   def empty? : Bool
-    @list.empty?
+    @heap.empty?
   end

-  # Returns the time at which the next timer is supposed to run.
+  # Returns the time of the next ready timer (if any).
   def next_ready? : Time::Span?
-    @list.first?.try(&.value.wake_at)
+    @heap.first?.try(&.value.wake_at)
   end

   # Dequeues and yields each ready timer (their `#wake_at` is lower than
   # `System::Time.monotonic`) from the oldest to the most recent (i.e. time
   # ascending).
   def dequeue_ready(& : Evented::Event* -> Nil) : Nil
-    return if @list.empty?
-
     seconds, nanoseconds = System::Time.monotonic
     now = Time::Span.new(seconds: seconds, nanoseconds: nanoseconds)
-    n = 0

-    @list.each do |event|
+    while event = @heap.first?
       break if event.value.wake_at > now
+      @heap.shift?
       yield event
-      n += 1
     end
-
-    # OPTIMIZE: consume the n entries at once
-    n.times { @list.shift }
   end

   # Add a new timer into the list. Returns true if it is the next ready timer.
   def add(event : Evented::Event*) : Bool
-    if @list.empty?
-      @list << event
-      true
-    elsif index = lookup(event.value.wake_at)
-      @list.insert(index, event)
-      index == 0
-    else
-      @list.push(event)
-      false
-    end
-  end
-
-  private def lookup(wake_at)
-    @list.each_with_index do |event, index|
-      return index if event.value.wake_at >= wake_at
-    end
+    @heap.add(event)
   end

   # Remove a timer from the list. Returns a tuple(dequeued, was_next_ready) of
   # booleans. The first bool tells whether the event was dequeued, in which case
   # the second one tells if it was the next ready event.
   def delete(event : Evented::Event*) : {Bool, Bool}
-    if index = @list.index(event)
-      @list.delete_at(index)
-      {true, index.zero?}
-    else
-      {false, false}
-    end
-  end
-
-  def each(&) : Nil
-    @list.each { |event| yield event }
+    @heap.delete(event)
   end
 end
diff --git a/src/crystal/system/unix/kqueue/event_loop.cr b/src/crystal/system/unix/kqueue/event_loop.cr
index 6eb98a7dc948..bdf0a0bf6815 100644
--- a/src/crystal/system/unix/kqueue/event_loop.cr
+++ b/src/crystal/system/unix/kqueue/event_loop.cr
@@ -66,7 +66,7 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
      system_set_timer(@timers.next_ready?)

      # re-add all registered fds
-      Evented.arena.each { |fd, index| system_add(fd, index) }
+      Evented.arena.each_index { |fd, index| system_add(fd, index) }
    end
  {% end %}
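
Editor's note (not part of the patch): a quick sketch of the block arithmetic that the reworked arena.cr uses. An index is mapped to a block and a slot with index.divmod(entries_per_block), where entries_per_block = BLOCK_BYTESIZE // sizeof(Entry(T)). The entry size below is a made-up figure purely for illustration; the real value is whatever sizeof(Arena::Entry(PollDescriptor)) evaluates to.

    block_bytesize = 65536  # BLOCK_BYTESIZE chosen by the event loop arena
    entry_size = 64         # assumed entry size, for illustration only
    entries_per_block = block_bytesize // entry_size # => 1024
    n, j = 2500.divmod(entries_per_block)            # => {2, 452}
    puts({n, j}) # fd 2500 would live in the third block, at slot 452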
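
Editor's note (not part of the patch): a minimal sketch of the heap_compare contract that Crystal::PointerPairingHeap relies on. Evented::Event returns wake_at < other.value.wake_at, which makes the timers heap a min-heap ordered by wake-up time; returning > instead yields a max-heap. The MaxNode struct below is hypothetical and only illustrates that choice.

    require "crystal/pointer_pairing_heap"

    private struct MaxNode
      include Crystal::PointerPairingHeap::Node

      getter key : Int32

      def initialize(@key : Int32)
      end

      def heap_compare(other : Pointer(self)) : Bool
        key > other.value.key # `>` instead of `<`: the largest key is shifted first
      end
    end

    heap = Crystal::PointerPairingHeap(MaxNode).new
    a = MaxNode.new(1)
    b = MaxNode.new(5)
    heap.add(pointerof(a))
    heap.add(pointerof(b))
    heap.shift? # => pointerof(b), since 5 > 1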