FanOutConsumerRegistration does not handle initial registration properly #506

Closed
lbourdages opened this issue Feb 19, 2019 · 1 comment
Labels
fix in progress v2.x Issues related to the 2.x version

@lbourdages
Contributor

When booting a consumer using enhanced fan-out for the first time, I get the following two errors repeatedly:

ERROR s.a.k.r.f.FanOutConsumerRegistration - Status of StreamConsumer <consumer_name_here>, was not ACTIVE after all retries. Was instead CREATING.
ERROR s.a.kinesis.coordinator.Scheduler - Worker.run caught exception, sleeping for 1000 milli seconds!
java.lang.IllegalStateException: Status of StreamConsumer <consumer_name_here>, was not ACTIVE after all retries. Was instead CREATING.

The number of times the errors get logged varies depending on the time it takes for the registration to become active but it is around 10 in my tests.

This occurs because of the following function in software.amazon.kinesis.retrieval.fanout.FanOutConsumerRegistration:

    private void waitForActive() throws DependencyException {
        ConsumerStatus status = null;

        int retries = maxDescribeStreamConsumerRetries;

        while (!ConsumerStatus.ACTIVE.equals(status) && retries > 0) {
            status = describeStreamConsumer().consumerDescription().consumerStatus();
            retries--;
        }

        if (!ConsumerStatus.ACTIVE.equals(status)) {
            final String message = String.format(
                    "Status of StreamConsumer %s, was not ACTIVE after all retries. Was instead %s.",
                    streamConsumerName, status);
            log.error(message);
            throw new IllegalStateException(message);
        }
    }

There is no sleep of any kind in the while loop, so the checks are performed as fast as possible, slowed only by network latency and the speed of the Kinesis API. With the default setting (10 retries), the retries are exhausted almost immediately and the IllegalStateException is thrown, which aborts the creation of the worker and starts it over. This should not happen, because a consumer that is still CREATING is not an anomalous condition. A similar case is handled correctly in software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator#initialize, where the code patiently waits until the lease table is created in DynamoDB.

I was able to prevent the error logs by setting maxDescribeStreamConsumerRetries to an absurdly high number (such as 10000), but this only works because the describeStreamConsumer calls get throttled, which induces backoff.

The fix seems fairly easy: all that's required is to add a sleep between the retries. There are really only two cases - either the consumer wasn't registered, in which case it's best to just sleep, or it is already registered and in that case the status should already be active and thus the sleep won't have any impact.
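To illustrate the idea, here is a minimal, self-contained sketch of a polling loop that sleeps between retries. It is not the KCL implementation or the merged fix: the class name `WaitForActiveSketch`, the `Supplier<String>` standing in for `describeStreamConsumer()`, and the `backoff` parameter are all assumptions made for the example.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class WaitForActiveSketch {

    /**
     * Polls statusSupplier until it reports "ACTIVE" or maxRetries polls have
     * been made, sleeping for backoff between polls. Returns the number of
     * polls that were needed. The supplier is a hypothetical stand-in for the
     * describeStreamConsumer() call quoted above.
     */
    static int waitForActive(Supplier<String> statusSupplier, int maxRetries, Duration backoff) {
        String status = null;
        int attempts = 0;
        while (attempts < maxRetries) {
            status = statusSupplier.get();
            attempts++;
            if ("ACTIVE".equals(status)) {
                // Already active: return without ever sleeping after this poll.
                return attempts;
            }
            if (attempts < maxRetries) {
                try {
                    // Give the registration time to progress before polling again.
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for ACTIVE", e);
                }
            }
        }
        throw new IllegalStateException(String.format(
                "Status of StreamConsumer was not ACTIVE after all retries. Was instead %s.", status));
    }

    public static void main(String[] args) {
        // Simulated consumer: CREATING for the first 3 polls, ACTIVE afterwards.
        int[] calls = {0};
        Supplier<String> fakeStatus = () -> ++calls[0] <= 3 ? "CREATING" : "ACTIVE";
        int attempts = waitForActive(fakeStatus, 10, Duration.ofMillis(50));
        System.out.println("ACTIVE after " + attempts + " polls"); // prints "ACTIVE after 4 polls"
    }
}
```

In this sketch an already-registered consumer returns on the first poll with no delay, which matches the second case described above, while a newly created one gets up to roughly `maxRetries * backoff` to become active before the exception is thrown.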

@sahilpalvia
Contributor

Closing this issue out. The PR has been merged, and the fix will be available in the next release.

Please feel free to reopen it if the problem persists.

@sahilpalvia sahilpalvia added fix in progress v2.x Issues related to the 2.x version labels Mar 11, 2019
@sahilpalvia sahilpalvia added this to the 2.1.3 milestone Mar 11, 2019
pfifer added a commit to pfifer/amazon-kinesis-client that referenced this issue Mar 18, 2019
Milestone#30: https://github.com/awslabs/amazon-kinesis-client/milestone/30
* Added a message to recommend using `KinesisClientUtil` when an acquire timeout occurs in the `FanOutRecordsPublisher`.
  * PR#514: awslabs#514
* Added a sleep between retries while waiting for a newly created stream consumer to become active.
  * PR#506: awslabs#506
* Added timeouts on all futures returned from the DynamoDB and Kinesis clients.
  The timeouts can be configured by setting `LeaseManagementConfig#requestTimeout(Duration)` for DynamoDB, and `PollingConfig#kinesisRequestTimeout(Duration)` for Kinesis.
  * PR#518: awslabs#518
* Upgraded to SDK version 2.5.10.
  * PR#518: awslabs#518
* Artifacts for the Amazon Kinesis Client for Java are now signed by a new GPG key:
  pub   4096R/86368934 2019-02-14 [expires: 2020-02-14]
  uid                  Amazon Kinesis Tools <amazon-kinesis-tools@amazon.com>
sahilpalvia pushed a commit that referenced this issue Mar 18, 2019