-
Notifications
You must be signed in to change notification settings - Fork 207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kds stream api changes #5239
Kds stream api changes #5239
Conversation
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
import java.time.Duration; | ||
|
||
@Slf4j | ||
public class KinesisClientAPIHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to KinesisClientApiHandler
to promote consistency.
failedAttemptCount = 0; | ||
while (failedAttemptCount < maxRetryCount) { | ||
try { | ||
DescribeStreamSummaryRequest describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can move this outside of the while
loop so that you don't have to recreate it.
try { | ||
DescribeStreamSummaryRequest describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder() | ||
.streamName(streamName).build(); | ||
DescribeStreamSummaryResponse response = kinesisClient.describeStreamSummary(describeStreamSummaryRequest).join(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this require different kinesis:
permissions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dlvenable . Yes, we will need to different kinesis:DescribeStreamSummary
permissions. This is already documented as KCL requires it.
DescribeStreamSummaryResponse response = kinesisClient.describeStreamSummary(describeStreamSummaryRequest).join(); | ||
String streamIdentifierString = getStreamIdentifierString(response.streamDescriptionSummary()); | ||
return StreamIdentifier.multiStreamInstance(streamIdentifierString); | ||
} catch (Exception ex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's catch some known exceptions that the Kinesis client throws for server or client errors and log those without the stack trace.
e.g.
catch (GenericKinesisException ex) {
log.error("Failed to describe stream summary for stream {} with error {}. The kinesis source will retry.",
streamName, ex.getMessage());
} catch (GenericKinesisException ex) {
log.error("Failed to describe stream summary for stream {}. The kinesis source will retry.",
streamName, ex);
}
private void applyBackoff() { | ||
final long delayMillis = backoff.nextDelayMillis(failedAttemptCount); | ||
if (delayMillis < 0) { | ||
Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you interrupting the current thread?
try { | ||
Thread.sleep(delayMillis); | ||
} catch (final InterruptedException e){ | ||
log.error("Thread is interrupted while polling SQS with retry.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.error("Thread is interrupted while polling SQS with retry.", e); | |
log.error("Thread is interrupted while polling Kinesis with retry.", e); |
import static org.mockito.Mockito.when; | ||
|
||
public class KinesisClientAPIHandlerTest { | ||
private static final List<String> STREAMS_LIST = ImmutableList.of("stream-1", "stream-2", "stream-3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend making this a variable you initialize in @BeforeEach
. Also, you could even have @Nested
classes to test the scenario of 0 and 1 streams as well.
Also, you can use Java's List.of()
which is also immutable.
Signed-off-by: Souvik Bose <souvbose@amazon.com>
* Change API to use DescribeStreamSummary Signed-off-by: Souvik Bose <souvbose@amazon.com> * Code changes Signed-off-by: Souvik Bose <souvbose@amazon.com> * Rename the class. Signed-off-by: Souvik Bose <souvbose@amazon.com> * Address review comments Signed-off-by: Souvik Bose <souvbose@amazon.com> --------- Signed-off-by: Souvik Bose <souvbose@amazon.com> Co-authored-by: Souvik Bose <souvbose@amazon.com>
Description
This PR is to use the
DescribeStreamSummary
API to get the stream identifier instead of theDescribeStream
. It also includes handling retries using an exponential backoff strategy.Issues Resolved
Resolves #1082
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.