Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding rid to SubscribeToShard response for debugging. #678

Merged
merged 10 commits into from
Feb 17, 2020
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package software.amazon.kinesis.common;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.util.Optional;

@Accessors(fluent=true)
@Getter
/**
 * Immutable holder for the identifying details (request id and timestamp) of the last successful
 * service request, intended to be embedded in log messages for debugging.
 *
 * <p>When no successful request has been recorded, the string accessors and {@link #toString()}
 * report the placeholder {@code "NONE"} instead of failing.
 */
public class RequestDetails {

    /**
     * Placeholder for logging when no successful request has been made.
     */
    private static final String NONE = "NONE";

    private final Optional<String> requestId;
    private final Optional<String> timestamp;

    /**
     * Creates details with neither a request id nor a timestamp; every string accessor then
     * yields the {@code "NONE"} placeholder.
     */
    public RequestDetails() {
        this(Optional.empty(), Optional.empty());
    }

    /**
     * Creates details for a request.
     *
     * @param requestId request id of the request; an empty (or {@code null}) value is reported as
     *        {@code "NONE"}.
     * @param timestamp timestamp of the request; an empty (or {@code null}) value is reported as
     *        {@code "NONE"}.
     */
    public RequestDetails(Optional<String> requestId, Optional<String> timestamp) {
        // A null Optional would otherwise surface as a NullPointerException only later, inside a
        // log statement that calls toString(); normalize it here so logging can never fail.
        this.requestId = requestId == null ? Optional.empty() : requestId;
        this.timestamp = timestamp == null ? Optional.empty() : timestamp;
    }

    /**
     * Gets last successful request's request id.
     *
     * @return requestId associated with last successful request, or {@code "NONE"} when absent.
     */
    public String getRequestId() {
        return requestId.orElse(NONE);
    }

    /**
     * Gets last successful request's timestamp.
     *
     * @return timestamp associated with last successful request, or {@code "NONE"} when absent.
     */
    public String getTimestamp() {
        return timestamp.orElse(NONE);
    }

    /**
     * Renders both values in the fixed form {@code "request id - <id>, timestamp - <ts>"},
     * substituting {@code "NONE"} for any missing value.
     */
    @Override
    public String toString() {
        return String.format("request id - %s, timestamp - %s", getRequestId(), getTimestamp());
    }

}

Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ private void restartIfRequestTimerExpired(long maxTimeBetweenRequests) {
Duration timeSinceLastResponse = Duration.between(lastRequestTime, now);
if (timeSinceLastResponse.toMillis() > maxTimeBetweenRequests) {
log.error(
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting.",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse);
"{}: Last request was dispatched at {}, but no response as of {} ({}). Cancelling subscription, and restarting. Last successful request details -- {}",
shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastResponse, recordsPublisher.getLastSuccessfulRequestDetails());
cancel();

// Start the subscription again which will update the lastRequestTime as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@
import org.reactivestreams.Publisher;

import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.util.Optional;

/**
* Provides a record publisher that will retrieve records from Kinesis for processing
*/
public interface RecordsPublisher extends Publisher<RecordsRetrieved> {



/**
* Initializes the publisher with where to start processing. If there is a stored sequence number the publisher will
* begin from that sequence number, otherwise it will use the initial position.
*
*
* @param extendedSequenceNumber
* the sequence number to start processing from
* @param initialPositionInStreamExtended
Expand All @@ -40,15 +46,23 @@ public interface RecordsPublisher extends Publisher<RecordsRetrieved> {
* @param recordsRetrieved the processRecordsInput to restart from
*/
void restartFrom(RecordsRetrieved recordsRetrieved);


/**
* Shuts down the publisher. Once this method returns the publisher should no longer provide any records.
*/
void shutdown();

/**
* Gets last successful request details.
*
* @return details associated with last successful request.
*/
RequestDetails getLastSuccessfulRequestDetails();

/**
* Notify the publisher on receipt of a data event.
*
* @param ack acknowledgement received from the subscriber.
*/
default void notify(RecordsDeliveryAck ack) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.IteratorBuilder;
Expand All @@ -50,6 +51,7 @@
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -87,6 +89,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
MAX_EVENT_BURST_FROM_SERVICE);

private RequestDetails lastSuccessfulRequestDetails = new RequestDetails();

@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
InitialPositionInStreamExtended initialPositionInStreamExtended) {
Expand Down Expand Up @@ -143,6 +147,15 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) {
}
}

/**
 * Returns the request details currently stored in {@code lastSuccessfulRequestDetails}.
 *
 * NOTE(review): the field is read here without taking any lock — confirm that readers on other
 * threads are guaranteed to see the latest value.
 */
@Override
public RequestDetails getLastSuccessfulRequestDetails() {
    return this.lastSuccessfulRequestDetails;
}

/**
 * Replaces the stored request details with the supplied instance.
 *
 * @param requestDetails details to remember as the last successful request.
 */
private void setLastSuccessfulRequestDetails(RequestDetails requestDetails) {
    this.lastSuccessfulRequestDetails = requestDetails;
}

// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
@VisibleForTesting
RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliveryAck) {
Expand Down Expand Up @@ -204,8 +217,9 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved,
subscriber.onNext(recordsRetrieved);
}
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ",
shardId, recordsDeliveryQueue.remainingCapacity());

log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {}. Last successful request details -- {}",
shardId, recordsDeliveryQueue.remainingCapacity(), lastSuccessfulRequestDetails);
throw e;
} catch (Throwable t) {
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
Expand Down Expand Up @@ -288,12 +302,13 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
if (!hasValidSubscriber()) {
if(hasValidFlow()) {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- Subscriber is null." +
" Last successful request details -- {}", shardId, flow.connectionStartedAt,
flow.subscribeToShardId, lastSuccessfulRequestDetails);
} else {
log.warn(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null",
shardId);
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) -- Subscriber and flow are null." +
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
}
return;
}
Expand All @@ -304,8 +319,9 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
if (isActiveFlow(triggeringFlow)) {
if (flow != null) {
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString);
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s." +
" Last successful request details -- %s",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString, lastSuccessfulRequestDetails);
switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable);
Expand All @@ -329,7 +345,7 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
try {
handleFlowError(propagationThrowable, triggeringFlow);
} catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
log.warn("{}: Exception while calling subscriber.onError. Last successful request details -- {}", shardId, lastSuccessfulRequestDetails, innerThrowable);
}
subscriber = null;
flow = null;
Expand All @@ -351,7 +367,7 @@ private void resetRecordsDeliveryStateOnSubscriptionOnInit() {
// Clear any lingering records in the queue.
if (!recordsDeliveryQueue.isEmpty()) {
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of"
+ "previous subscription - {} ", shardId, subscribeToShardId);
+ " previous subscription - {}. Last successful request details -- {}", shardId, subscribeToShardId, lastSuccessfulRequestDetails);
recordsDeliveryQueue.clear();
}
}
Expand Down Expand Up @@ -461,7 +477,8 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re
try {
bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, triggeringFlow);
} catch (Throwable t) {
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher.", shardId);
log.warn("{}: Unable to buffer or schedule onNext for subscriber. Failing publisher." +
" Last successful request details -- {}", shardId, lastSuccessfulRequestDetails);
errorOccurred(triggeringFlow, t);
}
}
Expand Down Expand Up @@ -557,8 +574,8 @@ public void request(long n) {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.",
shardId, n);
"{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match. Last successful request details -- {}",
shardId, n, lastSuccessfulRequestDetails);
return;
}
if (flow == null) {
Expand All @@ -584,14 +601,14 @@ public void cancel() {
synchronized (lockObject) {
if (subscriber != s) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.",
shardId);
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails);
return;
}
if (!hasValidSubscriber()) {
log.warn(
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber",
shardId);
"{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber. Last successful request details -- {}",
shardId, lastSuccessfulRequestDetails);
}
subscriber = null;
if (flow != null) {
Expand Down Expand Up @@ -681,6 +698,7 @@ static class RecordFlow implements SubscribeToShardResponseHandler {
private boolean isDisposed = false;
private boolean isErrorDispatched = false;
private boolean isCancelled = false;
private RequestDetails recordFlowLastSuccessfuRequestDetails;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be a class variable? Let's keep the scope of this object within the responseReceived() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this doesn't need to be class level, will move it to be scoped within responseReceived().


@Override
public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
Expand Down Expand Up @@ -718,8 +736,11 @@ public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {

@Override
public void responseReceived(SubscribeToShardResponse response) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received",
parent.shardId, connectionStartedAt, subscribeToShardId);
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ {} id: {} -- Response received. RequestId - {} -- Last successful request details -- {} ",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it make sense to log the response that's just received here, instead of the older one? Moving this log statement after the response parsing would address this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree we don't need the stale response metadata, I'll just remove the old response info.

parent.shardId, connectionStartedAt, subscribeToShardId, response.responseMetadata().requestId(), parent.lastSuccessfulRequestDetails);

recordFlowLastSuccessfuRequestDetails = new RequestDetails(Optional.of(response.responseMetadata().requestId()), Optional.of(connectionStartedAt.toString()));
parent.setLastSuccessfulRequestDetails(this.recordFlowLastSuccessfuRequestDetails);
}

@Override
Expand Down Expand Up @@ -781,10 +802,9 @@ private void tryEnqueueSubscriptionShutdownEvent(SubscriptionShutdownEvent subsc
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) {
log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(),
parent.recordsDeliveryQueue.remainingCapacity(),
subscriptionShutdownEvent.getShutdownEventThrowableOptional());
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. Last successful request details -- {}",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(),
parent.lastSuccessfulRequestDetails, subscriptionShutdownEvent.getShutdownEventThrowableOptional());
}
}

Expand All @@ -800,13 +820,14 @@ private void executeComplete() {
// the
// subscription, which was cancelled for a reason (usually queue overflow).
//
log.warn("{}: complete called on a cancelled subscription. Ignoring completion", parent.shardId);
log.warn("{}: complete called on a cancelled subscription. Ignoring completion. Last successful request details -- {}",
parent.shardId, parent.lastSuccessfulRequestDetails);
return;
}
if (this.isDisposed) {
log.warn(
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion",
parent.shardId, connectionStartedAt, subscribeToShardId);
"{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- This flow has been disposed not dispatching completion. Last successful request details -- {}",
parent.shardId, connectionStartedAt, subscribeToShardId, parent.lastSuccessfulRequestDetails);
return;
}

Expand Down Expand Up @@ -942,7 +963,6 @@ public void onComplete() {
log.debug(
"{}: [SubscriptionLifetime]: (RecordSubscription#onComplete) @ {} id: {} -- Allowing RecordFlow to call onComplete",
parent.shardId, connectionStartedAt, subscribeToShardId);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
package software.amazon.kinesis.retrieval.polling;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.reactivestreams.Subscriber;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;

import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
Expand All @@ -40,6 +42,7 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;

private Subscriber<? super RecordsRetrieved> subscriber;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation will print NONE for the blocking records publisher. Let's update the requestId as part of the getNextResults() method to keep this path in line with expectations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'll add this to BlockingRecordsPublisher.


public BlockingRecordsPublisher(final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
Expand Down Expand Up @@ -70,6 +73,11 @@ public void shutdown() {
getRecordsRetrievalStrategy.shutdown();
}

/**
 * Returns the request details held by this publisher.
 *
 * NOTE(review): the backing field appears to be declared final and initialized with an empty
 * {@code RequestDetails} — confirm whether it is ever populated with real request data.
 */
@Override
public RequestDetails getLastSuccessfulRequestDetails() {
    return this.lastSuccessfulRequestDetails;
}

@Override
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
subscriber = s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -46,6 +47,7 @@
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
Expand Down Expand Up @@ -98,6 +100,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private boolean wasReset = false;

private Instant lastEventDeliveryTime = Instant.EPOCH;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the blocking publisher, let's update the requestId in the makeRetrievalAttempt() method of DefaultGetRecordsCacheDaemon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synced offline: this requires us to either add some unit-test-specific implementation or upgrade the Mockito major version, since AwsResponseMetadata is a final class that we can't mock with the older version of Mockito.


@Data
@Accessors(fluent = true)
Expand Down Expand Up @@ -260,6 +263,11 @@ public void shutdown() {
started = false;
}

/**
 * Returns the request details held by this publisher.
 *
 * NOTE(review): the backing field appears to be declared final and initialized with an empty
 * {@code RequestDetails} — confirm whether it is ever populated with real request data.
 */
@Override
public RequestDetails getLastSuccessfulRequestDetails() {
    return this.lastSuccessfulRequestDetails;
}

@Override
public void restartFrom(RecordsRetrieved recordsRetrieved) {
if (!(recordsRetrieved instanceof PrefetchRecordsRetrieved)) {
Expand Down
Loading