mqtt-streaming: Various fixes #1595
Conversation
final case class ConsumerFree(topicName: String) extends Event(ByteString.empty)

final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData)
    extends Event(ByteString.empty)
These and related events aren't associated with a connection id, so their connection id is an empty ByteString. This is similar to how the server does things.
context.self ! connect
disconnect(context, data.remote, data)
case (_, event) if event.connectionId.nonEmpty && event.connectionId != data.connectionId =>
  Behaviors.same
This case is responsible for letting empty (thus session bound) or matching connection ids fall through to other cases.
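That guard can be sketched in plain Scala (names hypothetical and simplified from the actual behavior; connection ids are modelled as Strings rather than ByteStrings):

```scala
// Simplified stand-in for the guard above: an event with an empty
// connection id is session-bound and always falls through; a non-empty
// id must match the current connection, otherwise the event is dropped
// (the actor stays in the same behavior).
final case class SessionEvent(connectionId: String)

def fallsThrough(currentConnectionId: String, event: SessionEvent): Boolean =
  event.connectionId.isEmpty || event.connectionId == currentConnectionId
```

So an event carrying a stale connection id is silently ignored, while session-bound events (empty id) are always processed.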
case (_, ReceivedProducerPublishingCommand(command)) =>
  command.runWith(Sink.foreach {
    case Producer.ForwardPublish(publish, packetId) => data.remote.offer(ForwardPublish(publish, packetId))
    case Producer.ForwardPubRel(_, packetId) => data.remote.offer(ForwardPubRel(packetId))
Closing over old state -- if the client ever reconnected, data.remote would be from an old connection and thus never actually republished.
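The bug pattern can be illustrated without Akka (names hypothetical; a mutable Buffer stands in for the connection's offer queue):

```scala
import scala.collection.mutable

final case class ConnData(remote: mutable.Buffer[String])

val oldConn = ConnData(mutable.Buffer.empty)
val newConn = ConnData(mutable.Buffer.empty)

var data = oldConn

// Bug: the callback captures data.remote by value when the stream is set
// up, so it holds a reference to the OLD connection's queue...
val capturedRemote = data.remote
val buggyForward: String => Unit = msg => capturedRemote += msg

// ...and after a reconnect swaps in new state, publications still go to
// the dead connection and are never seen by the new one.
data = newConn
buggyForward("PUBLISH p1")

// Fix: resolve the current state on each message instead of capturing it.
val fixedForward: String => Unit = msg => data.remote += msg
fixedForward("PUBLISH p1")
```

After running this, the buggy forwarder has delivered to `oldConn` while the fixed one delivers to the current (`newConn`) queue.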
@@ -226,8 +257,13 @@ import scala.util.{Failure, Success}
        )
      )
    } else {
      data.activeProducers.values.foreach { producer =>
        producer ! Producer.ReceiveConnect
      }
Main change on the client to let the producers know about new ReceiveConnect
so that they can republish.
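The idea can be sketched without Akka (hypothetical names, not the actual Producer implementation): a producer keeps its unacknowledged QoS 1/2 publications and resends them all when it is told about a new connection.

```scala
// A producer that tracks unacknowledged publications and, on being told
// a new connection exists (the ReceiveConnect notification above),
// republishes all of them via the supplied send function.
final class SketchProducer {
  private var unacked: Vector[String] = Vector.empty

  def publish(msg: String, send: String => Unit): Unit = {
    unacked :+= msg
    send(msg)
  }

  def ack(msg: String): Unit =
    unacked = unacked.filterNot(_ == msg)

  def receiveConnect(send: String => Unit): Unit =
    unacked.foreach(send)
}
```

Acknowledged messages drop out of the unacked set, so only the still-pending ones are resent on reconnect.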
@@ -390,6 +390,9 @@ import scala.util.{Failure, Success}
      data.stash.foreach(context.self.tell)
      timer.cancel(ReceiveConnAck)

      data.activeProducers.values
        .foreach(_ ! Producer.ReceiveConnect)
Similarly on the server.
Review of the connection id logic: Looking good. One comment, although more of a question than anything that should stop this from being merged.
Re. closing over - it took me a few times, but now I get it. Really nice find.
Re-republishing on reconnect. Really nicely done.
@ennru @2m All LGTM. Thanks @longshorej ! One thing to note is that the client connection flows now require a connection id, similar to the server connection flows. This was an oversight in our original APIs.
Can you add a MiMa filter for everything in the
I've not used MiMa before, and I've been looking for a way for each project to be able to conveniently exclude their
Fair enough. I'll add it later today.
Thanks @ennru
Now we're down to the API breaking changes we want to see, as those got a new parameter introduced.
Yeah, that's expected as each flow needs a connection id now (similar to the server). This module is still considered an "API may change" one, right? Given that we're still going through hardening exercises, that seems reasonable to me.
The lack of a connection id was an oversight on my part initially. It is all because of the notion of a session with MQTT. A session must distinguish connections. We were doing this on the server side and exposing it in the API, but we weren't exposing it for the client API. The client session needs to understand which connection is sending a message, as there can be a race between an active connection shutting down and a new one being established. Incidentally, your higher-level APIs can hide this complexity given that you're managing the connection entirely. A good candidate for a client connection id is the client port that the TCP socket has (not the server port!). HTH.
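A sketch of that suggestion (in practice the port would come from the materialized TCP connection's local address; here it is a plain Int):

```scala
import java.nio.ByteBuffer

// Derive a connection id from the client-side (local) TCP port, as
// suggested above. With Akka's Tcp stage this would come from the
// outgoing connection's localAddress; encoding as 4 big-endian bytes
// mirrors the ByteString-based ids used by the session.
def connectionIdFromPort(clientPort: Int): Vector[Byte] =
  ByteBuffer.allocate(4).putInt(clientPort).array().toVector
```

Since each client socket gets a distinct ephemeral port, distinct concurrent connections get distinct ids.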
LGTM.
Technically this module is not marked "API may change", so you should add a clientSessionFlow method to Mqtt without the connectionId, generating one internally. That method should be deprecated and point to the new version.
We will be very open to doing major releases once 1.0 is out, when some API needs an improvement that can't keep backwards compatibility.
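That suggestion might look like the following (method and type names are taken from the discussion, but the signatures are simplified placeholders, not the actual Alpakka API):

```scala
import java.util.UUID

// Simplified stand-in for the suggested change: the new method takes an
// explicit connectionId; the old no-id method is kept, deprecated, and
// delegates by generating a random id. The String return value is a
// placeholder for the real session flow.
object MqttSketch {
  def clientSessionFlow(sessionName: String, connectionId: String): String =
    s"flow($sessionName, $connectionId)"

  @deprecated("Use the overload that takes an explicit connectionId", "")
  def clientSessionFlow(sessionName: String): String =
    clientSessionFlow(sessionName, UUID.randomUUID().toString)
}
```

Existing callers keep compiling (with a deprecation warning) while new code passes a connection id explicitly.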
I added the requested deprecated methods, but MiMa is still complaining about something. Any advice @ennru?
Looks like you removed the MiMa exclusions file; the version I provided is required for it to accept the other changes.
Ah thanks, that must have happened in the rebase. I've re-added it.
Thank you @longshorej and @huntc for your continued work on this! Great to see other contributors helping out, as well.
A number of commits here that improve the resilience of the mqtt-streaming connector. #1577 remains important as well, but will require an Akka release.
mqtt-streaming: Require a connection id to be specified for client flows
This allows the MQTT session to better track which events are relevant, given that there can be races when old connections are torn down and new ones are established.
mqtt-streaming: Fix a bug where actor state was closed over
This could cause duplicate publications to never be sent to new connections
mqtt-streaming: Republish messages on reconnect only by default
Previously, messages for QoS1/2 were only republished on an interval after not receiving an ack.
It is more conventional to instead republish everything only on connect, and indeed to be compliant for MQTT 5, that is the only time this is allowed.
To accommodate this, the timeouts default to 0, but the previous behavior can still be restored by changing the default producer timeout settings.
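The new default policy can be sketched as follows (field and function names are hypothetical, not the exact Alpakka settings API): a zero resend interval disables timer-based republishing, so unacked QoS 1/2 messages are only republished on (re)connect; a positive interval restores the previous behavior.

```scala
import scala.concurrent.duration._

// Zero by default: no timer-based republishing, only on (re)connect.
final case class ProducerSettings(resendInterval: FiniteDuration = Duration.Zero)

// Whether unacked messages should also be republished on a timer,
// i.e. the pre-change behavior.
def republishOnTimer(settings: ProducerSettings): Boolean =
  settings.resendInterval > Duration.Zero
```

This keeps the old interval-based behavior reachable through configuration while making the MQTT 5-compliant republish-on-connect the default.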