AWS SQS source: reduce consumption on empty receives #1743
Throttling seems like a good idea here, but the feedback loop needs to be implemented differently.
sqs/src/main/scala/akka/stream/alpakka/sqs/ControlledThrottling.scala
sqs/src/main/scala/akka/stream/alpakka/sqs/scaladsl/SqsSource.scala
…m:gipeshka/alpakka into 1740_reduce_consumption_on_empty_receives
sqs/src/main/scala/akka/stream/alpakka/sqs/impl/AutoBalancingSqsReceive.scala
LGTM with a couple of comments.
I also think that adding adjustable parallelism to mapAsyncUnordered is a nice generalization and might be useful to add to akka-stream itself.
private var inFlight = 0
private var buffer: BufferImpl[Out] = _
private var parallelism = initialParallelism
Great addition to the mapAsyncUnordered. I think we should drop initialParallelism from the arguments of this stage: whatever it is, it is reset after the first Future completes. So we could hardcode it to 1 and still have the same behaviour.
yeah, slow start might make sense.
After some thought, I also want to change the signature of the balancing function to (Out, Int) => Int, where the second argument is the current value of parallelism, wdyt? That way one can increase parallelism gradually, which might be useful for something like a circuit-breaker case.
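A balancing function of the proposed `(Out, Int) => Int` shape could look roughly like the sketch below. `ReceiveResult` is a hypothetical stand-in for the stage's output element, and the doubling ramp-up is an assumed policy; neither comes from the PR.

```scala
object GradualBalancingSketch {
  // Hypothetical stand-in for the stage's output element (e.g. an SQS receive response).
  final case class ReceiveResult(messageCount: Int)

  // Balancing function of the proposed shape (Out, Int) => Int:
  // the first argument is the completed element, the second is the current parallelism.
  val balance: (ReceiveResult, Int) => Int = (result, current) =>
    if (result.messageCount == 0) 1 // nothing to consume: fall back to one in-flight request
    else current * 2                // messages arrived: ramp up gradually (assumed policy)
}
```

This sketch does not cap the returned value, so a caller would still need to keep it within the size of the stage's buffer.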
However, I still need the maxParallelism parameter to initialise the buffer inside the stage.
Balancing function that takes previous parallelism sounds good!
however I still need maxParallelism parameter to initialise the buffer inside the stage
Ahh, that's right. What is more, the balancing function should not return a larger parallelism than maxParallelism. But since this is not a user-facing API, it can be left like this for now.
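One way to enforce that bound would be to wrap the balancing function so its result is clamped. This is only a sketch: `clamped` is a hypothetical helper, not part of the PR or of akka-stream, and the lower bound of 1 is an assumption.

```scala
object ClampedBalancingSketch {
  // Wraps any (Out, Int) => Int balancing function so that its result
  // always stays within [1, maxParallelism].
  def clamped[Out](maxParallelism: Int)(balance: (Out, Int) => Int): (Out, Int) => Int = {
    require(maxParallelism >= 1, "maxParallelism must be at least 1")
    (out, current) => math.max(1, math.min(balance(out, current), maxParallelism))
  }
}
```

For example, `ClampedBalancingSketch.clamped[String](4)((_, current) => current * 10)` would never yield more than 4, whatever the wrapped function returns.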
balancing function should not return larger parallelism than maxParallelism
Yeah, I also thought about it
I'll create a PR to akka-streams later this week and will address this point there.
Sounds good. Create an issue first, with your proposal, so any interested parties could have a discussion before the code gets written. :)
LGTM
LGTM, nice to discover a new generally useful operator!
Purpose
This PR aims to give quick feedback to a throttling stage in SqsSource, reducing the number of requests to SQS when there is nothing to consume.
References
References #1740
Changes
Added a BalancingMapAsyncUnordered stage that wraps the call to SQS. When an empty SQS response is received, parallelism scales down to 1 until there is a non-empty response.
Added specs for some basic cases and the new one.
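The scale-down rule described above can be illustrated with a small pure function. This is a sketch under stated assumptions, not the stage's actual code: the value of `maxParallelism` and the choice to return to it on the first non-empty response are both assumed.

```scala
object ScaleDownSketch {
  val maxParallelism = 4 // assumed value, for illustration only

  // Empty response: drop to a single in-flight request.
  // Non-empty response: return to the maximum (assumed recovery policy).
  def nextParallelism(messageCount: Int): Int =
    if (messageCount == 0) 1 else maxParallelism

  // Parallelism chosen after each response in a sequence of message counts.
  def trace(messageCounts: Seq[Int]): Seq[Int] =
    messageCounts.map(nextParallelism)
}
```

With responses carrying 3, 0, 0 and 2 messages, `ScaleDownSketch.trace(Seq(3, 0, 0, 2))` gives `Seq(4, 1, 1, 4)`: requests stay at one at a time for as long as the queue is empty.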
Background Context
Inspired by https://github.com/akka/alpakka/blob/960f49a1a654b80b38ee83f2a9677785a8df17e4/sqs/src/main/scala/akka/stream/alpakka/sqs/impl/SqsSourceStage.scala