-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ack MqttMessage in Sink Flow #1747
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this suggestion!
Copying the whole class to add tiny changes is not a good option here. You should make the GraphStageLogic
a named class and use inheritance to diverge where needed (which is quite little AFAICS).
PS: Our CI build was a bit broken, it should work now.
Ok thank you :)
|
Looks better. For MiMa you need to add an exclusion file similar to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed flow
mqtt/src/main/scala/akka/stream/alpakka/mqtt/impl/MqttFlowStage.scala
Outdated
Show resolved
Hide resolved
mqtt/src/main/scala/akka/stream/alpakka/mqtt/impl/MqttFlowStage.scala
Outdated
Show resolved
Hide resolved
) | ||
} catch { | ||
case e: Throwable => failStageWith(e) | ||
def publishToMqttWithAck(msg: MqttMessageWithAck): IMqttDeliveryToken = publishToMqtt(msg.message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unwrap if MqttMessageWithAck
queue.enqueue(message) | ||
} | ||
} | ||
class MqttFlowStageLogic[I](in: Inlet[I], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class take a parameter of input
import akka.stream.alpakka.mqtt.impl.MqttFlowStageLogic._ | ||
|
||
private val backpressurePahoClient = new Semaphore(bufferSize) | ||
private var pendingMsg = Option.empty[I] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pending messages depends on parameter type
private val queue = mutable.Queue[MqttMessageWithAck]() | ||
private val unackedMessages = new AtomicInteger() | ||
|
||
protected def handleDeliveryComplete(token: IMqttDeliveryToken): Unit = () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extracted method for easy override
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason why you didn't override the MQTT callback deliveryComplete
directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the original method is inside the callback.
i prefer extract this method instead to override the entire callback
mqtt/src/main/scala/akka/stream/alpakka/mqtt/impl/MqttFlowStageWithAck.scala
Show resolved
Hide resolved
mqtt/src/main/scala/akka/stream/alpakka/mqtt/impl/MqttFlowStageWithAck.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comments on changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better than copying the whole thing! Take it one step further with two subclasses.
private val queue = mutable.Queue[MqttMessageWithAck]() | ||
private val unackedMessages = new AtomicInteger() | ||
|
||
protected def handleDeliveryComplete(token: IMqttDeliveryToken): Unit = () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason why you didn't override the MQTT callback deliveryComplete
directly?
mqtt/src/main/scala/akka/stream/alpakka/mqtt/impl/MqttFlowStage.scala
Outdated
Show resolved
Hide resolved
mqtt/src/main/scala/akka/stream/alpakka/mqtt/impl/MqttFlowStage.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, this looks good now. Remaining things:
- I don't understand the use case of
MqttSource.atLeastOnceWithAck
. - Add the factory methods in the Java DSL
- Documentation
@@ -64,5 +67,42 @@ class MqttFlowSpec | |||
mqttMessagePromise.success(None) | |||
noException should be thrownBy result.futureValue | |||
} | |||
"send an ack after sent confirmation" in { | |||
val topic = "flow-spec-ack/topic" | |||
//#create-flow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to remove these markers, they are used to pull snippets into the docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, this looks good now. Remaining things:
- I don't understand the use case of
MqttSource.atLeastOnceWithAck
.- Add the factory methods in the Java DSL
- Documentation
MqttSource.atLeastOnceWithAck it's not correct because i touched only the flow..
i remove this method in the next commit...
at the moment i'm searching the problem of the tests.. seems that sometimes ( that i don't understand yet ) the method 'delivery Complete ' is not called.
for the third tick, what documentation do you mean?
thank you for your patience in my first P.R :)
I'd like to see a section in the documentation for this use case which shows some example flow reading from one topic and writing to a new one. |
i added a retry on the ack check with a timeout of 15 sec ( i know, is very high), but sometimes the ack are not fired at all... not even after 15 seconds... |
I believe one problem in the MQTT tests is that they reuse the same topic. Are you aware that the topics need to be listed in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few spelling/wording suggestions and we are good.
Co-Authored-By: Enno <458526+ennru@users.noreply.github.com>
Co-Authored-By: Enno <458526+ennru@users.noreply.github.com>
Co-Authored-By: Enno <458526+ennru@users.noreply.github.com>
Co-Authored-By: Enno <458526+ennru@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Thank you! And congratulations for your first contribution, keep them coming! |
Thanks! :) |
* add ack after publish * documentation for MqttFlow.atLeastOnceWithAck
Pull Request Checklist
Purpose
Now you can create a flow with MqttMessageWithAck in input, and receive an ack wher the message is correctly written
For example you can now implement a resilient flow of enrichment data
MqttSource -> flow (like enrichment) -> MqttSink
and remove message from source ONLY when sink has been written
References
Changes
Added new class (MqttFlowStageWithAck) for manage MqttMessageWithAck