-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding rid to SubscribeToShard response for debugging. #678
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to cache this rid in the FanoutRecordPublisher against the current subscription id and log this in two places 1. along with the infamous "Cancelling subscription and restarting.." logs in shardConsumer 2. in any of exceptionOccured()'s WARN log in FanoutRecordPublisher.
@@ -23,7 +25,11 @@ | |||
/** | |||
* Provides a record publisher that will retrieve records from Kinesis for processing | |||
*/ | |||
public interface RecordsPublisher extends Publisher<RecordsRetrieved> { | |||
public abstract class RecordsPublisher implements Publisher<RecordsRetrieved> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets avoid using abstract classes - applicable to all places. Keep it as an interface and introduce getLastSuccessfulRequestDetails()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update
public abstract class RecordsPublisher implements Publisher<RecordsRetrieved> { | ||
|
||
@Getter @Setter | ||
private String lastRequestId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Along with the requestId, let's have the time it was dispatched and an optional field to capture other meta info. LastRequestDetails getLastSuccessfulRequestDetails()
@@ -351,7 +351,7 @@ private void resetRecordsDeliveryStateOnSubscriptionOnInit() { | |||
// Clear any lingering records in the queue. | |||
if (!recordsDeliveryQueue.isEmpty()) { | |||
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" | |||
+ "previous subscription - {} ", shardId, subscribeToShardId); | |||
+ "previous subscription - {}. rid {}", shardId, subscribeToShardId, getLastRequestId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : Make it more readable like. RequestId - { }
. Applicable to all places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that's more readable. Will update logs.
parent.shardId, connectionStartedAt, subscribeToShardId); | ||
parent.setLastRequestId(response.responseMetadata().requestId()); | ||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. rid: {}, erid: {}, sdkfields: {}", | ||
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), response.sdkFields()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets remove sdkfields from here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update
@@ -130,7 +130,7 @@ private void restartIfRequestTimerExpired(long maxTimeBetweenRequests) { | |||
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { | |||
log.error( | |||
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the only instance of "Last request of dispatched" log in ShardConsumerSubscriber?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep
@@ -130,7 +130,7 @@ private void restartIfRequestTimerExpired(long maxTimeBetweenRequests) { | |||
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) { | |||
log.error( | |||
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.", | |||
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse); | |||
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastRequestId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, PrefetchRecordPublisher while calling this will have null returned. Either let's populate this in PrefetchRecPub or make sure null is handled through optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, will update
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", | ||
shardId, recordsDeliveryQueue.remainingCapacity()); | ||
|
||
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response: RequestId - {}, Timestamp - {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's log the RequestDetails object directly using its toString() method, so adding/removing attributes in the future won't require updates to all log statements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Applicable to all places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using toString now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.", | ||
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse); | ||
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful response: RequestId - {}, Timestamp - {}", | ||
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulResponseRequestId(), recordsPublisher.getLastSuccessfulResponseTimestamp()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's directly print the requestdetails obj in the logs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
default String getLastSuccessfulResponseRequestId() { | ||
return getLastSuccessfulResponseDetails().map(RequestDetails::requestId).orElse(NONE); | ||
} | ||
|
||
/** | ||
* Gets last successful response's timestamp. | ||
* | ||
* @return timestamp associated with last successful response. | ||
*/ | ||
default String getLastSuccessfulResponseTimestamp() { | ||
return getLastSuccessfulResponseDetails().map(RequestDetails::timestamp).orElse(NONE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move this to RequestDetails if these methods are required. Ideally we should be requiring only toString() of RequestDetails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think that makes more sense. Updated.
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", | ||
shardId, recordsDeliveryQueue.remainingCapacity()); | ||
|
||
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful response: RequestId - {}, Timestamp - {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Applicable to all places
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received", | ||
parent.shardId, connectionStartedAt, subscribeToShardId); | ||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. Last successful response: RequestId - {}, Timestamp - {}, ExtendedLast successful response: RequestId - {}, Timestamp - {}", | ||
parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), response.responseMetadata().extendedRequestId(), connectionStartedAt); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this log is messed up : ). Can you take a look?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is :) Updated
subscriptionShutdownEvent.getShutdownEventThrowableOptional()); | ||
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful response: RequestId - {}, Timestamp - {}", | ||
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), | ||
subscriptionShutdownEvent.getShutdownEventThrowableOptional(), parent.getLastSuccessfulResponseRequestId(), parent.getLastSuccessfulResponseTimestamp()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The expected and actual arguments doesnt match. Can you check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh good catch. Updated.
@@ -681,6 +698,7 @@ public BatchUniqueIdentifier batchUniqueIdentifier() { | |||
private boolean isDisposed = false; | |||
private boolean isErrorDispatched = false; | |||
private boolean isCancelled = false; | |||
private RequestDetails recordFlowLastSuccessfuRequestDetails; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this needs to be a class variable? Let's keep the scope of this object to be within the responceReceived() method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this doesn't need to be class level, will move it to be scoped within responseReceived()
.
@@ -718,8 +736,11 @@ public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { | |||
|
|||
@Override | |||
public void responseReceived(SubscribeToShardResponse response) { | |||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received", | |||
parent.shardId, connectionStartedAt, subscribeToShardId); | |||
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {} -- Last successful request details -- {} ", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't it make sense to log the response that's just received here, instead of the older one? Moving this log statement after the response parsing would address this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree we don't need the stale response metadata, I'll just remove the old response info.
@@ -40,6 +42,7 @@ | |||
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; | |||
|
|||
private Subscriber<? super RecordsRetrieved> subscriber; | |||
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current implementation will print none for blocking record publisher implementation. let's update the requestId as part of getNextResults() method to keep this path inline with expectation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'll add this to BlockingRecordsPublisher
.
@@ -98,6 +100,7 @@ | |||
private boolean wasReset = false; | |||
|
|||
private Instant lastEventDeliveryTime = Instant.EPOCH; | |||
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to blocking publisher, let's update the requestId in DefaultGetRecordsCacheDeamon, makeRetrievalAttempt() method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Synced offline, this requires us to either add some unit test specific implementation/upgrade mockito major version since AwsResponseMetadata is a final class which we can't mock with older version of mockito.
Issue #, if available:
Description of changes:
Adding Request ID logging to aid in SubscribeToShard debugging.
Testing:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.