Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message deduplication for exchanges #853

Closed
wants to merge 175 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
175 commits
Select commit Hold shift + click to select a range
60cab17
beginning of mqtt-poc, ping
kickster97 Aug 26, 2024
3462c8e
add send/recieve oct_count, make prometheus controller compile
kickster97 Aug 27, 2024
7e20f25
client stores client_id and quick solution for UI
kickster97 Aug 28, 2024
eece84f
amqp publish
kickster97 Sep 6, 2024
20660fd
mqtt integration spec: ping
snichme Sep 6, 2024
7c99bfb
fixup! amqp publish
kickster97 Sep 6, 2024
afa1f90
int-specs
snichme Sep 6, 2024
76f7520
cleanup
snichme Sep 6, 2024
690455d
connect specs (pending)
snichme Sep 10, 2024
6b89644
all integration specs (pending)
snichme Sep 10, 2024
a2e50f1
Synchronizer -> channel
snichme Sep 10, 2024
d10dec4
spec: publish mqtt ends up in amqp queue
snichme Sep 11, 2024
f735cdb
send puback after publish if qos > 0
kickster97 Sep 12, 2024
bd2885a
publish will to session WIP
kickster97 Sep 12, 2024
84c7701
publish returns PubAck
snichme Sep 12, 2024
1326ee0
cleanup will preparation
kickster97 Sep 16, 2024
14d7d5a
Got first working subscribe
snichme Sep 16, 2024
99a52f5
subscribe specs
snichme Sep 16, 2024
bd84fc9
packet_id stuff
snichme Sep 16, 2024
6d260ee
pass more connect specs
kickster97 Sep 17, 2024
c81c15e
work on subscriber specs
kickster97 Sep 17, 2024
9deb2af
start working on broker for clients
kickster97 Sep 17, 2024
da8ef99
wrap vhost in broker for mqtt::client
kickster97 Sep 23, 2024
ef792f5
pass more specs and handle session_present
kickster97 Sep 23, 2024
f825a44
all connect specs working except replace client with new connection
kickster97 Sep 23, 2024
9ca561b
fix logs in client and dot expect connack in specs
kickster97 Sep 24, 2024
ce14c4e
add sessions to broker, and handle incoming subscribe with session
kickster97 Sep 25, 2024
cdb2b81
fixup! add sessions to broker, and handle incoming subscribe with ses…
kickster97 Sep 25, 2024
537fc53
fixup! fixup! add sessions to broker, and handle incoming subscribe w…
kickster97 Sep 25, 2024
7ef8ad8
subscribe and bind a session
kickster97 Sep 27, 2024
1ba53bd
temp
kickster97 Oct 1, 2024
32a82b2
exchange.publish
snichme Sep 18, 2024
a2cf9f1
mqtt exchange
snichme Oct 1, 2024
d01cc16
refactor mqtt session
kickster97 Oct 2, 2024
bd4179a
add get method to prepare for qos 1
kickster97 Oct 2, 2024
4a1be89
feel like this got ugly, but subscribe specs are now passing
kickster97 Oct 3, 2024
01c1fa1
cleanup old code and use session instead of queue
kickster97 Oct 3, 2024
f698fc3
refctoring consumer handling, and handle msg acks better
kickster97 Oct 10, 2024
289b7ee
move consumers deliver_loop to session, specs do not pass
kickster97 Oct 10, 2024
4109e56
cleanup
kickster97 Oct 10, 2024
1d92ceb
SubscriptionTree
snichme Oct 10, 2024
be2bf24
rebase main
kickster97 Oct 14, 2024
4405f8d
format
kickster97 Oct 14, 2024
052469b
override method
snichme Oct 14, 2024
e3f7d6c
string token specs
snichme Oct 15, 2024
de0c11a
crystal 1.14 fixes
snichme Oct 15, 2024
1ea3759
subscription tree specs
snichme Oct 15, 2024
1d196cb
beginning of puback
kickster97 Oct 15, 2024
77f79e8
subscription tree specs
snichme Oct 15, 2024
2db7b92
remove puts
snichme Oct 15, 2024
bd03f2b
Use monkey patched `IO::Buffered#peek(n)` to ensure data exists
spuun Oct 11, 2024
9fedf5e
It should return slice
spuun Oct 11, 2024
12d2c99
Add spec to verify positive size check
spuun Oct 11, 2024
831265b
Rename stuff in an attempt to make things more readable
spuun Oct 11, 2024
8685963
Refactor specs
spuun Oct 11, 2024
8959b1f
Cleaner specs
spuun Oct 11, 2024
ab38399
Update spec description
spuun Oct 11, 2024
750c325
Fix condition when checking if enough data exists
spuun Oct 11, 2024
922a32c
Move data in existing buffer if needed
spuun Oct 11, 2024
8cd324f
Return what we got instead of rasing in read fails
spuun Oct 11, 2024
598aa73
Remove unused code
spuun Oct 12, 2024
ac04cbb
merge jons pr and fix sleep problem
kickster97 Oct 15, 2024
14f9771
spec fix for subtree
snichme Oct 15, 2024
7ac2018
set dup value when delivering a msg
kickster97 Oct 16, 2024
e077058
cleanup
kickster97 Oct 16, 2024
d876c00
retain_store and topic_tree
kickster97 Oct 16, 2024
d2e0987
Restructure retain-store and topic_tree for better fit in LavinMQ
kickster97 Oct 17, 2024
6c1113d
publish retained message
kickster97 Oct 17, 2024
b0c93d5
Update src/lavinmq/mqtt/retain_store.cr
kickster97 Oct 18, 2024
652a4de
retained messages spec pass
kickster97 Oct 18, 2024
2c0c7c6
Improve handling of non-clean sesssions
spuun Oct 18, 2024
792fcb5
pass will specs
kickster97 Oct 21, 2024
e4b1f6b
cleanup
kickster97 Oct 21, 2024
c91dadc
pass duplicate message specs
kickster97 Oct 21, 2024
af6a2b3
format
kickster97 Oct 21, 2024
84d9294
fix publish
kickster97 Oct 21, 2024
5738995
remove obsolete header
kickster97 Oct 21, 2024
5ec6576
cleanup
kickster97 Oct 21, 2024
94bee1b
raise io error for invalid package_id
kickster97 Oct 21, 2024
449ac25
handle raise for double connect
kickster97 Oct 21, 2024
0ba7db3
revert double connect rescue
kickster97 Oct 22, 2024
65b8d25
Remove unused variable
spuun Oct 22, 2024
e8b8e90
Use method overloading instead of type check
spuun Oct 22, 2024
c0df423
Use lowercase log source
spuun Oct 22, 2024
12297ce
Use mqtt.session as log source for Session
spuun Oct 22, 2024
c35077e
Add spec to test publisher->subscriber flow
spuun Oct 22, 2024
51cb7c6
Add spec to verify that session is restored properly
spuun Oct 22, 2024
71425ee
Use getter instead of instance variable
spuun Oct 22, 2024
61ab96b
beginning of max_inflight
kickster97 Oct 22, 2024
8aaa93a
send in topic to subscription tree, pass specs
kickster97 Oct 22, 2024
0eb4429
clean up
kickster97 Oct 22, 2024
609e997
create publish packet in client instead of accepting will in #broker:…
kickster97 Oct 22, 2024
0cfc711
Be consistent with typing
spuun Oct 22, 2024
d9654f8
Add routing specs
spuun Oct 22, 2024
ec5147f
Lint
spuun Oct 22, 2024
ef14b1c
Return nil and let spec assert
spuun Oct 23, 2024
e59131e
Add a will spec (and some clean up)
spuun Oct 23, 2024
e4f3f3c
publish will if PacketDecode exception
kickster97 Oct 23, 2024
5742831
move vhost logic from client into broker
kickster97 Oct 23, 2024
2e8b8a4
Improve logging
spuun Oct 23, 2024
d697e5e
add retain_store specs for .retain and .each
kickster97 Oct 23, 2024
40db582
No need to prefix class, use namespace
spuun Oct 24, 2024
abedfb5
Update src/lavinmq/mqtt/client.cr
kickster97 Oct 24, 2024
56de5f2
remove unnessecary socket.close
kickster97 Oct 24, 2024
956a40f
log warning instead of raise
kickster97 Oct 24, 2024
0ab3b28
fetch max_inflight_messages form config
kickster97 Oct 24, 2024
32f04ae
Convert Publish to Message in exchange
spuun Oct 24, 2024
52ff9de
Less aggressive logging
spuun Oct 24, 2024
2d34399
Suspend fiber while waiting for msg or consumer
spuun Oct 24, 2024
5c4bd06
add specs for handling connect packets with empty client_ids
kickster97 Oct 25, 2024
8a77827
handle connect packets with empty client_id strings
kickster97 Oct 25, 2024
e824c61
fixup! Suspend fiber while waiting for msg or consumer
spuun Oct 29, 2024
c834d87
Move Sessions to separate file
spuun Oct 30, 2024
3f8f8b5
Dont convert topic to routing key, and use topic all the way through
kickster97 Oct 30, 2024
7d7c245
prefix sessions with mqtt. and do not let amqp queues create queues t…
kickster97 Oct 30, 2024
5930625
move validation to queue_factory and return preconditioned fail for a…
kickster97 Oct 30, 2024
de3a6c9
prefix_validation wip
kickster97 Oct 31, 2024
3a9f774
delete old cherry-picked code, replaced with #818
kickster97 Oct 31, 2024
64e9c35
mqtt exchange receives MQTT::Publish but publish AMQP::Message to que…
snichme Oct 31, 2024
44dd545
remove obsolete spec
kickster97 Oct 31, 2024
7e0392f
format
kickster97 Nov 1, 2024
25d8e9a
rebase in abstrace queue
kickster97 Nov 1, 2024
36f78d1
adapt for queue abstraction
kickster97 Nov 1, 2024
580b510
repain broken amqp specs
kickster97 Nov 4, 2024
d9b49d9
rename prefix validator to namevalidator and move valid_entity_name i…
kickster97 Nov 4, 2024
4768353
remove unnessecary allocation in #NameValidator.valid_prefix
kickster97 Nov 4, 2024
da37d78
cleanup ordering in connections js
kickster97 Nov 4, 2024
ef48059
remove sessions from vhost, redundant
kickster97 Nov 4, 2024
9a8a154
use default random instead of secure for client_id
kickster97 Nov 4, 2024
9f42ddc
delete unreferences messages in retain store when building index
kickster97 Nov 4, 2024
45cd64f
move exchange into the mqtt namespace
kickster97 Nov 4, 2024
969a408
use mqtt namespace MqttBindingKey->BindingKey
kickster97 Nov 4, 2024
b5c8804
remove redundant return value
kickster97 Nov 4, 2024
121d50c
update name validator
kickster97 Nov 6, 2024
b73353c
move unacked_messagesapi logic to queue and overload method in session
kickster97 Nov 6, 2024
735512d
format
kickster97 Nov 6, 2024
df8291d
update logs for name validation failures
kickster97 Nov 6, 2024
805f417
don't have risk of overwriting retain store msg files
kickster97 Nov 7, 2024
4914577
ensure to remove file
kickster97 Nov 7, 2024
0cee5e3
format
kickster97 Nov 7, 2024
7ce9348
replicate retain store, wip
kickster97 Nov 8, 2024
346950a
add mqtts config + listener
kickster97 Nov 11, 2024
c97832d
format
kickster97 Nov 11, 2024
c710523
replication spec works correctly
kickster97 Nov 12, 2024
715d8f5
add mqtt_proxy for clustering client
kickster97 Nov 13, 2024
391cdb3
r+ needs file to exist before open
kickster97 Nov 13, 2024
b266f13
format
kickster97 Nov 13, 2024
b141229
fix ameba failures
kickster97 Nov 13, 2024
7c66aed
satisfy ameba for spec files
kickster97 Nov 13, 2024
5348b77
rename specfile with suffix
kickster97 Nov 13, 2024
aefc93b
fix flaky wait_for
kickster97 Nov 13, 2024
4341545
Use enum for protocol to get compile time validation
spuun Nov 13, 2024
d1cb30e
Fix specs to use protocol enum
spuun Nov 13, 2024
13ae093
use short block notation
kickster97 Nov 13, 2024
f01d21e
fix mqtt exchange routing spec
snichme Nov 14, 2024
a21fa05
remove comment
snichme Nov 14, 2024
6ef6527
scope fix
snichme Nov 14, 2024
571b7fb
Multi-vhost support
spuun Nov 14, 2024
7f06362
expand details tuple for consumer UI
kickster97 Nov 14, 2024
0a73f6e
no need to convert routing key
kickster97 Nov 14, 2024
e32c154
format
kickster97 Nov 14, 2024
37d13a8
handle unexpected close from client
kickster97 Nov 14, 2024
bdc39bc
connection_at for mqtt connections
snichme Nov 14, 2024
d256cd1
deliver packet not msg from session (#843)
snichme Nov 14, 2024
d82a762
truncate the previous content before you retain a message
kickster97 Nov 15, 2024
8d47862
safely overwrite retained messages
kickster97 Nov 15, 2024
4189531
Cant use constant as key in NamedTuple
spuun Nov 15, 2024
5e7cc4b
merge solutions for retain store
kickster97 Nov 15, 2024
a3202d1
general fixup after comments
kickster97 Nov 18, 2024
43ec295
format
kickster97 Nov 18, 2024
d358a60
set flaky spec to pending
kickster97 Nov 18, 2024
303b81e
just a test
kickster97 Nov 18, 2024
393a53b
just a test
kickster97 Nov 18, 2024
3f0c1ab
tmp: debug clustering_spec
baelter Nov 19, 2024
72e00f7
message dedup for exchange
snichme Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ shards:
git: https://github.com/84codes/lz4.cr.git
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d

mqtt-protocol:
git: https://github.com/84codes/mqtt-protocol.cr.git
version: 0.2.0+git.commit.3f82ee85d029e6d0505cbe261b108e156df4e598

systemd:
git: https://github.com/84codes/systemd.cr.git
version: 2.0.0
Expand Down
2 changes: 2 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies:
github: 84codes/systemd.cr
lz4:
github: 84codes/lz4.cr
mqtt-protocol:
github: 84codes/mqtt-protocol.cr

development_dependencies:
ameba:
Expand Down
42 changes: 42 additions & 0 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ require "./spec_helper"
require "../src/lavinmq/clustering/client"
require "../src/lavinmq/clustering/controller"

alias IndexTree = LavinMQ::MQTT::TopicTree(String)

describe LavinMQ::Clustering::Client do
follower_data_dir = "/tmp/lavinmq-follower"

Expand Down Expand Up @@ -72,6 +74,46 @@ describe LavinMQ::Clustering::Client do
end
end

it "replicates and streams retained messages to followers" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)

spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
repli = LavinMQ::Clustering::Client.new(config, 1, replicator.password, proxy: false)
done = Channel(Nil).new
spawn(name: "follow spec") do
repli.follow("localhost", tcp_server.local_address.port)
done.send nil
end
wait_for { replicator.followers.size == 1 }

retain_store = LavinMQ::MQTT::RetainStore.new("#{LavinMQ::Config.instance.data_dir}/retain_store", replicator)
props = LavinMQ::AMQP::Properties.new
msg1 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body1"))
msg2 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body2"))
retain_store.retain("topic1", msg1.body_io, msg1.bodysize)
retain_store.retain("topic2", msg2.body_io, msg2.bodysize)

wait_for(10.seconds) { replicator.followers.first?.try &.lag_in_bytes == 0 }
repli.close
done.receive

follower_retain_store = LavinMQ::MQTT::RetainStore.new("#{follower_data_dir}/retain_store", LavinMQ::Clustering::NoopServer.new)
a = Array(String).new(2)
b = Array(String).new(2)
follower_retain_store.each("#") do |topic, bytes|
a << topic
b << String.new(bytes)
end

a.sort!.should eq(["topic1", "topic2"])
b.sort!.should eq(["body1", "body2"])
follower_retain_store.retained_messages.should eq(2)
ensure
replicator.try &.close
end

it "can stream full file" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)
Expand Down
138 changes: 138 additions & 0 deletions spec/deduper_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
require "./spec_helper"
require "../src/lavinmq/exchange/dedup_ext.cr"

describe LavinMQ::Deduplication do
describe LavinMQ::Deduplication::MemoryCache do
it "should have a max size" do
cache = LavinMQ::Deduplication::MemoryCache(String).new(2)
cache.insert("item1")
cache.insert("item2")
cache.insert("item3")
cache.contains?("item1").should be_false
cache.contains?("item2").should be_true
cache.contains?("item3").should be_true
end

it "should store item without ttl" do
cache = LavinMQ::Deduplication::MemoryCache(String).new(10)
cache.insert("item1")
cache.contains?("item1").should be_true
cache.contains?("item2").should be_false
end

it "should respect ttl" do
cache = LavinMQ::Deduplication::MemoryCache(String).new(3)
cache.insert("item1", 1)
cache.insert("item2", 300)
cache.insert("item3")
sleep 0.2.seconds
cache.contains?("item1").should be_false
cache.contains?("item2").should be_true
cache.contains?("item3").should be_true
end
end
end

class MockCache(T) < LavinMQ::Deduplication::Cache(T)
@counter = Hash(String, Array({T, UInt32?})).new do |h, k|
h[k] = Array({T, UInt32?}).new
end

def contains?(key : T) : Bool
@counter["contains?"] << {key, nil}
false
end

def insert(key : T, ttl : UInt32? = nil)
@counter["insert"] << {key, ttl}
end

def calls(key : String) : Array({T, UInt32?})
@counter[key]
end
end

describe LavinMQ::Deduplication::Deduper do
describe "duplicate?" do
it "should return false if \"x-deduplication-header\" is missing (no identifier, always unique)" do
mock = MockCache(AMQ::Protocol::Field).new
deduper = LavinMQ::Deduplication::Deduper.new(mock)
props = LavinMQ::AMQP::Properties.new
msg = LavinMQ::Message.new("ex", "rk", "body", props)
res = deduper.duplicate?(msg)
res.should be_false
end

it "should check cache if entry exists" do
mock = MockCache(AMQ::Protocol::Field).new
deduper = LavinMQ::Deduplication::Deduper.new(mock)
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
}))
msg = LavinMQ::Message.new("ex", "rk", "body", props)
deduper.duplicate?(msg)
mock.calls("contains?").size.should eq 1
end

it "should only insert into cache if header has a value" do
mock = MockCache(AMQ::Protocol::Field).new
deduper = LavinMQ::Deduplication::Deduper.new(mock)
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new)
msg = LavinMQ::Message.new("ex", "rk", "body", props)
deduper.add(msg)
mock.calls("insert").size.should eq 0
end

it "should only insert into cache if header has a value" do
mock = MockCache(AMQ::Protocol::Field).new
deduper = LavinMQ::Deduplication::Deduper.new(mock)
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
}))
msg = LavinMQ::Message.new("ex", "rk", "body", props)
deduper.add(msg)
mock.calls("insert").size.should eq 1
end

it "should respect x-cache-ttl on message" do
mock = MockCache(AMQ::Protocol::Field).new
deduper = LavinMQ::Deduplication::Deduper.new(mock)
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
"x-cache-ttl" => 10,
}))
msg = LavinMQ::Message.new("ex", "rk", "body", props)
deduper.add(msg)
calls = mock.calls("insert")
calls.first[0].should eq "msg1"
calls.first[1].should eq 10
end

it "should fallback to default ttl" do
mock = MockCache(AMQ::Protocol::Field).new
deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
}))
msg = LavinMQ::Message.new("ex", "rk", "body", props)
deduper.add(msg)
calls = mock.calls("insert")
calls.first[0].should eq "msg1"
calls.first[1].should eq 12
end

it "should prio message ttl over default ttl" do
mock = MockCache(AMQ::Protocol::Field).new
deduper = LavinMQ::Deduplication::Deduper.new(mock, 12)
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
"x-cache-ttl" => 10,
}))
msg = LavinMQ::Message.new("ex", "rk", "body", props)
deduper.add(msg)
calls = mock.calls("insert")
calls.first[0].should eq "msg1"
calls.first[1].should eq 10
end
end
end
31 changes: 31 additions & 0 deletions spec/exchange_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,35 @@ describe LavinMQ::Exchange do
end
end
end
describe "message deduplication" do
args = AMQP::Client::Arguments.new({
"x-message-deduplication" => true,
"x-cache-size" => 10,
})

it "should handle message deduplication" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.exchange("test", "topic", args: args)
ch.queue.bind("test", "#")
ex = s.vhosts["/"].exchanges["test"]
q = s.vhosts["/"].queues.first_value
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
}))
msg = LavinMQ::Message.new("ex", "rk", "body", props)
ex.publish(msg, false).should eq 1
ex.dedup_count.should eq 0
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
"x-deduplication-header" => "msg1",
}))
msg = LavinMQ::Message.new("ex", "rk", "body", props)
ex.publish(msg, false).should eq 0
ex.dedup_count.should eq 1

q.message_count.should eq 1
end
end
end
end
end
39 changes: 39 additions & 0 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,42 @@ describe LavinMQ::Exchange do
end
end
end

describe LavinMQ::MQTT::Exchange do
it "should only allow Session to bind" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::AMQP::Queue.new(vhost, "q1")
s1 = LavinMQ::MQTT::Session.new(vhost, "q1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
expect_raises(LavinMQ::Exchange::AccessRefused) do
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
end
end
end

it "publish messages to queues with it's own publish method" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
s1 = LavinMQ::MQTT::Session.new(vhost, "session 1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "mqtt.default", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
pub_args = {
packet_id: 1u16,
payload: Bytes.new(0),
dup: false,
qos: 0u8,
retain: false,
topic: "s1",
}
msg = MQTT::Protocol::Publish.new(**pub_args)
x.publish(msg)
s1.message_count.should eq 1
end
end
end
Loading
Loading