Skip to content

Commit

Permalink
subscribe and bind a session
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Sep 27, 2024
1 parent 772185d commit 4e747f2
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 64 deletions.
22 changes: 2 additions & 20 deletions spec/mqtt/integrations/subscribe_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ module MqttSpecs
end
end

pending "should replace old subscription with new [MQTT-3.8.4-3]" do
it "should replace old subscription with new [MQTT-3.8.4-3]" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand Down Expand Up @@ -124,30 +124,12 @@ module MqttSpecs
publish(io, topic: "a/b", payload: "a".to_slice, qos: 1u8)
# ... consume it...
packet = read_packet(io).as(MQTT::Protocol::Publish)
# ... and verify it be qos0 (i.e. our subscribe is correct)
# ... and verify it be qos1 (i.e. our second subscribe is correct)
packet.qos.should eq(1u8)

io.should be_drained
end
end
end
end

describe "amqp" do
pending "should create a queue and subscribe queue to amq.topic" do
with_server do |server|
with_client_io(server) do |io|
connect(io)

topic_filters = mk_topic_filters({"a/b", 0})
suback = subscribe(io, topic_filters: topic_filters)
suback.should be_a(MQTT::Protocol::SubAck)

q = server.vhosts["/"].queues["mqtt.client_id"]
binding = q.bindings.find { |a, b| a.is_a?(LavinMQ::TopicExchange) && b[0] == "a.b" }
binding.should_not be_nil
end
end
end
end
end
36 changes: 19 additions & 17 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module LavinMQ
module MQTT
struct Sessions

@queues : Hash(String, Queue)

def initialize( @vhost : VHost)
Expand All @@ -17,11 +16,10 @@ module LavinMQ
end

def declare(client_id : String, clean_session : Bool)
if session = self[client_id]?
return session
self[client_id]? || begin
@vhost.declare_queue("amq.mqtt-#{client_id}", !clean_session, clean_session, AMQP::Table.new({"x-queue-type": "mqtt"}))
self[client_id]
end
@vhost.declare_queue("amq.mqtt-#{client_id}", !clean_session, clean_session, AMQP::Table.new({"x-queue-type": "mqtt"}))
return self[client_id]
end

def delete(client_id : String)
Expand All @@ -30,15 +28,19 @@ module LavinMQ
end

class Broker

getter vhost, sessions

def initialize(@vhost : VHost)
@sessions = Sessions.new(@vhost)
@clients = Hash(String, Client).new
end

#remember to remove the old client entry form the hash if you replace a client. (maybe it already does?)
def session_present?(client_id : String, clean_session) : Bool
session = @sessions[client_id]?
return false if session.nil? || clean_session
true
end

def connect_client(socket, connection_info, user, vhost, packet)
if prev_client = @clients[packet.client_id]?
Log.trace { "Found previous client connected with client_id: #{packet.client_id}, closing" }
Expand All @@ -50,18 +52,18 @@ module LavinMQ
end

def subscribe(client, packet)
name = "amq.mqtt-#{client.client_id}"
durable = false
auto_delete = false
pp "clean_session: #{client.@clean_session}"
@sessions.declare(client.client_id, client.@clean_session)
# Handle bindings, packet.topics
session = @sessions.declare(client.client_id, client.@clean_session)
qos = Array(MQTT::SubAck::ReturnCode).new(packet.topic_filters.size)
packet.topic_filters.each do |tf|
qos << MQTT::SubAck::ReturnCode.from_int(tf.qos)
rk = topicfilter_to_routingkey(tf.topic)
session.subscribe(rk, tf.qos)
end
qos
end

def session_present?(client_id : String, clean_session) : Bool
session = @sessions[client_id]?
return false if session.nil? || clean_session
true
def topicfilter_to_routingkey(tf) : String
tf.gsub("/", ".")
end

def clear_session(client_id)
Expand Down
23 changes: 6 additions & 17 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ module LavinMQ
end

private def read_loop

loop do
@log.trace { "waiting for packet" }
packet = read_and_handle_packet
Expand Down Expand Up @@ -95,7 +96,7 @@ module LavinMQ
end

def recieve_publish(packet : MQTT::Publish)
rk = topicfilter_to_routingkey(packet.topic)
rk = @broker.topicfilter_to_routingkey(packet.topic)
props = AMQ::Protocol::Properties.new(
message_id: packet.packet_id.to_s
)
Expand All @@ -112,25 +113,13 @@ module LavinMQ
end

def recieve_subscribe(packet : MQTT::Subscribe)
@broker.subscribe(self, packet)
qos = Array(MQTT::SubAck::ReturnCode).new
packet.topic_filters.each do |tf|
qos << MQTT::SubAck::ReturnCode.from_int(tf.qos)
rk = topicfilter_to_routingkey(tf.topic)
#handle bindings in broker.
@broker.vhost.bind_queue("amq.mqtt-#{client_id}", "amq.topic", rk)
end
# handle add_consumer in broker.
queue = @broker.vhost.queues["amq.mqtt-#{client_id}"]
consumer = MqttConsumer.new(self, queue)
queue.add_consumer(consumer)
qos = @broker.subscribe(self, packet)
session = @broker.sessions[@client_id]
consumer = MqttConsumer.new(self, session)
session.add_consumer(consumer)
send(MQTT::SubAck.new(qos, packet.packet_id))
end

def topicfilter_to_routingkey(tf) : String
tf.gsub("/", ".")
end

def recieve_unsubscribe(packet)
end

Expand Down
28 changes: 18 additions & 10 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
module LavinMQ
module MQTT
class Session < Queue
@clean_session : Bool = false
getter clean_session
@clean_session : Bool = false
@subscriptions : Int32 = 0
getter clean_session

def initialize(@vhost : VHost,
@name : String,
@auto_delete = false,
arguments : ::AMQ::Protocol::Table = AMQP::Table.new)
super(@vhost, @name, false, @auto_delete, arguments)
end

def clean_session?
@auto_delete
end
def clean_session?; @auto_delete; end
def durable?; !clean_session?; end

def durable?
!clean_session?
# TODO: "amq.tocpic" is hardcoded, should be the mqtt-exchange when that is finished
def subscribe(rk, qos)
arguments = AMQP::Table.new({"x-mqtt-qos": qos})
if binding = bindings.find { |b| b.binding_key.routing_key == rk }
return if binding.binding_key.arguments == arguments
@vhost.unbind_queue(@name, "amq.topic", rk, binding.binding_key.arguments || AMQP::Table.new)
end
@vhost.bind_queue(@name, "amq.topic", rk, arguments)
end

#TODO: implement subscribers array and session_present? and send instead of false
def connect(client)
client.send(MQTT::Connack.new(false, MQTT::Connack::ReturnCode::Accepted))
def unsubscribe(rk)
# unbind session from the exchange
# decrease @subscriptions by 1
# if subscriptions is empty, delete the session(do that from broker?)
end
end
end
Expand Down

0 comments on commit 4e747f2

Please sign in to comment.