Skip to content

Commit

Permalink
Handle IndexError when reading msg files (#671)
Browse files Browse the repository at this point in the history
Don't close queues if only msg file size is incorrect
  • Loading branch information
viktorerlingsson authored Jun 19, 2024
1 parent 4449e11 commit 1db4c6c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- lavinmqctl didn't recognize 201/204 response codes from set_permissions, set_user_tags and add_vhost
- Queues will no longer be closed if file size is incorrect. Fixes [#669](https://github.com/cloudamqp/lavinmq/issues/669)

### Changed

Expand Down
30 changes: 30 additions & 0 deletions spec/storage_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,36 @@ describe LavinMQ::DurableQueue do
queue.message_count.should eq 2
end
end

it "handles files with few extra bytes" do
queue_name = Random::Secure.hex(10)
with_amqp_server do |s|
vhost = s.vhosts.create("test_vhost")
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue(queue_name)
queue = vhost.queues[queue_name].as(LavinMQ::DurableQueue)
mfile = queue.@msg_store.@segments.first_value

# fill up one segment
message_size = 41
while mfile.size < (LavinMQ::Config.instance.segment_size - message_size*2)
q.publish_confirm "a"
end
remaining_size = LavinMQ::Config.instance.segment_size - mfile.size - message_size
q.publish_confirm "a" * remaining_size

# publish one more message to create a new segment
q.publish_confirm "a"

# resize first segment to LavinMQ::Config.instance.segment_size
mfile.resize(LavinMQ::Config.instance.segment_size)

# read messages, should not raise any error
q.subscribe(tag: "tag", no_ack: false, &.ack)
should_eventually(be_true) { queue.empty? }
end
end
end
end

describe LavinMQ::VHost do
Expand Down
5 changes: 5 additions & 0 deletions src/lavinmq/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ module LavinMQ
@size -= 1
notify_empty(true) if @size.zero?
return Envelope.new(sp, msg, redelivered: false)
rescue ex : IndexError
Log.warn { "Msg file size does not match expected value, moving on to next segment" }
select_next_read_segment && next
return if @size.zero?
raise Error.new(@rfile, cause: ex)
rescue ex
raise Error.new(@rfile, cause: ex)
end
Expand Down

0 comments on commit 1db4c6c

Please sign in to comment.