Skip to content

Commit

Permalink
message dedup for queues
Browse files Browse the repository at this point in the history
  • Loading branch information
snichme committed Nov 20, 2024
1 parent 99310c2 commit 3b7828d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
21 changes: 21 additions & 0 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -390,4 +390,25 @@ describe LavinMQ::AMQP::Queue do
FileUtils.rm_rf tmpdir if tmpdir
end
end

describe "deduplication" do
it "should not except message if it's a duplicate" do
with_amqp_server do |s|
with_channel(s) do |ch|
queue_name = "dedup-queue"
q1 = ch.queue(queue_name, args: AMQP::Client::Arguments.new({
"x-message-deduplication" => true,
"x-cache-size" => 10,
}))
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
}))
ch.default_exchange.publish_confirm("body", queue_name, props: props)
ch.default_exchange.publish_confirm("body", queue_name, props: props)
q1.get(no_ack: false).not_nil!.body_io.to_s.should eq "body"
q1.get(no_ack: false).should be_nil
end
end
end
end
end
15 changes: 15 additions & 0 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ module LavinMQ::AMQP
@data_dir : String
Log = LavinMQ::Log.for "queue"
@metadata : ::Log::Metadata
@deduper : Deduplication::Deduper?

def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
Expand Down Expand Up @@ -264,6 +265,13 @@ module LavinMQ::AMQP
@single_active_consumer_queue = parse_header("x-single-active-consumer", Bool) == true
@consumer_timeout = parse_header("x-consumer-timeout", Int).try &.to_u64
validate_positive("x-consumer-timeout", @consumer_timeout)
if @arguments["x-message-deduplication"]?.try &.as?(Bool)
ttl = @arguments["x-cache-ttl"]?.try(&.as?(Int32)).try(&.to_u32)
size = @arguments["x-cache-size"]?.try(&.as?(Int32)).try(&.to_u32)
raise "Invalid x-cache-size for message deduplication" unless size
cache = Deduplication::MemoryCache(AMQ::Protocol::Field).new(size)
@deduper = Deduplication::Deduper.new(cache, ttl)
end
end

private macro parse_header(header, type)
Expand Down Expand Up @@ -414,6 +422,13 @@ module LavinMQ::AMQP

def publish(msg : Message) : Bool
return false if @deleted || @state.closed?
if d = @deduper
if d.duplicate?(msg)
# @dedup_count += 1
return false
end
d.add(msg)
end
reject_on_overflow(msg)
@msg_store_lock.synchronize do
@msg_store.push(msg)
Expand Down

0 comments on commit 3b7828d

Please sign in to comment.