Skip to content

Commit

Permalink
Rescue IndexError in first? and move on to next segment (#864)
Browse files Browse the repository at this point in the history
If a msg file is a few bytes larger than expected, first? will throw an IndexError and close the queue. This changes that behavior to instead rescue that IndexError and try to move on to the next segment file. Same fix as for shift? in #671

fixes #669 (again 🙂)
  • Loading branch information
viktorerlingsson authored and kickster97 committed Dec 16, 2024
1 parent 5f07da3 commit fbff0b0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- Queues will no longer be closed if file size is incorrect. Fixes [#669](https://github.com/cloudamqp/lavinmq/issues/669)

## [2.0.2] - 2024-11-25

### Fixed
Expand Down
37 changes: 36 additions & 1 deletion spec/storage_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ describe LavinMQ::AMQP::DurableQueue do
end
end

it "handles files with few extra bytes" do
it "shift? handles files with few extra bytes" do
queue_name = Random::Secure.hex(10)
with_amqp_server do |s|
vhost = s.vhosts.create("test_vhost")
Expand Down Expand Up @@ -128,6 +128,41 @@ describe LavinMQ::AMQP::DurableQueue do
end
end
end

it "first? handles files with few extra bytes" do
queue_name = Random::Secure.hex(10)
with_amqp_server do |s|
vhost = s.vhosts.create("test_vhost")
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue(queue_name)
queue = vhost.queues[queue_name].as(LavinMQ::AMQP::DurableQueue)
mfile = queue.@msg_store.@segments.first_value

# fill up one segment
message_size = 41
while mfile.size < (LavinMQ::Config.instance.segment_size - message_size*2)
q.publish_confirm "a"
end
remaining_size = LavinMQ::Config.instance.segment_size - mfile.size - message_size
q.publish_confirm "a" * remaining_size

# publish one more message to create a new segment
q.publish_confirm "a"

# resize first segment to LavinMQ::Config.instance.segment_size
mfile.resize(LavinMQ::Config.instance.segment_size)

store = LavinMQ::Queue::MessageStore.new(queue.@msg_store.@queue_data_dir, nil)
mfile = store.@segments.first_value
mfile.pos = mfile.size - 2
if msg = store.first?
msg.@segment_position.@segment.should eq 2
else
fail "no message"
end
end
end
end
end

describe LavinMQ::VHost do
Expand Down
7 changes: 6 additions & 1 deletion src/lavinmq/amqp/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ module LavinMQ
notify_empty(false) if was_empty
end

def first? : Envelope?
def first? : Envelope? # ameba:disable Metrics/CyclomaticComplexity
raise ClosedError.new if @closed
if sp = @requeued.first?
seg = @segments[sp.segment]
Expand Down Expand Up @@ -90,6 +90,11 @@ module LavinMQ
msg = BytesMessage.from_bytes(rfile.to_slice + pos)
sp = SegmentPosition.make(seg, pos, msg)
return Envelope.new(sp, msg, redelivered: false)
rescue ex : IndexError
@log.warn { "Msg file size does not match expected value, moving on to next segment" }
select_next_read_segment && next
return if @size.zero?
raise Error.new(@rfile, cause: ex)
rescue ex
raise Error.new(@rfile, cause: ex)
end
Expand Down

0 comments on commit fbff0b0

Please sign in to comment.