Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT-support #766

Open
wants to merge 202 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
202 commits
Select commit Hold shift + click to select a range
5f153c9
beginning of mqtt-poc, ping
kickster97 Aug 26, 2024
343cfb6
add send/recieve oct_count, make prometheus controller compile
kickster97 Aug 27, 2024
a690eb2
client stores client_id and quick solution for UI
kickster97 Aug 28, 2024
eadfb6a
amqp publish
kickster97 Sep 6, 2024
a778249
mqtt integration spec: ping
snichme Sep 6, 2024
403dfcf
fixup! amqp publish
kickster97 Sep 6, 2024
1c198bc
int-specs
snichme Sep 6, 2024
702d6fb
cleanup
snichme Sep 6, 2024
880d521
connect specs (pending)
snichme Sep 10, 2024
8f2d9cb
all integration specs (pending)
snichme Sep 10, 2024
e24cdf1
Synchronizer -> channel
snichme Sep 10, 2024
9f908db
spec: publish mqtt ends up in amqp queue
snichme Sep 11, 2024
d3ae0cc
send puback after publish if qos > 0
kickster97 Sep 12, 2024
1ae27bc
publish will to session WIP
kickster97 Sep 12, 2024
8a3fa34
publish returns PubAck
snichme Sep 12, 2024
86b944b
cleanup will preparation
kickster97 Sep 16, 2024
c715857
Got first working subscribe
snichme Sep 16, 2024
9140c7d
subscribe specs
snichme Sep 16, 2024
c0be2db
packet_id stuff
snichme Sep 16, 2024
fa42c19
pass more connect specs
kickster97 Sep 17, 2024
7db85e0
work on subscriber specs
kickster97 Sep 17, 2024
a92f090
start working on broker for clients
kickster97 Sep 17, 2024
2a29c97
wrap vhost in broker for mqtt::client
kickster97 Sep 23, 2024
383cee7
pass more specs and handle session_present
kickster97 Sep 23, 2024
660f2f9
all connect specs working except replace client with new connection
kickster97 Sep 23, 2024
10cb261
fix logs in client and dot expect connack in specs
kickster97 Sep 24, 2024
d4e0008
add sessions to broker, and handle incoming subscribe with session
kickster97 Sep 25, 2024
a97fd38
fixup! add sessions to broker, and handle incoming subscribe with ses…
kickster97 Sep 25, 2024
e2ce648
fixup! fixup! add sessions to broker, and handle incoming subscribe w…
kickster97 Sep 25, 2024
b1ba289
subscribe and bind a session
kickster97 Sep 27, 2024
0131348
temp
kickster97 Oct 1, 2024
39b17e5
exchange.publish
snichme Sep 18, 2024
ebcaebb
mqtt exchange
snichme Oct 1, 2024
50f0ab9
refactor mqtt session
kickster97 Oct 2, 2024
da57526
add get method to prepare for qos 1
kickster97 Oct 2, 2024
6535f15
feel like this got ugly, but subscribe specs are now passing
kickster97 Oct 3, 2024
6083866
cleanup old code and use session instead of queue
kickster97 Oct 3, 2024
afbb44a
refctoring consumer handling, and handle msg acks better
kickster97 Oct 10, 2024
b484d5d
move consumers deliver_loop to session, specs do not pass
kickster97 Oct 10, 2024
20e6061
cleanup
kickster97 Oct 10, 2024
3f775f3
SubscriptionTree
snichme Oct 10, 2024
bc9327f
rebase main
kickster97 Oct 14, 2024
88d4228
format
kickster97 Oct 14, 2024
ed84830
override method
snichme Oct 14, 2024
2edfd7d
string token specs
snichme Oct 15, 2024
aee1c13
crystal 1.14 fixes
snichme Oct 15, 2024
ffa3ea2
subscription tree specs
snichme Oct 15, 2024
0bdde3d
beginning of puback
kickster97 Oct 15, 2024
cc25b6a
subscription tree specs
snichme Oct 15, 2024
e73be60
remove puts
snichme Oct 15, 2024
1994035
Use monkey patched `IO::Buffered#peek(n)` to ensure data exists
spuun Oct 11, 2024
4521da8
It should return slice
spuun Oct 11, 2024
4149757
Add spec to verify positive size check
spuun Oct 11, 2024
a631464
Rename stuff in an attempt to make things more readable
spuun Oct 11, 2024
2ad3d7a
Refactor specs
spuun Oct 11, 2024
da18332
Cleaner specs
spuun Oct 11, 2024
b764101
Update spec description
spuun Oct 11, 2024
587052b
Fix condition when checking if enough data exists
spuun Oct 11, 2024
d6d94b7
Move data in existing buffer if needed
spuun Oct 11, 2024
8240ac2
Return what we got instead of rasing in read fails
spuun Oct 11, 2024
ad81285
Remove unused code
spuun Oct 12, 2024
49752e6
merge jons pr and fix sleep problem
kickster97 Oct 15, 2024
f3df04c
spec fix for subtree
snichme Oct 15, 2024
95ca6e2
set dup value when delivering a msg
kickster97 Oct 16, 2024
6d47ab1
cleanup
kickster97 Oct 16, 2024
2274f53
retain_store and topic_tree
kickster97 Oct 16, 2024
ad4520c
Restructure retain-store and topic_tree for better fit in LavinMQ
kickster97 Oct 17, 2024
fc2245e
publish retained message
kickster97 Oct 17, 2024
df43824
Update src/lavinmq/mqtt/retain_store.cr
kickster97 Oct 18, 2024
afd66b8
retained messages spec pass
kickster97 Oct 18, 2024
c14ba7f
Improve handling of non-clean sesssions
spuun Oct 18, 2024
90aae91
pass will specs
kickster97 Oct 21, 2024
0a3a8da
cleanup
kickster97 Oct 21, 2024
7c88076
pass duplicate message specs
kickster97 Oct 21, 2024
7081c8b
format
kickster97 Oct 21, 2024
a92ab4c
fix publish
kickster97 Oct 21, 2024
50d950b
remove obsolete header
kickster97 Oct 21, 2024
2ac47f7
cleanup
kickster97 Oct 21, 2024
e7da53a
raise io error for invalid package_id
kickster97 Oct 21, 2024
5f40c82
handle raise for double connect
kickster97 Oct 21, 2024
76fbe98
revert double connect rescue
kickster97 Oct 22, 2024
5d09957
Remove unused variable
spuun Oct 22, 2024
6f0e855
Use method overloading instead of type check
spuun Oct 22, 2024
9b2912f
Use lowercase log source
spuun Oct 22, 2024
00cc73a
Use mqtt.session as log source for Session
spuun Oct 22, 2024
4a966bd
Add spec to test publisher->subscriber flow
spuun Oct 22, 2024
45b7ec2
Add spec to verify that session is restored properly
spuun Oct 22, 2024
82cda00
Use getter instead of instance variable
spuun Oct 22, 2024
ba0e46e
beginning of max_inflight
kickster97 Oct 22, 2024
f07093e
send in topic to subscription tree, pass specs
kickster97 Oct 22, 2024
0bfd4ab
clean up
kickster97 Oct 22, 2024
ce2d7fc
create publish packet in client instead of accepting will in #broker:…
kickster97 Oct 22, 2024
8ed744e
Be consistent with typing
spuun Oct 22, 2024
b3680ea
Add routing specs
spuun Oct 22, 2024
035ee4d
Lint
spuun Oct 22, 2024
391e237
Return nil and let spec assert
spuun Oct 23, 2024
f80e0a0
Add a will spec (and some clean up)
spuun Oct 23, 2024
89579e4
publish will if PacketDecode exception
kickster97 Oct 23, 2024
9529a78
move vhost logic from client into broker
kickster97 Oct 23, 2024
6ab4c63
Improve logging
spuun Oct 23, 2024
306d131
add retain_store specs for .retain and .each
kickster97 Oct 23, 2024
a2cf40b
No need to prefix class, use namespace
spuun Oct 24, 2024
9a1f4dd
Update src/lavinmq/mqtt/client.cr
kickster97 Oct 24, 2024
713408d
remove unnessecary socket.close
kickster97 Oct 24, 2024
c2761fd
log warning instead of raise
kickster97 Oct 24, 2024
2ff71fe
fetch max_inflight_messages form config
kickster97 Oct 24, 2024
2f892a0
Convert Publish to Message in exchange
spuun Oct 24, 2024
d6b584a
Less aggressive logging
spuun Oct 24, 2024
a2123cb
Suspend fiber while waiting for msg or consumer
spuun Oct 24, 2024
8376b26
add specs for handling connect packets with empty client_ids
kickster97 Oct 25, 2024
9bdc56e
handle connect packets with empty client_id strings
kickster97 Oct 25, 2024
6ddfcf6
fixup! Suspend fiber while waiting for msg or consumer
spuun Oct 29, 2024
ff95570
Move Sessions to separate file
spuun Oct 30, 2024
d5411a4
Dont convert topic to routing key, and use topic all the way through
kickster97 Oct 30, 2024
e5381e2
prefix sessions with mqtt. and do not let amqp queues create queues t…
kickster97 Oct 30, 2024
f03af4f
move validation to queue_factory and return preconditioned fail for a…
kickster97 Oct 30, 2024
14607d2
prefix_validation wip
kickster97 Oct 31, 2024
faa111c
delete old cherry-picked code, replaced with #818
kickster97 Oct 31, 2024
115f82c
mqtt exchange receives MQTT::Publish but publish AMQP::Message to que…
snichme Oct 31, 2024
0adf3f6
remove obsolete spec
kickster97 Oct 31, 2024
76b24c6
format
kickster97 Nov 1, 2024
7e81e9a
rebase in abstrace queue
kickster97 Nov 1, 2024
954fac4
adapt for queue abstraction
kickster97 Nov 1, 2024
0392140
repain broken amqp specs
kickster97 Nov 4, 2024
918ec5c
rename prefix validator to namevalidator and move valid_entity_name i…
kickster97 Nov 4, 2024
09a9137
remove unnessecary allocation in #NameValidator.valid_prefix
kickster97 Nov 4, 2024
93ecf6d
cleanup ordering in connections js
kickster97 Nov 4, 2024
a5e1e9f
remove sessions from vhost, redundant
kickster97 Nov 4, 2024
3132431
use default random instead of secure for client_id
kickster97 Nov 4, 2024
3e1f35f
delete unreferences messages in retain store when building index
kickster97 Nov 4, 2024
a30b051
move exchange into the mqtt namespace
kickster97 Nov 4, 2024
e4b8894
use mqtt namespace MqttBindingKey->BindingKey
kickster97 Nov 4, 2024
75c6666
remove redundant return value
kickster97 Nov 4, 2024
9d148e2
update name validator
kickster97 Nov 6, 2024
1821498
move unacked_messagesapi logic to queue and overload method in session
kickster97 Nov 6, 2024
ecc4e56
format
kickster97 Nov 6, 2024
457e079
update logs for name validation failures
kickster97 Nov 6, 2024
3fdf2d4
don't have risk of overwriting retain store msg files
kickster97 Nov 7, 2024
884ec89
ensure to remove file
kickster97 Nov 7, 2024
cd0313a
format
kickster97 Nov 7, 2024
2b97145
replicate retain store, wip
kickster97 Nov 8, 2024
6990cd0
add mqtts config + listener
kickster97 Nov 11, 2024
2105965
format
kickster97 Nov 11, 2024
86908ad
replication spec works correctly
kickster97 Nov 12, 2024
b8f9b72
add mqtt_proxy for clustering client
kickster97 Nov 13, 2024
8bda86f
r+ needs file to exist before open
kickster97 Nov 13, 2024
192208a
format
kickster97 Nov 13, 2024
bcb64c5
fix ameba failures
kickster97 Nov 13, 2024
1e91cf2
satisfy ameba for spec files
kickster97 Nov 13, 2024
799d521
rename specfile with suffix
kickster97 Nov 13, 2024
c6df728
fix flaky wait_for
kickster97 Nov 13, 2024
e8613d5
Use enum for protocol to get compile time validation
spuun Nov 13, 2024
cb8f238
Fix specs to use protocol enum
spuun Nov 13, 2024
954f987
use short block notation
kickster97 Nov 13, 2024
437e3a4
fix mqtt exchange routing spec
snichme Nov 14, 2024
85e1cce
remove comment
snichme Nov 14, 2024
85a632f
scope fix
snichme Nov 14, 2024
25e9d48
Multi-vhost support
spuun Nov 14, 2024
4134084
expand details tuple for consumer UI
kickster97 Nov 14, 2024
0927305
no need to convert routing key
kickster97 Nov 14, 2024
ec39012
format
kickster97 Nov 14, 2024
a443e5c
handle unexpected close from client
kickster97 Nov 14, 2024
9da1360
connection_at for mqtt connections
snichme Nov 14, 2024
d9cdc82
deliver packet not msg from session (#843)
snichme Nov 14, 2024
188a644
truncate the previous content before you retain a message
kickster97 Nov 15, 2024
c48d5ae
safely overwrite retained messages
kickster97 Nov 15, 2024
9349685
Cant use constant as key in NamedTuple
spuun Nov 15, 2024
8d8d685
merge solutions for retain store
kickster97 Nov 15, 2024
5b0ea06
general fixup after comments
kickster97 Nov 18, 2024
a5cf929
format
kickster97 Nov 18, 2024
ed2ffa6
set flaky spec to pending
kickster97 Nov 18, 2024
406a6e1
just a test
kickster97 Nov 18, 2024
922a616
just a test
kickster97 Nov 18, 2024
a88b1cb
tmp: debug clustering_spec
baelter Nov 19, 2024
7961475
finalize clustering spec
kickster97 Nov 20, 2024
94f821b
use instance var for index file name
kickster97 Nov 21, 2024
45b5950
rescue argumenterror in deliver loop
kickster97 Dec 2, 2024
fbba191
format
kickster97 Dec 2, 2024
4801c32
Fix mqtt unix listener (and some logging)
spuun Dec 3, 2024
208c20c
Prevent Channel::Closed error from being raised
spuun Dec 3, 2024
1f17957
exception handling for mqtt default bindings
kickster97 Dec 10, 2024
96d7df0
only allow selected policies to be applied for mqtt session
kickster97 Dec 10, 2024
028b731
handle qos 2 at build_packet
kickster97 Dec 10, 2024
ae2e070
dont allow amqp queues or exchanges to bind to mqtt queues or exchanges
kickster97 Dec 11, 2024
bd562ad
forbidden to bind AMQP excahnges to the MQTT Session
kickster97 Dec 12, 2024
7795ee4
format
kickster97 Dec 12, 2024
a86e57d
clean up review comments
kickster97 Dec 13, 2024
e008a7c
initialize @broekrs in server instead of in connection_factory
kickster97 Dec 13, 2024
e82da30
version bump
viktorerlingsson Nov 25, 2024
5b8332a
Rescue IndexError in first? and move on to next segment (#864)
viktorerlingsson Dec 4, 2024
6bed3c7
java-client dont redeclare exchange in spec (#860)
viktorerlingsson Dec 5, 2024
01bdfb5
fetch max_inflight_messages form config
kickster97 Oct 24, 2024
b8701bd
clean up for review comments
kickster97 Dec 16, 2024
6cd9561
Fix some logging
spuun Dec 18, 2024
04ac017
Fixed grid error (#883)
bengtmagnus Dec 16, 2024
8a12469
log nr of streamed bytes every 30 seconds in follower (#885)
viktorerlingsson Dec 17, 2024
aa2ca46
update changelog
viktorerlingsson Dec 17, 2024
a11c9f9
Create uiux_improvements.md
bengtmagnus Dec 18, 2024
7842f82
Update uiux_improvements.md
bengtmagnus Dec 18, 2024
5273f57
Update src/lavinmq/mqtt/session.cr
kickster97 Dec 18, 2024
969b194
review fix
kickster97 Dec 18, 2024
a766b9b
remove excess write and rewind in exchange
kickster97 Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/ISSUE_TEMPLATE/uiux_improvements.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
name: UI/UX improvement
about: Create a report to help us improve
title: ''
labels: 'UI/UX'
assignees: ''
---
**Summary**
Provide a brief description of the issue or improvement.

**Current Behavior**
Describe how the feature or UI element currently works.

**Desired Behavior**
Explain what the ideal experience should be.

**Steps to Reproduce (if reporting an issue)**
List the steps required to replicate the problem.

**Proposed Solution (for improvements)**
Suggest how to resolve the issue or implement the improvement.

**Screenshots/Mockups**
Add visuals to clarify the issue or illustrate your suggestion.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- New UI for management interface [#821](https://github.com/cloudamqp/lavinmq/pull/821)
- Use sent/received bytes instead of messages to trigger when other tasks can run [#863](https://github.com/cloudamqp/lavinmq/pull/863)

### Fixed

- Queues will no longer be closed if file size is incorrect. Fixes [#669](https://github.com/cloudamqp/lavinmq/issues/669)
- Dont redeclare exchange in java client test [#860](https://github.com/cloudamqp/lavinmq/pull/860)
- Removed duplicate metric rss_bytes [#881](https://github.com/cloudamqp/lavinmq/pull/881)
- Release leadership on graceful shutdown [#871](https://github.com/cloudamqp/lavinmq/pull/871)
- Rescue more exceptions while reading msg store on startup [#865](https://github.com/cloudamqp/lavinmq/pull/865)

### Added

- Added some logging for followers [#885](https://github.com/cloudamqp/lavinmq/pull/885)

## [2.0.2] - 2024-11-25

### Fixed

- Queues will no longer be closed if file size is incorrect. Fixes [#669](https://github.com/cloudamqp/lavinmq/issues/669)
Expand Down
4 changes: 4 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ shards:
git: https://github.com/84codes/lz4.cr.git
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d

mqtt-protocol:
git: https://github.com/84codes/mqtt-protocol.cr.git
version: 0.2.0+git.commit.3f82ee85d029e6d0505cbe261b108e156df4e598

systemd:
git: https://github.com/84codes/systemd.cr.git
version: 2.0.0
Expand Down
2 changes: 2 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies:
github: 84codes/systemd.cr
lz4:
github: 84codes/lz4.cr
mqtt-protocol:
github: 84codes/mqtt-protocol.cr

development_dependencies:
ameba:
Expand Down
44 changes: 44 additions & 0 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ require "./spec_helper"
require "../src/lavinmq/clustering/client"
require "../src/lavinmq/clustering/controller"

alias IndexTree = LavinMQ::MQTT::TopicTree(String)

describe LavinMQ::Clustering::Client do
follower_data_dir = "/tmp/lavinmq-follower"

Expand Down Expand Up @@ -72,6 +74,48 @@ describe LavinMQ::Clustering::Client do
end
end

it "replicates and streams retained messages to followers" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)

spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
repli = LavinMQ::Clustering::Client.new(config, 1, replicator.password, proxy: false)
done = Channel(Nil).new
spawn(name: "follow spec") do
repli.follow("localhost", tcp_server.local_address.port)
done.send nil
end
wait_for { replicator.followers.size == 1 }

retain_store = LavinMQ::MQTT::RetainStore.new("#{LavinMQ::Config.instance.data_dir}/retain_store", replicator)
wait_for { replicator.followers.first?.try &.lag_in_bytes == 0 }

props = LavinMQ::AMQP::Properties.new
msg1 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body1"))
msg2 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body2"))
retain_store.retain("topic1", msg1.body_io, msg1.bodysize)
retain_store.retain("topic2", msg2.body_io, msg2.bodysize)

wait_for { replicator.followers.first?.try &.lag_in_bytes == 0 }
repli.close
done.receive

follower_retain_store = LavinMQ::MQTT::RetainStore.new("#{follower_data_dir}/retain_store", LavinMQ::Clustering::NoopServer.new)
a = Array(String).new(2)
b = Array(String).new(2)
follower_retain_store.each("#") do |topic, bytes|
a << topic
b << String.new(bytes)
end

a.sort!.should eq(["topic1", "topic2"])
b.sort!.should eq(["body1", "body2"])
follower_retain_store.retained_messages.should eq(2)
ensure
replicator.try &.close
end

it "can stream full file" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)
Expand Down
39 changes: 39 additions & 0 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,42 @@ describe LavinMQ::Exchange do
end
end
end

describe LavinMQ::MQTT::Exchange do
it "should only allow Session to bind" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::AMQP::Queue.new(vhost, "q1")
s1 = LavinMQ::MQTT::Session.new(vhost, "q1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
expect_raises(LavinMQ::Exchange::AccessRefused) do
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
end
end
end

it "publish messages to queues with it's own publish method" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
s1 = LavinMQ::MQTT::Session.new(vhost, "session 1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "mqtt.default", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
pub_args = {
packet_id: 1u16,
payload: Bytes.new(0),
dup: false,
qos: 0u8,
retain: false,
topic: "s1",
}
msg = MQTT::Protocol::Publish.new(**pub_args)
x.publish(msg)
s1.message_count.should eq 1
end
end
end
Loading