From 5b1b092bf769a0ba6c8ff9deb04480e60095e31b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 9 Aug 2023 21:15:05 +0200 Subject: [PATCH] StreamQueue: only unmap unused segments Previous behavior could unmap segments that other consumers were still delivering from, causing segfaults. --- src/lavinmq/queue/stream_queue.cr | 21 +++++++++++++++++++ .../queue/stream_queue_message_store.cr | 11 +++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/lavinmq/queue/stream_queue.cr b/src/lavinmq/queue/stream_queue.cr index 3cc9c865c1..bd0adb69e6 100644 --- a/src/lavinmq/queue/stream_queue.cr +++ b/src/lavinmq/queue/stream_queue.cr @@ -6,6 +6,13 @@ module LavinMQ class StreamQueue < Queue @durable = true + def initialize(@vhost : VHost, @name : String, + @exclusive = false, @auto_delete = false, + @arguments = Hash(String, AMQP::Field).new) + super + spawn unmap_unused_segments_loop, name: "StreamQueue#unmap_unused_segments_loop" + end + def apply_policy(policy : Policy?, operator_policy : OperatorPolicy?) super if max_age_value = Policy.merge_definitions(policy, operator_policy)["max-age"]? @@ -140,5 +147,19 @@ module LavinMQ raise LavinMQ::Error::PreconditionFailed.new("max-age must be a string") end end + + private def unmap_unused_segments_loop + until closed? + sleep 60 + unmap_unused_segments + end + end + + private def unmap_unused_segments + used_segments = @consumers_lock.synchronize { @consumers.map &.as(Client::Channel::StreamConsumer).segment } + @msg_store_lock.synchronize do + stream_queue_msg_store.unmap_segments(except: used_segments) + end + end end end diff --git a/src/lavinmq/queue/stream_queue_message_store.cr b/src/lavinmq/queue/stream_queue_message_store.cr index 1975c0b5e9..5dbe61506d 100644 --- a/src/lavinmq/queue/stream_queue_message_store.cr +++ b/src/lavinmq/queue/stream_queue_message_store.cr @@ -16,6 +16,14 @@ module LavinMQ drop_overflow end + def unmap_segments(except : Enumerable(UInt32)) + @segments.each do |seg_id, mfile| + next if mfile == @wfile + next if except.includes? seg_id + mfile.unmap + end + end + private def get_last_offset : Int64 return 0i64 if @size.zero? bytesize = 0_u32 @@ -61,7 +69,6 @@ module LavinMQ {@last_offset + 1, @segments.last_key, @segments.last_value.size.to_u32} end - # ameba:disable Metrics/CyclomaticComplexity private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32) segment = @segments.first_key pos = 4u32 @@ -69,7 +76,6 @@ module LavinMQ loop do rfile = @segments[segment]? if rfile.nil? || pos == rfile.size - rfile.unmap if rfile && rfile != @wfile if segment = @segments.each_key.find { |sid| sid > segment } rfile = @segments[segment] pos = 4_u32 @@ -101,7 +107,6 @@ module LavinMQ rfile = @segments[consumer.segment]? || next_segment(consumer) || return if consumer.pos == rfile.size # EOF return if rfile == @wfile - rfile.unmap rfile = next_segment(consumer) || return end begin