
[BEAM-8382] Add rate limit policy to KinesisIO.Read #9765

Merged 2 commits into apache:master on Mar 3, 2020

Conversation

@jfarr (Contributor) commented Oct 11, 2019

This PR adds an optional rate limit policy to KinesisIO.Read that can be configured with the following methods:

withFixedDelayRateLimitPolicy()
Specifies a fixed delay rate limit policy with the default delay of 1 second.

withFixedDelayRateLimitPolicy(Duration delay)
Specifies a fixed delay rate limit policy with the given delay.

withDynamicDelayRateLimitPolicy(Supplier<Duration> delay)
Specifies a dynamic delay rate limit policy with the given function being called at each polling interval to get the next delay value.

withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory)
Specifies the RateLimitPolicyFactory for a custom rate limiter.

This provides a strategy interface for dealing with the Kinesis hard limit of 5 read requests per second per shard. The FixedDelayRateLimiter will apply a fixed delay between read calls. The default is a "no limiter" policy which preserves the behavior of the current implementation.
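For illustration, a minimal sketch of how a read with the new fixed-delay policy would be wired up (the stream name, credentials, and region are placeholder values, and p is an existing Pipeline):

    import com.amazonaws.regions.Regions;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    import org.apache.beam.sdk.io.kinesis.KinesisIO;
    import org.joda.time.Duration;

    // Placeholder stream/credentials; the rate limit policy method is the one added by this PR.
    p.apply("ReadFromKinesis",
        KinesisIO.read()
            .withStreamName("my-stream")
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withAWSClientsProvider("access-key", "secret-key", Regions.US_EAST_1)
            // wait a fixed 1 second between getRecords calls on each shard
            .withFixedDelayRateLimitPolicy(Duration.standardSeconds(1)));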
R: @aromanenko-dev


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.


@aromanenko-dev self-requested a review October 11, 2019 09:27
@aromanenko-dev (Contributor) left a comment:

Thank you for the contribution. Beam tends to minimize the number of tuning knobs, so I'm wondering: is it possible to detect such throttling behaviour and increase the timeout automatically at runtime?

@iemejia (Member) commented Oct 11, 2019

Oops, my GitHub page hadn't updated and I hadn't seen that Alexey was already on this. I'll -1 myself from the review then. Thanks for the contribution @jfarr.

@jfarr (Contributor Author) commented Oct 12, 2019

Thank you for the contribution. Beam tends to minimize the number of tuning knobs, so I'm wondering: is it possible to detect such throttling behaviour and increase the timeout automatically at runtime?

Hi @aromanenko-dev, thanks for the feedback. That makes sense, and I think it would be possible, but I'm concerned that it would overly complicate the implementation and the runtime behavior. Also, an algorithm that could perform well would likely only add more knobs, and if those aren't exposed then they just aren't tunable. Personally I would prefer the simplicity and predictability of a simple rate limit. If preserving the default behavior unchanged isn't a concern, what do you think about just setting a reasonable default (1 second is the AWS recommendation) but leaving that knob in for the rare case that users need to tune it? I doubt the vast majority would ever need to touch it.

@aromanenko-dev (Contributor) commented Oct 14, 2019

@jfarr I guess it would not complicate things too much. My idea is the following: when we fetch new records in ShardReadersPool.readLoop() with shardRecordsIterator.readNextBatch(), we eventually end up calling SimplifiedKinesisClient.getRecords(), where we execute AmazonKinesis.getRecords() and wrap exceptions, if any. So it looks like we can re-throw KMSThrottlingException and catch it in ShardReadersPool.readLoop(). In case of this exception we can write a warning to the logs, wait with a progressive timeout (starting from 1 second) and then call shardRecordsIterator.readNextBatch() again. In this case no additional knobs will be needed.
I'm a bit sceptical about a user-defined config option in this case, since we usually never know which value will be optimal and sufficient.
Wdyt?
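For readers following along, a rough, self-contained sketch of that progressive-timeout idea (fetchBatch() and ThrottledException below are hypothetical stand-ins for shardRecordsIterator.readNextBatch() and the re-thrown throttling exception; this is not the actual ShardReadersPool code):

    // Illustration only: back off progressively on throttling, reset after a successful call.
    class ProgressiveBackoffSketch {
      static class ThrottledException extends Exception {}

      interface Poller {
        void fetchBatch() throws ThrottledException;
      }

      static void readLoop(Poller poller) throws InterruptedException {
        long delayMillis = 1_000;            // start from the suggested 1 second
        final long maxDelayMillis = 10_000;  // arbitrary cap for the example
        while (true) {
          try {
            poller.fetchBatch();             // successful call: reset the delay
            delayMillis = 1_000;
          } catch (ThrottledException e) {
            System.err.println("Throttled, waiting " + delayMillis + " ms before retrying");
            Thread.sleep(delayMillis);
            delayMillis = Math.min(delayMillis * 2, maxDelayMillis);
          }
        }
      }
    }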

@jfarr (Contributor Author) commented Oct 15, 2019

@aromanenko-dev Kinesis limits us to 5 getRecords calls per second per shard. If we're the only consumer on the stream we can call getRecords every 200ms without getting throttled (in theory). If we introduce a 1 sec delay on the first KMSThrottlingException then we have an unnecessary 800ms of latency. Let's say we reduce the initial delay to 100ms. Assuming a successful call takes about 10ms, we'll hit an exception after 50ms and for the next 950ms thereafter. On the 4th retry we'll succeed but we'll be at 800ms. Reduce it to 10ms and on the 7th retry we'll succeed but we'll be at 640ms. In any case we've introduced another knob (initial backoff delay time).

So that's assuming delay time starts at zero and only increases when we get a KMSThrottlingException. Since that's 100% guaranteed to happen we could try with a non-zero initial delay instead. Maybe if we start at 200ms we won't get throttled at all and we won't overshoot. Another knob.

OK so now we've overshot. Maybe we can ease back on the delay. We can speculatively try a shorter delay time and see if we still get throttled. How often to try? How much to ease back? More knobs.

You can fiddle with these knobs and come up with something that works well when you're the only consumer, but as soon as you have 2 or more consumers you have to throw that out the window. If you're pulling back more or less records that may take more or less time and throw off your timing. You can't really hardcode anything because what works well in one scenario may not work well in another.

So those are my initial thoughts. I'm totally open to the idea that I'm just overthinking it. I think the ultimate test would be to try it out and see what works. But if you want to minimize latency (and we do) I think the 1 second rolling window introduces a feedback delay that makes this a little more complicated than first glance. What do you think?

@aromanenko-dev (Contributor) commented Oct 16, 2019

@jfarr I agree that there is no perfect solution for this. At the same time, I don't see much difference between a hardcoded value and a configured one, taking into account that Kinesis is an unbounded source and the user pipeline has to run for a long time without interruption. So what happens if we set the initial polling interval too small or too big? Would we need to stop the pipeline, set a new value and restart it again?

Unfortunately, there is no guarantee that this value will be efficient and work fine all the way, since, as you said above, another client can start consuming from the same shard in parallel. So in this case it would be better to provide an adaptive solution.

Even if we do have to expose some knobs to the user, what about using FluentBackoff [1] instead? It can be configured with a retry configuration object, like we do in SolrIO and some other IOs with RetryConfiguration.

Another option could be to allow providing a UDF object that manages the polling delay dynamically (for example, a SerializableFunction).

Wdyt?

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
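For context, a minimal sketch of how FluentBackoff is typically driven (the retry limits are arbitrary example values, callGetRecords() is a placeholder for the throttled request, and the enclosing method is assumed to declare throws IOException, InterruptedException):

    import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    import org.apache.beam.sdk.util.BackOff;
    import org.apache.beam.sdk.util.BackOffUtils;
    import org.apache.beam.sdk.util.FluentBackoff;
    import org.apache.beam.sdk.util.Sleeper;
    import org.joda.time.Duration;

    // Example values only: exponential backoff, giving up after 10 retries
    // or 1 minute of cumulative waiting.
    BackOff backoff =
        FluentBackoff.DEFAULT
            .withInitialBackoff(Duration.millis(200))
            .withMaxRetries(10)
            .withMaxCumulativeBackoff(Duration.standardMinutes(1))
            .backoff();

    while (true) {
      try {
        callGetRecords();  // placeholder for the throttled Kinesis request
        break;
      } catch (ProvisionedThroughputExceededException e) {
        if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
          throw e;  // retries exhausted
        }
      }
    }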

@jfarr (Contributor Author) commented Oct 17, 2019

@aromanenko-dev To me FluentBackoff seems like the wrong way to deal with rate limiting. I would rather account for the rate limit up front than try to react to it as if it were an error. I think the end result is going to be once every second you'll get a few successful getRecords calls off then spend the rest of that second being throttled. Possibly longer because as the AWS documentation states, "Too many of these exceptions can result in exponential back-offs and thereby cause significant unexpected latencies in processing." They also state that they recommend a 1 second polling interval because, "processing larger batches of data tends to be more efficient at reducing network and other downstream latencies in your application," so polling as fast as possible then getting throttled will not be as efficient according to AWS.

You have a good point about changing the polling interval while the pipeline is running. A UDF sounds like it could be a good solution for that. We do not have that use case though. We know ahead of time how many consumers our stream will have and what their polling intervals will be. That would not change unless our architecture changes.

If it's reasonable to allow configuring a backoff strategy why would it not be reasonable to allow configuring a polling interval? I thought your goal was to not introduce any new knobs. If that's the case honestly I would rather have a hard-coded 1 second polling interval.

@aromanenko-dev (Contributor) commented Oct 18, 2019

@jfarr Regarding the backoff strategy - I think it's a good approach when your backend doesn't tell you in advance at which rate you should perform the next queries (we don't know about other consumers of the same shard). We only know the maximum limits and the result of the last kinesis.getRecords() execution. So starting with the recommended default value and using a proper backoff strategy in case of failures could be an optimal solution when you need to limit the rate of requests. Otherwise, there is a good chance that backend resources won't be utilised efficiently. FluentBackoff is used in many Beam IOs for similar reasons.

I'd like to ask you some questions about your use case. Let's suppose we have the ability to set a static polling interval with withPollingInterval(Duration) (as implemented in the current PR). What would be the default value in your case? 1 sec? What would you do if it turns out not to be enough for an already running pipeline? Would you need to change this value and restart the pipeline manually?

Btw, a default delay of 1 sec looks quite pessimistic since, according to the AWS docs, each shard can support up to five read transactions per second. So, in the case of one consumer per shard (I'd guess this is a quite common case), 200 ms should be the default value.

PS: The goal of minimizing the number of tuning knobs is to reduce the number of possible config combinations (which grows exponentially) when it's possible to configure this automatically [1]. If it's not possible, then the better solution is to provide a flexible API, especially for unbounded sources (Kinesis, Kafka, etc.) that are used in long-running pipelines.

[1] https://beam.apache.org/contribute/ptransform-style-guide/#configuration

@jfarr (Contributor Author) commented Oct 20, 2019

@aromanenko-dev thanks for the link, that's really helpful. For our use case we'll probably poll once per second. That's what the SDK docs recommend, Kinesis Firehose polls on a fixed 1 second interval, Lambda triggers poll on a fixed 1 second interval, the KCL has a configurable polling interval but it defaults to 1 second, etc., so that seems pretty standard across the board. We currently have 2 Firehoses on this stream polling once per second so in theory we could go as low as 350ms or so but as you say with a static configuration we would have to stop the running pipeline if we needed to change this later. This would allow us to add one maybe two consumers before we needed to deal with that. I know that my colleague @cmachgodaddy and possibly others are working on a v2 KinesisIO that uses subscribeToShard instead of polling getRecords so hopefully we can switch to using that instead before then.

To give you some more context, we are trying to read records from Kinesis, extract a few fields, then send the record to SNS with those fields as message attributes. So far we can't even get read from Kinesis -> write to SNS to work at our scale which isn't particularly large, only thousands of records per second. We are seeing latencies grow to 15-20 mins or more under peak load. To be honest at this point I can't say for sure whether it's KinesisIO or SnsIO or some combination, but I can say that adding the polling interval solved our ReadProvisionedThroughputExceeded problem but hasn't completely solved the latency problem. What I would like to do next week is build a test pipeline that just reads from Kinesis, calculates the latency at the moment we receive the record, then pushes that metric to CloudWatch. Then I'd like to test the current implementation vs adding FluentBackoff vs adding a polling interval (my PR) vs adding both. That will allow us to isolate KinesisIO and get some hard data that we can compare. I'll let you know what I find ASAP. How does that sound?
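For what it's worth, a sketch of that latency probe as a DoFn, using Beam's Metrics API rather than a direct CloudWatch client (the class and metric names are made up; forwarding the metric to CloudWatch would be a separate concern):

    import org.apache.beam.sdk.io.kinesis.KinesisRecord;
    import org.apache.beam.sdk.metrics.Distribution;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Instant;

    // Records how far behind the record's approximate arrival timestamp we are
    // at the moment we receive it.
    class ReceiveLatencyFn extends DoFn<KinesisRecord, KinesisRecord> {
      private final Distribution latencyMs =
          Metrics.distribution(ReceiveLatencyFn.class, "receiveLatencyMs");

      @ProcessElement
      public void processElement(ProcessContext c) {
        KinesisRecord record = c.element();
        latencyMs.update(
            Instant.now().getMillis() - record.getApproximateArrivalTimestamp().getMillis());
        c.output(record);
      }
    }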

@cmachgodaddy (Contributor):

@aromanenko-dev , Alex, correct me if I am wrong. I am trying to understand the existing KinesisIO in order to migrate it, and I found that each split will generate a bunch of threads, and each thread will pull records from a shard (so 1:1, one thread per shard). But the point is, if we have 10 splits (or a parallelism of 10), then there will be 10 x that many threads reading the shards. In other words, there will be 10 clients reading one shard concurrently. That's why we are experiencing this throttling. I am wondering why it couldn't be designed in a way that each split reads only one shard (1:1)?

@aromanenko-dev (Contributor):

@jfarr Sounds great to me! Thank you for your contribution on this! Actually, I think this is a very interesting and important topic for KinesisIO, and there is room for improvement here.

@aromanenko-dev (Contributor):

@cmachgodaddy Hmm, what do you mean by "split"? One split is one shard, and as you properly said before, only one thread from the thread pool will consume messages from this shard. Did I miss something?

@cmachgodaddy (Contributor):

@aromanenko-dev , to get a bit technical: in KinesisSource we have a split method, and this method generates a number of sources or readers (KinesisReader). The split method is currently based on either the desiredNumSplits (which is configured by the user as a parallelism parameter) or the partitions (number of shards) to generate the number of readers. However, in each reader (KinesisReader) we again generate another set of threads, and each reads one shard. So this looks to me like a parallelism inside a parallelism (nested parallelism). Here it is in the KinesisReader, which calls ShardReadersPool::start(), which uses an ExecutorService to execute readLoop in parallel (in a different thread): https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java#L124. Let me know if I am missing something?

@aromanenko-dev (Contributor):

@cmachgodaddy My understanding is the following. The KinesisSource.split() method is called on the driver, before running the pipeline. So it will create a list of KinesisSources, one for every split/partition (based on initialCheckpointGenerator), that will be distributed to workers and run there. Then every KinesisSource will create a KinesisReader with its own CheckpointGenerator (on the worker) and instantiate a ShardReadersPool, which will create a thread for every shard iterator (since every reader can read from more than one shard). So I don't see nested parallelism here. Though it would be great to test it in case we have a bug there, since this logic is quite complicated.

@cmachgodaddy (Contributor) commented Oct 21, 2019

@aromanenko-dev , I think we all understand it right; from what you described above we are just saying it in different ways ;-) . In simple words, each split, running on each worker, will create a number of threads based on the number of shards. That means if we have 10 splits, we will have 10 readers reading one shard (and of course, they will read all the shards)? Hope you agree with me on this point? Now, what Amazon recommends is that we should have one client read one shard. Here is what they say:

Typically, when you use the KCL, you should ensure that the number of instances does not exceed the number of shards (except for failure standby purposes). Each shard is processed by exactly one KCL worker and has exactly one corresponding record processor, so you never need multiple instances to process one shard. However, one worker can process any number of shards, so it's fine if the number of shards exceeds the number of instances.
And, here is why they do that, https://docs.aws.amazon.com/streams/latest/dev/kinesis-low-latency.html

But don't object that they are using the KCL here; please consider that one KCL worker is like the one KinesisClient our split uses to connect to Kinesis and read a shard. And don't misread "one worker can process any number of shards": in their context, and in the example in their doc, a worker is an application that runs on an EC2 instance, and when that application scales it will load-balance the KCL workers, e.g. two EC2 instances will have two KCL workers, each reading two shards (assuming a stream has 4 shards).

The point is that we don't want to have more readers than shards in one application (or one pipeline in our context). Imagine if we have 10 pipelines deployed on our runners, each with a parallelism of 10: then we will have 10 x 10 readers reading one shard? And these 10 x 10 readers are not load-balanced since we are not using the KCL (and even with the KCL, this number would still exceed the number of shards). I am sure we will get a "throughput exception" in the log :-)

That's why in my subscribing POC (for the enhanced fan-out), I designed it in a way that each split reads only one shard (1:1).

@alexvanboxel (Contributor):

@alexvanboxel , I think we all understand it right, from what you described above we just say it in different ways ;-) . In simple words, each split, run on each worker, will create a number of threads

I think you're talking about another Alex, probably @aromanenko-dev :-)

@jfarr (Contributor Author) commented Oct 22, 2019

@cmachgodaddy I'll have to confirm, but I'm pretty sure each ShardReadersPool doesn't read every shard, just the ones assigned to that worker. That's pretty much what I've observed.

@cmachgodaddy (Contributor):

@jfarr , uhmmm, what happens when you set a parallelism of 1? Do you see one worker just pull the one shard assigned to it, with the rest of the shards sitting idle? Does that mean we just read 1/3 of the data stream, assuming you have 3 shards?

@jfarr (Contributor Author) commented Oct 22, 2019

@jfarr , uhmmm, what happens when you set a parallelism of 1? Do you see one worker just pull the one shard assigned to it, with the rest of the shards sitting idle? Does that mean we just read 1/3 of the data stream, assuming you have 3 shards?

@cmachgodaddy I believe if you have one worker and three shards then that worker will be assigned all three shards. Also if you have one shard and three workers then you will have one worker reading the shard and two workers sitting idle. That I can confirm, I have seen it with my own eyes.

@aromanenko-dev (Contributor):

@cmachgodaddy I think @jfarr is right - in the simple case all shards are discovered by DynamicCheckpointGenerator and put into the list of KinesisSources during split(), which executes on the driver. Every KinesisSource may contain more than 1 shard (see KinesisReaderCheckpoint) when the number of workers is less than the total number of shards, but it's not possible for 2 different sources to hold a reference to the same shard. Then all these split sources will be distributed over the workers by your resource manager. So every source will be responsible for one or more distinct shards, and it will spawn one thread per shard in ShardReadersPool.
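A simplified sketch of that assignment (illustration only, not the actual KinesisSource.split() / DynamicCheckpointGenerator code): shards are partitioned across the splits so that no shard appears in more than one source, and each source later spawns one read thread per shard it owns.

    import java.util.ArrayList;
    import java.util.List;

    // Illustration only: round-robin shard assignment; each shard lands in exactly one split.
    static List<List<String>> assignShardsToSplits(List<String> shardIds, int desiredNumSplits) {
      int numSplits = Math.min(desiredNumSplits, shardIds.size());
      List<List<String>> splits = new ArrayList<>();
      for (int i = 0; i < numSplits; i++) {
        splits.add(new ArrayList<>());
      }
      for (int i = 0; i < shardIds.size(); i++) {
        splits.get(i % numSplits).add(shardIds.get(i));
      }
      return splits;  // one source per inner list; one reader thread per shard inside it
    }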

@cmachgodaddy (Contributor):

Thanks @jfarr and @aromanenko-dev for sharing the info! That makes sense!

@jfarr (Contributor Author) commented Oct 24, 2019

@aromanenko-dev I'm still testing and gathering data but one of my colleagues had an idea I wanted to run by you. What if we add a polling rate policy object and have fixed and fluent versions with fluent as the default?

@aromanenko-dev (Contributor):

@jfarr Sounds good to me (it's similar to what I proposed earlier with the UDF object).

@jfarr (Contributor Author) commented Oct 24, 2019

@aromanenko-dev Great, I will start working on that too then. Thanks!

@aromanenko-dev changed the title from [BEAM-8382] Add polling interval to KinesisIO.Read to [WIP][BEAM-8382] Add polling interval to KinesisIO.Read on Oct 26, 2019
@jfarr changed the title from [WIP][BEAM-8382] Add polling interval to KinesisIO.Read to [WIP][BEAM-8382] Add rate limit policy to KinesisIO.Read on Nov 3, 2019
@jfarr (Contributor Author) commented Nov 3, 2019

Hi @aromanenko-dev, I replaced the polling interval with a rate limit policy configured with the following methods:

withBackoffRateLimitPolicy(FluentBackoff fluentBackoff)
Specifies the rate limit policy as BackoffRateLimiter.

withFixedDelayRateLimitPolicy()
Specifies the rate limit policy as FixedDelayRateLimiter with the default delay of 1 second.

withFixedDelayRateLimitPolicy(Duration delay)
Specifies the rate limit policy as FixedDelayRateLimiter with the given delay.

withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory)
Specifies the RateLimitPolicyFactory for a custom rate limiter.

The default is a no limiter policy that preserves the behavior of the current implementation.

wdyt?

@aromanenko-dev (Contributor) left a comment:

Thank you for adding this. In general, it looks fine, I just left some comments.
Also, please squash your commits into one with a proper name.

@@ -51,15 +51,13 @@

/**
* Executor service for running the threads that read records from shards handled by this pool.
* Each thread runs the {@link ShardReadersPool#readLoop(ShardRecordsIterator)} method and handles
* exactly one shard.
* Each thread runs the {@link ShardReadersPool#readLoop} method and handles exactly one shard.
Contributor:

Would it make sense to link to a method with changed signature ShardReadersPool#readLoop(ShardRecordsIterator, RateLimitPolicy)?

Contributor Author:

Sure, I'll change that.

*/
private final ExecutorService executorService;

/**
* A Bounded buffer for read records. Records are added to this buffer within {@link
* ShardReadersPool#readLoop(ShardRecordsIterator)} method and removed in {@link
* ShardReadersPool#nextRecord()}.
* ShardReadersPool#readLoop} method and removed in {@link ShardReadersPool#nextRecord()}.
Contributor:

Would it make sense to link to a method with changed signature ShardReadersPool#readLoop(ShardRecordsIterator, RateLimitPolicy)?

this.backoff =
fluentBackoff
// never stop retrying
.withMaxRetries(Integer.MAX_VALUE)
Contributor:

What is the reason to override the user's parameters and never stop the backoff process? I guess the initial idea was to allow the user to control that by providing a custom FluentBackoff object.

*
* @param fluentBackoff The {@code FluentBackoff} used to create the backoff policy.
*/
public Read withBackoffRateLimitPolicy(FluentBackoff fluentBackoff) {
Contributor:

Please, add an example into KinesisIO javadoc how to use it.

Member:

org.apache.beam.sdk.util.FluentBackoff is not meant to be on the public API surface and is only meant for internal use. All classes within org.apache.beam.sdk.util are like that.

If we would want to promote it to be public, we should find the appropriate place for it and move it out of util.

@aromanenko-dev (Contributor), Nov 4, 2019:

What about wrapping it into a KinesisIO-specific backoff wrapper?
PS: It might not be evident that this is for internal use only. Does it make sense to annotate the util classes specifically?

Member:

Other IOs, such as ClickhouseIO and SnsIO, seem to take the configuration parameters explicitly and use FluentBackoff internally instead of exposing a wrapped FluentBackoff object directly.

Updating package-info.java to be clearer about this directory being an internal implementation detail makes sense.

@aromanenko-dev (Contributor), Nov 6, 2019:

@lukecwik In this case, I suggest changing the signature of this method to Read withBackoffRateLimitPolicy(int maxRetries, Duration maxCumulativeBackoff) and instantiating FluentBackoff internally with the provided params.
PS: Updated the sdk/util/package-info.java doc in #10009

Contributor Author:

I was considering exposing a wrapper object like SnsIO. I didn't realize FluentBackoff was internal. I think that makes a lot of sense here too then.

What does it mean to give up retrying with an unbounded source, though? What should we do in that case? Throw an unchecked exception and let the pipeline crash? That's why I explicitly overrode max retries and max cumulative backoff, because I didn't think that made sense, but I can change that if you want.

@aromanenko-dev (Contributor), Nov 7, 2019:

@jfarr I'd leave this responsibility to the user, and that's why I proposed exposing these parameters.

*
* @param delay Denotes the fixed delay duration.
*/
public Read withFixedDelayRateLimitPolicy(Duration delay) {
Contributor:

Please, add an example into KinesisIO javadoc how to use it.

*
* @param rateLimitPolicyFactory Custom rate limit policy factory.
*/
public Read withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) {
Contributor:

Please, add an example into KinesisIO javadoc how to use it.

* Implement this interface to create a {@code RateLimitPolicy}. Used to create a rate limiter for
* each shard.
*/
public interface RateLimitPolicyFactory extends Serializable {
Contributor:

Please, add tests for different types of factory.

@jfarr (Contributor Author) commented Feb 9, 2020

Hi @aromanenko-dev, I finally got around to testing this AWS SDK RetryPolicy-based approach but with very little success. First of all, the SDK client by default uses the following retry policy for all throttling exceptions:

  • backoff strategy = PredefinedBackoffStrategies.EqualJitterBackoffStrategy
  • base delay = 500ms
  • max backoff = 20s
  • max retry count = 3

Where EqualJitterBackoffStrategy is an exponential backoff with jitter. This strategy will ensure that the jitter will never be more than 1/2 the max delay for that retry attempt. So the initial delay will always be between 250-500ms.

I added some logging and I can see the retry policy behaving as expected. For example:

2020-02-09T21:48:00.413+00:00 com.godaddy..KinesisClientsProvider shouldRetry exception `com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000000 in stream beam-test ...`: true
2020-02-09T21:48:00.413+00:00 com.godaddy..KinesisClientsProvider delayBeforeNextRetry: 333 retriesAttempted: 0
2020-02-09T21:48:00.413+00:00 com.amazonaws.http.AmazonHttpClient Retriable error detected, will retry in 333ms, attempt number: 0

Indeed the initial backoff always seems to be within 250 to 500ms. The exponential backoff is pretty irrelevant, though, because the initial delay also seems sufficient that the first retry attempt always succeeds. I didn't see any instances of a second retry attempt in my testing.

However, the cumulative effect of all of these initial instances of ProvisionedThroughputExceededException is that approximately 30% of all getRecords() calls fail. You can see that in the attached charts where GetRecords.Success is 0.7 and ReadProvisionedThroughputExceeded is 0.3. I don't think it matters whether the retry policy is implemented in Beam or in the AWS SDK, I don't think it can be tuned to perform much better than this.

[Screenshots: CloudWatch charts showing GetRecords.Success at ~0.7 and ReadProvisionedThroughputExceeded at ~0.3]

Here is the code I used just for reference:

    // Snippet from a method body (AWS SDK v1); LOG is an SLF4J logger defined elsewhere.
    // Relevant classes: com.amazonaws.retry.RetryPolicy (and its nested RetryCondition and
    // BackoffStrategy interfaces), com.amazonaws.retry.PredefinedRetryPolicies,
    // com.amazonaws.PredefinedClientConfigurations, com.amazonaws.services.kinesis.AmazonKinesisClientBuilder.
    // It wraps the SDK's default retry condition and backoff strategy to log each retry decision.
    RetryCondition retryCondition = new RetryCondition() {
      RetryCondition defaultCondition = PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION;
      @Override
      public boolean shouldRetry(AmazonWebServiceRequest originalRequest, AmazonClientException exception, int retriesAttempted) {
        boolean retry = defaultCondition.shouldRetry(originalRequest, exception, retriesAttempted);
        LOG.info("shouldRetry exception `{}`: {}", exception, retry);
        return retry;
      }
    };
    BackoffStrategy backoffStrategy = new BackoffStrategy() {
      BackoffStrategy defaultStrategy = PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY;
      @Override
      public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest, AmazonClientException exception, int retriesAttempted) {
        long delay = defaultStrategy.delayBeforeNextRetry(originalRequest, exception, retriesAttempted);
        LOG.info("delayBeforeNextRetry: {} retriesAttempted: {}", delay, retriesAttempted);
        return delay;
      }
    };
    RetryPolicy retryPolicy = new RetryPolicy(retryCondition, backoffStrategy,
        PredefinedRetryPolicies.DEFAULT_MAX_ERROR_RETRY, false);

    return AmazonKinesisClientBuilder.standard()
        .withClientConfiguration(PredefinedClientConfigurations.defaultConfig()
            .withRetryPolicy(retryPolicy))
        .build();

The stale bot removed the stale label Feb 9, 2020
@jfarr force-pushed the kinesis-io-polling-interval branch from 4c4b5b0 to fc92bfa on February 10, 2020 00:10
@jfarr force-pushed the kinesis-io-polling-interval branch from 2f68f94 to 1a1b6bb on February 10, 2020 00:51
@aromanenko-dev (Contributor):

@jfarr Thank you very much for detailed testing and explanation. In the end, what do you think would be the best solution in this case?

@jfarr (Contributor Author) commented Feb 16, 2020

Hi, @aromanenko-dev I have already removed the BackoffRateLimiter from this PR, and I would like to finish the unit tests and documentation for FixedDelayRateLimiter and narrow the scope of the PR back to that since it's a variant of what we're currently using in production. Is there any chance we can get that in? It would be good to know before I put in the remaining work. I think the custom rate limit policy is a nice to have so I can leave that in if you want in case anyone wants to implement their own adaptive rate limit policy in the future. It would also provide a way to change the fixed delay interval of a running pipeline since that was another use case you were interested in.

@jfarr (Contributor Author) commented Feb 17, 2020

And actually, we could support the use case of changing the polling interval at runtime by having a version of FixedDelayRateLimiter that takes a function for the delay.
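That is essentially what the final API exposes as withDynamicDelayRateLimitPolicy(Supplier&lt;Duration&gt;) (see the PR description above, which says the supplier is called at each polling interval). A usage sketch, where readDelayMillisFromConfig() is a hypothetical hook into whatever external configuration the running pipeline can re-read; whether a plain java.util.function.Supplier satisfies Beam's serializability requirements is exactly the question raised further down:

    import java.util.function.Supplier;
    import org.joda.time.Duration;

    // The supplier is re-evaluated at each polling interval, so the delay can change
    // without restarting the pipeline.
    Supplier<Duration> delay = () -> Duration.millis(readDelayMillisFromConfig());

    KinesisIO.read()
        .withStreamName("my-stream")             // placeholder stream name
        .withDynamicDelayRateLimitPolicy(delay);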

@iemejia (Member) commented Feb 17, 2020

@jfarr just for info Alexey is OOO this week, so don't be surprised if the answer takes a bit of time.

@jfarr (Contributor Author) commented Feb 19, 2020

No problem, @iemejia, thanks for the heads up.

@aromanenko-dev (Contributor):

@jfarr Thank you very much for the update and for continuing to work on this! I'll take a look today and get back to you.

@aromanenko-dev (Contributor):

@jfarr It looks fine, but as you mentioned above, it's missing some tests and documentation before it can be merged.

@aromanenko-dev (Contributor):

@jfarr Btw, did you run similar tests with these changes using a fixed delay, like you did for PredefinedBackoffStrategies.EqualJitterBackoffStrategy above? Sorry if I missed that.

@jfarr (Contributor Author) commented Mar 1, 2020

Hi @aromanenko-dev, sure we have tested it. Here's what read throttles look like with a 1s fixed delay:
[Screenshot: CloudWatch chart of read throttles with a 1s fixed delay]

@jfarr (Contributor Author) commented Mar 1, 2020

@aromanenko-dev if I wanted to make FixedDelayRateLimiter take a function as an argument, can I just declare it as Supplier<Duration> or should I declare it some other way due to Beam's serializability requirements?

@jfarr changed the title from [WIP][BEAM-8382] Add rate limit policy to KinesisIO.Read to [BEAM-8382] Add rate limit policy to KinesisIO.Read on Mar 2, 2020
@jfarr (Contributor Author) commented Mar 2, 2020

Hi @aromanenko-dev as long as the Supplier<Duration> param is OK then I think this PR is ready. What do you think?

@aromanenko-dev (Contributor) left a comment:

LGTM! @jfarr Thank you for your contribution and for continuing to work on this PR through several rounds of back-and-forth. This feature should be really helpful for users.

Let's wait for tests results before merging.

@aromanenko-dev (Contributor):

retest this please

@aromanenko-dev (Contributor):

Run Java PreCommit

@jfarr (Contributor Author) commented Mar 3, 2020

@aromanenko-dev Sure, happy to help!

@aromanenko-dev merged commit 873f689 into apache:master Mar 3, 2020
@aromanenko-dev (Contributor):

Hmm, there is a flaky test in KinesisIO caused by org.apache.beam.sdk.io.kinesis.ShardReadersPoolTest.shouldCallRateLimitPolicy, introduced in this PR. Could you please take a look at this issue?

@jfarr (Contributor Author) commented Mar 9, 2020 via email

@jfarr (Contributor Author) commented Mar 10, 2020 via email
