From 1db4c6c0d68e5bd65ab434ae038e5124db9c39c4 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Thu, 20 Jun 2024 00:44:56 +0200 Subject: [PATCH] Handle IndexError when reading msg files (#671) Don't close queues if only msg file size is incorrect --- CHANGELOG.md | 1 + spec/storage_spec.cr | 30 ++++++++++++++++++++++++++++++ src/lavinmq/queue/message_store.cr | 5 +++++ 3 files changed, 36 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1e26d24cd..3fe87b0e4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - lavinmqctl didn't recognize 201/204 response codes from set_permissions, set_user_tags and add_vhost +- Queues will no longer be closed if file size is incorrect. Fixes [#669](https://github.com/cloudamqp/lavinmq/issues/669) ### Changed diff --git a/spec/storage_spec.cr b/spec/storage_spec.cr index b62bfa04cb..561adc1be2 100644 --- a/spec/storage_spec.cr +++ b/spec/storage_spec.cr @@ -98,6 +98,36 @@ describe LavinMQ::DurableQueue do queue.message_count.should eq 2 end end + + it "handles files with few extra bytes" do + queue_name = Random::Secure.hex(10) + with_amqp_server do |s| + vhost = s.vhosts.create("test_vhost") + with_channel(s, vhost: vhost.name) do |ch| + q = ch.queue(queue_name) + queue = vhost.queues[queue_name].as(LavinMQ::DurableQueue) + mfile = queue.@msg_store.@segments.first_value + + # fill up one segment + message_size = 41 + while mfile.size < (LavinMQ::Config.instance.segment_size - message_size*2) + q.publish_confirm "a" + end + remaining_size = LavinMQ::Config.instance.segment_size - mfile.size - message_size + q.publish_confirm "a" * remaining_size + + # publish one more message to create a new segment + q.publish_confirm "a" + + # resize first segment to LavinMQ::Config.instance.segment_size + mfile.resize(LavinMQ::Config.instance.segment_size) + + # read messages, should not raise any error + q.subscribe(tag: "tag", no_ack: false, &.ack) + should_eventually(be_true) { queue.empty? } + end + end + end end describe LavinMQ::VHost do diff --git a/src/lavinmq/queue/message_store.cr b/src/lavinmq/queue/message_store.cr index 974e382912..c2e42b14c8 100644 --- a/src/lavinmq/queue/message_store.cr +++ b/src/lavinmq/queue/message_store.cr @@ -129,6 +129,11 @@ module LavinMQ @size -= 1 notify_empty(true) if @size.zero? return Envelope.new(sp, msg, redelivered: false) + rescue ex : IndexError + Log.warn { "Msg file size does not match expected value, moving on to next segment" } + select_next_read_segment && next + return if @size.zero? + raise Error.new(@rfile, cause: ex) rescue ex raise Error.new(@rfile, cause: ex) end