Ack MqttMessage in Sink Flow #1747

Merged
merged 29 commits into from
Jun 20, 2019

Contributor

@nsandroni nsandroni commented Jun 11, 2019

Pull Request Checklist

Purpose

You can now create a flow that takes MqttMessageWithAck as input and receive an ack when the message has been written successfully.
For example, you can now implement a resilient data-enrichment flow:
MqttSource -> flow (e.g. enrichment) -> MqttSink
and remove the message from the source ONLY once the sink has written it.
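
The ordering described above (enrich, write, and only then ack) can be sketched in plain Scala. This is a simplified model and not the Alpakka API: `MessageWithAck`, `enrichWriteThenAck` and the `write` function are hypothetical names introduced only for illustration.

```scala
import scala.concurrent.{ExecutionContext, Future}

object AckAfterWriteSketch {
  // Hypothetical stand-in for a message that carries its own acknowledgement.
  final case class MessageWithAck(payload: String, ack: () => Unit)

  // Enriches the payload, hands it to `write`, and acks only if the write succeeded.
  // `write` is an assumed placeholder for publishing to the sink.
  def enrichWriteThenAck(msg: MessageWithAck, write: String => Future[Unit])(
      implicit ec: ExecutionContext
  ): Future[Unit] = {
    val enriched = s"${msg.payload} (enriched)"
    // `map` only runs on success, so a failed write leaves the message unacked.
    write(enriched).map(_ => msg.ack())
  }
}
```

Because the ack is chained onto the write's success, a failed write leaves the source message unacknowledged, so the broker can redeliver it.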

References

Changes

Added a new class (MqttFlowStageWithAck) to manage MqttMessageWithAck.

@ennru ennru added the p:mqtt label Jun 12, 2019
Member

@ennru ennru left a comment

Thank you for this suggestion!

Copying the whole class to add tiny changes is not a good option here. You should make the GraphStageLogic a named class and use inheritance to diverge where needed (which is quite little AFAICS).

PS: Our CI build was a bit broken, it should work now.

@nsandroni
Contributor Author

Ok thank you :)

@ennru
Member

ennru commented Jun 13, 2019

Looks better.

For MiMa you need to add an exclusion file similar to csv/src/main/mima-filters/1.0.x.backwards.excludes.

Contributor Author

@nsandroni nsandroni left a comment

changed flow

)
} catch {
case e: Throwable => failStageWith(e)
def publishToMqttWithAck(msg: MqttMessageWithAck): IMqttDeliveryToken = publishToMqtt(msg.message)
Contributor Author

unwrap if MqttMessageWithAck

queue.enqueue(message)
}
}
class MqttFlowStageLogic[I](in: Inlet[I],
Contributor Author

The class takes the input type as a parameter.

import akka.stream.alpakka.mqtt.impl.MqttFlowStageLogic._

private val backpressurePahoClient = new Semaphore(bufferSize)
private var pendingMsg = Option.empty[I]
Contributor Author

The type of the pending message depends on the type parameter.

private val queue = mutable.Queue[MqttMessageWithAck]()
private val unackedMessages = new AtomicInteger()

protected def handleDeliveryComplete(token: IMqttDeliveryToken): Unit = ()
Contributor Author

extracted method for easy override

Member

Is there any reason why you didn't override the MQTT callback deliveryComplete directly?

Contributor Author

The original logic is inside the callback.
I prefer to extract this method rather than override the entire callback.
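
The trade-off discussed here (an extracted hook versus overriding the whole callback) can be illustrated with a small plain-Scala sketch. It is not the code from this pull request: `Token`, `BaseLogic`, `WithAckLogic` and the logged strings are hypothetical, and only the hook name `handleDeliveryComplete` is taken from the diff above.

```scala
import scala.collection.mutable

object DeliveryHookSketch {
  // Hypothetical stand-in for the delivery token; only an id is modelled.
  final case class Token(id: Int)

  // Named base logic: the callback body is shared, the hook is the extension point.
  class BaseLogic {
    val log = mutable.ListBuffer.empty[String]

    // Default hook does nothing, like the no-op default in the diff above.
    protected def handleDeliveryComplete(token: Token): Unit = ()

    // Models the client callback: shared bookkeeping first, then the hook.
    // Marked final so subclasses cannot skip the shared bookkeeping.
    final def deliveryComplete(token: Token): Unit = {
      log += s"shared bookkeeping for ${token.id}"
      handleDeliveryComplete(token)
    }
  }

  // Variant that acknowledges the message once delivery is confirmed.
  class WithAckLogic extends BaseLogic {
    override protected def handleDeliveryComplete(token: Token): Unit =
      log += s"acked ${token.id}"
  }
}
```

With the hook, a subclass changes only what happens after delivery is confirmed, and the shared part of the callback cannot be accidentally dropped.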

Contributor Author

@nsandroni nsandroni left a comment

added comments on changes

Member

@ennru ennru left a comment

Much better than copying the whole thing! Take it one step further with two subclasses.

Member

@ennru ennru left a comment

Ok, this looks good now. Remaining things:

  • I don't understand the use case of MqttSource.atLeastOnceWithAck.
  • Add the factory methods in the Java DSL
  • Documentation

@@ -64,5 +67,42 @@ class MqttFlowSpec
mqttMessagePromise.success(None)
noException should be thrownBy result.futureValue
}
"send an ack after sent confirmation" in {
val topic = "flow-spec-ack/topic"
//#create-flow
Member

You need to remove these markers, they are used to pull snippets into the docs.

Contributor Author

@nsandroni nsandroni Jun 14, 2019

MqttSource.atLeastOnceWithAck is not correct, because I only touched the flow.
I will remove this method in the next commit.
At the moment I'm looking into the failing tests: it seems that sometimes (I don't understand why yet) the method 'deliveryComplete' is not called.
For the third point, what documentation do you mean?

Thank you for your patience with my first PR :)

@ennru
Member

ennru commented Jun 17, 2019

I'd like to see a section in the documentation for this use case which shows some example flow reading from one topic and writing to a new one.

@nsandroni
Contributor Author

I added a retry on the ack check with a timeout of 15 seconds (I know, that is very high), but sometimes the ack is not fired at all, not even after 15 seconds.
When the ack is called, the test succeeds within a few milliseconds.
Under the debugger, of course, the problem does not show up.
I'm preparing the Java test, but the problem is the same. Do you have any suggestions for troubleshooting?
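
A retry with a timeout, as described above, might look roughly like the following plain-Scala sketch. It is not the test code from this pull request: `eventuallyTrue` and its parameters are hypothetical names, and a real test would more likely rely on its test framework's own retry support.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object AckCheckSketch {
  // Polls `condition` until it holds or `timeout` elapses; returns whether it held.
  def eventuallyTrue(timeout: FiniteDuration, interval: FiniteDuration)(
      condition: () => Boolean
  ): Boolean = {
    val deadline = timeout.fromNow
    @tailrec def loop(): Boolean =
      if (condition()) true
      else if (deadline.isOverdue()) false
      else {
        // Wait a little before checking again, to avoid a busy loop.
        Thread.sleep(interval.toMillis)
        loop()
      }
    loop()
  }
}
```

A helper like this only bounds how long the test waits; if the ack never fires, it still returns false once the timeout has passed, which matches the symptom reported here.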

@ennru
Member

ennru commented Jun 18, 2019

I believe one problem in the MQTT tests is that they reuse the same topic. Are you aware that the topics need to be listed in mqtt/src/test/travis/acl for the Docker container to pick them up?
The code shown in the docs should be provided as @@snip from real test sources.

Member

@ennru ennru left a comment

Just a few spelling/wording suggestions and we are good.

nsandroni and others added 4 commits June 20, 2019 10:59
Co-Authored-By: Enno <458526+ennru@users.noreply.github.com>
Member

@ennru ennru left a comment

LGTM.

@ennru ennru added this to the 1.0.3 milestone Jun 20, 2019
@ennru ennru merged commit 872d76d into akka:master Jun 20, 2019
@ennru
Member

ennru commented Jun 20, 2019

Thank you! And congratulations for your first contribution, keep them coming!

@nsandroni
Contributor Author

Thanks! :)

cheleb pushed a commit to cheleb/alpakka that referenced this pull request Jul 5, 2019
* add ack after publish
* documentation for MqttFlow.atLeastOnceWithAck