diff --git a/src/lavinmq/queue/message_store.cr b/src/lavinmq/queue/message_store.cr index fd74c0b92b..23a93f3c8f 100644 --- a/src/lavinmq/queue/message_store.cr +++ b/src/lavinmq/queue/message_store.cr @@ -208,6 +208,14 @@ module LavinMQ (@bytesize / @size).to_u32 end + def unmap_segments(except : Enumerable(UInt32) = StaticArray(UInt32, 0).new(0u32)) + @segments.each do |seg_id, mfile| + next if mfile == @wfile + next if except.includes? seg_id + mfile.unmap + end + end + private def select_next_read_segment : MFile? # Expect @segments to be ordered if id = @segments.each_key.find { |sid| sid > @rfile_id } diff --git a/src/lavinmq/queue/queue.cr b/src/lavinmq/queue/queue.cr index 7b6fbbd380..8271bb07aa 100644 --- a/src/lavinmq/queue/queue.cr +++ b/src/lavinmq/queue/queue.cr @@ -831,8 +831,14 @@ module LavinMQ end end if @consumers.empty? - notify_consumers_empty(true) - delete if @auto_delete + if @auto_delete + delete + else + notify_consumers_empty(true) + @msg_store_lock.synchronize do + @msg_store.unmap_segments + end + end end end diff --git a/src/lavinmq/queue/stream_queue_message_store.cr b/src/lavinmq/queue/stream_queue_message_store.cr index 918e554371..0226e38ced 100644 --- a/src/lavinmq/queue/stream_queue_message_store.cr +++ b/src/lavinmq/queue/stream_queue_message_store.cr @@ -16,14 +16,6 @@ module LavinMQ drop_overflow end - def unmap_segments(except : Enumerable(UInt32)) - @segments.each do |seg_id, mfile| - next if mfile == @wfile - next if except.includes? seg_id - mfile.unmap - end - end - private def get_last_offset : Int64 return 0i64 if @size.zero? bytesize = 0_u32