You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
We're using the AMQP connector and experiencing a surprising behaviour when the rabbitmq connection is restarted after being closed server side.
We have withAutomaticRecoveryEnabled and withTopologyRecoveryEnabled set to false as suggested by other issues around connection recovery.
We're explicitly acking or nacking messages and using at-least once delivery setup. We're using RestartSource.withBackoff and connection recovery happens as expected.
The issue we're facing only happens when the connection restarts while there are some messages "in-flight" in the stream, so basically unacknowledged messages from the rabbitmq perspective.
After the source is restarted as expected, we notice that the "in-flight" messages in the stream end up attempting running acks/nacks, basically generating Futures that never seem to terminate.
And given that we're using mapAsync(1) these unacknowledged messages end up blocking the stream entirely. Increasing the parallelism helps, but it obviously saturates as soon as there's an increased enough number of "in-flight" messages, so obviously doesn't feel like an appropriate solution.
We're wondering why the ack/nack Futures never terminate in the first place, maybe we're missing something obvious or misconfiguring something?
After a connection restart, we expect the ack/nack of the stream "in-flight" messages to execute and terminate, failing or succeeding, but not to run indefinitely without terminating.
The text was updated successfully, but these errors were encountered:
rymir
changed the title
AMQP in-flight stream messages never terminating ack/nack after connection recovery
AMQP in-flight stream messages never terminating ack/nack after RestartSource connection recovery
Aug 18, 2022
We're using the AMQP connector and experiencing a surprising behaviour when the rabbitmq connection is restarted after being closed server side.
We have
withAutomaticRecoveryEnabled
andwithTopologyRecoveryEnabled
set tofalse
as suggested by other issues around connection recovery.We're explicitly acking or nacking messages and using at-least once delivery setup. We're using
RestartSource.withBackoff
and connection recovery happens as expected.The issue we're facing only happens when the connection restarts while there are some messages "in-flight" in the stream, so basically unacknowledged messages from the rabbitmq perspective.
After the source is restarted as expected, we notice that the "in-flight" messages in the stream end up attempting running acks/nacks, basically generating Futures that never seem to terminate.
Our stream basically looks like the following:
source.via(flow).mapAsync(1)(finishHandling()).withAttributes(supervisionStrategy(resumingDecider))
And given that we're using
mapAsync(1)
these unacknowledged messages end up blocking the stream entirely. Increasing the parallelism helps, but it obviously saturates as soon as there's an increased enough number of "in-flight" messages, so obviously doesn't feel like an appropriate solution.We're wondering why the ack/nack Futures never terminate in the first place, maybe we're missing something obvious or misconfiguring something?
Versions used
Akka version: 2.6.14
akka-stream-alpakka-amqp version: 3.0.4
Expected Behavior
After a connection restart, we expect the ack/nack of the stream "in-flight" messages to execute and terminate, failing or succeeding, but not to run indefinitely without terminating.
The text was updated successfully, but these errors were encountered: