
Limited threads resiliency fix durability nonblock #573

Conversation

@ashwing (Contributor) commented Jul 18, 2019

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

*
* @return sequenceNumber to checkpoint
*/
default String batchSequenceNumber() {


Let's reuse the same term from the API (i.e. continuationSequenceNumber), unless you have a specific reason to introduce a new name here.

Contributor Author

We don't use the term continuationSequenceNumber for the polling API. For S2S it is the contSeqNum that is explicitly set, and for Get it is the last successfully processed seqNum in that record batch. Hence I wanted this to be a seqNum that uniquely identifies the record batch that was processed.


As far as I see, these interfaces are not integrated with the Get path. This attribute is used as the continuationSequenceNumber, or the checkpointSequenceNumber.

The new batchSeqNum naming seems to be just another way of saying continuationSequenceNumber, and we should reuse the same terms instead of introducing new ones.

Contributor Author

I was referring to the following RecordsRetrieved implementation:

@Accessors(fluent = true)
@Data
static class PrefetchRecordsRetrieved implements RecordsRetrieved {

    final ProcessRecordsInput processRecordsInput;
    final String lastBatchSequenceNumber;
    final String shardIterator;

    PrefetchRecordsRetrieved prepareForPublish() {
        return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(),
                lastBatchSequenceNumber, shardIterator);
    }

}

Contributor Author

Looking into it again, it makes sense to keep it as continuationSequenceNumber, since RecordsRetrieved might still have this as a property for pagination.

* Sequence Number of the record batch that was delivered to the Subscriber/Observer.
* @return deliveredSequenceNumber
*/
String deliveredSequenceNumber();


This also seems to be the checkpoint sequence number.

Contributor Author

From the acker's perspective this is just the seqNum that was successfully delivered. Using it for checkpointing is up to the caller.


That comment makes me think that all we need is to send the UUID back with the Ack. The batch details are already in the queue; the purpose of the Ack is to match against the UUID.

Contributor Author

Yes, removing the seqNum from the ack.
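
(For illustration only: a self-contained sketch of the matching described above, where the ack carries nothing but a batch identifier and the publisher matches it against the head of its delivery queue. The class and method names below are stand-ins, not the KCL API.)

import java.util.ArrayDeque;
import java.util.Queue;

// Sketch: the queued entry already holds the batch details; the ack only needs
// to carry the identifier so the publisher can match it against the queue head.
class AckMatchingSketch {

    interface DeliveryAck { String batchId(); }

    static final class QueuedBatch {
        final String batchId;
        QueuedBatch(String batchId) { this.batchId = batchId; }
    }

    private final Queue<QueuedBatch> deliveryQueue = new ArrayDeque<>();

    void buffer(QueuedBatch batch) { deliveryQueue.add(batch); }

    /** Evict the head of the queue only when the ack identifier matches it. */
    boolean handleAck(DeliveryAck ack) {
        QueuedBatch head = deliveryQueue.peek();
        if (head != null && head.batchId.equals(ack.batchId())) {
            deliveryQueue.poll();   // acknowledged batch leaves the queue
            return true;            // caller can now schedule the next delivery
        }
        return false;               // stale or mismatched ack; queue is untouched
    }
}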

} catch (Throwable t) {
    errorOccurred(triggeringFlowFuture.getNow(null), t);
}
// TODO : debug log on isNextEventScheduled


Let's take care of the TODOs. Except for logging purposes, the isNextEventScheduled variable doesn't seem to be used. Can the logging be taken care of within the evictAckedEventAndScheduleNextEvent implementation?

Contributor Author

isNextEventScheduled was used earlier, but I don't see a purpose in returning it. Will remove it.

// TODO : toString implementation for recordsRetrievedAck
log.error("{}: KCL BUG: Found mismatched payload {} in the delivery queue for the ack {} ", shardId,
        recordsRetrievedContext.getRecordsRetrieved(), recordsRetrievedAck);
throw new IllegalStateException("KCL BUG: Record delivery ack mismatch");


What happens upon this exception? Does the queue get reset and a new subscription started?

We should make sure the application is not stuck and retries to make progress.

Contributor Author

This will trigger errorOccurred; everything will be reset, and the health check will take care of initializing the new subscription.

}

@Override
public void notify(RecordsRetrievedAck recordsRetrievedAck) {


nitpick: This is an ack for record delivery to the ShardConsumerSubscriber. Should we rename? i.e. RecordDeliveryAck

Contributor Author

Sure

}
// TODO : debug log on isNextEventScheduled
final RecordFlow triggeringFlow = triggeringFlowFuture.getNow(null);
if (triggeringFlow != null) {


What's the case where this will not be completed?

Looking at the evictAckedEventAndScheduleNextEvent implementation, the completable either gets completed or throws.

Contributor Author (Jul 24, 2019)

You are right. I just used a CompletableFuture as a way to get the triggeringFlow info out of evictAckedEventAndScheduleNextEvent. :) We can return the triggeringFlow info as part of the API return type as well.


Then let's keep this simple, and have evictAckedEventAndScheduleNextEvent return the RecordFlow. There is nothing async going on here that needs a Future.

Also, should we keep the method name simpler -- e.g. handleNotify or handleNotification?

Contributor Author

Okay, will have it return the RecordFlow, but I feel the method name can still be descriptive enough to convey its role in the ack mechanism.
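
(Sketch of the agreed simplification, with stand-in types rather than the actual KCL classes: since eviction and scheduling happen synchronously under the ack, the method can simply return the flow that triggered it instead of completing a CompletableFuture.)

import java.util.concurrent.CompletableFuture;

// Stand-in types; this only contrasts the two ways of handing the flow back.
class ReturnFlowSketch {

    static final class RecordFlow { }

    // Earlier shape (sketch): the flow is conveyed through a future even though
    // it is completed synchronously inside the method.
    void evictAckedEventAndScheduleNextEvent(CompletableFuture<RecordFlow> triggeringFlowFuture) {
        triggeringFlowFuture.complete(new RecordFlow());
    }

    // Agreed shape (sketch): nothing asynchronous happens here, so the flow can
    // simply be returned to the caller.
    RecordFlow evictAckedEventAndScheduleNextEvent() {
        return new RecordFlow();
    }
}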

shardId, recordsDeliveryQueue.remainingCapacity());
throw e;
} catch (Throwable t) {
recordsDeliveryQueue.remove(recordsRetrievedContext);


Why only removal of this event? Is a throwable accepted here? I expect that the flow gets restarted from the beginning upon exception. We should also log an error in this throw block.

Contributor Author

In the context of this method, it failed to schedule the event that was already queued, hence we remove it from the queue. But when we throw the throwable back, it will be caught by errorOccurred and the queue will be cleared.

Contributor Author

We can decide on whether we need to log an error.


Logging an error here will be good. Probably no need to bother with removal; the exception should reset the flow.

triggeringFlowFuture.complete(recordsRetrievedContext.getRecordFlow());
// Try scheduling the next event in the queue, if available.
if (recordsDeliveryQueue.peek() != null) {
    subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved());


This seems to introduce a race at record delivery. This method can find the event in the queue as the next event and deliver it to the Subscriber. But the event that is found could be the only event (added right after removal of the one handled by this notification), and another onNext would already have been scheduled by bufferCurrentEventAndScheduleIfRequired.

Contributor Author

Hmm, bufferCurrentEventAndScheduleIfRequired would block-wait on the lockObject until this is completed. After evictAckedEventAndScheduleNextEvent schedules the delivery, bufferCurrentEventAndScheduleIfRequired would acquire the lock, enqueue the next event, find that there are 2 events, and leave it to evictAckedEventAndScheduleNextEvent to schedule the next event upon the ack for the previous event.


I think that's right; all existing paths seem to get hold of the lock. However, I'm not sure if this diff is showing all the new code changes introduced in this PR. I left a separate comment on that. Let's sync up offline.
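
(A self-contained sketch of the scheduling discipline discussed in this thread, assuming both entry points synchronize on the same lock. Class and member names are stand-ins for the actual FanOutRecordsPublisher code.)

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class DeliverySchedulingSketch<T> {

    private final Object lockObject = new Object();
    private final Deque<T> deliveryQueue = new ArrayDeque<>();
    private final Consumer<T> subscriber;   // stand-in for subscriber.onNext(...)

    DeliverySchedulingSketch(Consumer<T> subscriber) {
        this.subscriber = subscriber;
    }

    // Called when a new batch arrives from upstream.
    void bufferCurrentEventAndScheduleIfRequired(T event) {
        synchronized (lockObject) {
            deliveryQueue.add(event);
            // Deliver immediately only if this is the sole queued event;
            // otherwise the ack path below will schedule it later.
            if (deliveryQueue.size() == 1) {
                subscriber.accept(deliveryQueue.peek());
            }
        }
    }

    // Called when the subscriber acknowledges the previously delivered event.
    void evictAckedEventAndScheduleNextEvent() {
        synchronized (lockObject) {
            deliveryQueue.poll();              // drop the acknowledged event
            T next = deliveryQueue.peek();
            if (next != null) {
                subscriber.accept(next);       // deliver the next queued event
            }
        }
    }
}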

}

@Data
private static class RecordsRetrievedContext {


final?

@yasemin-amzn left a comment

Thank you Ashwin, this looks good to me at a high level. Leaving minor comments.

* Return the publisher to be notified
* @return RecordsPublisher to be notified.
*/
RecordsPublisher getWaitingRecordsPublisher();


Can "waiting" be omitted here? getRecordsPublisher() sounds sufficient.

Contributor Author

Yeah, sounds good.

* @param recordsRetrieved for which we need the ack.
* @return RecordsRetrievedAck
*/
RecordsDeliveryAck getRecordsRetrievedAck(RecordsRetrieved recordsRetrieved);


Let's keep it in line with the return type: getRecordsDeliveryAck?

Contributor Author

Yep, will change it.


if (flow != null && recordsDeliveryAck.batchUniqueIdentifier().getFlowIdentifier()
        .equals(flow.getSubscribeToShardId())) {
    log.error(
            "{}: KCL BUG: Publisher found mismatched ack for subscription {} ",


Logging "KCL BUG" is not necessarily helpful to customers. Instead, let's give more details on what happens under this condition, e.g. "Unexpected event received. Restarting subscription."

Contributor Author

Yeah, we can add more context. I chose "KCL BUG" so that customers can reach out to us in this case; anyway, we can convey this using "Unexpected exception" as well.

}
// Otherwise publisher received a stale ack.
else {
    log.info("{}: Publisher received duplicate ack or an ack for stale subscription {}. Ignoring.", shardId,


Can't this be an ack for the next record in the queue? That would mean a missing ack, and we should not ignore it.

A null flow would be the only case where ignoring is fine.

Contributor Author

We should not be getting an ack for an event in the queue until the event before it in the queue has received one.


That's right, if things work the way we expect them to. We should plan for the unexpected and have safeguards in place.


}

@VisibleForTesting
void setSubscriberForTesting(Subscriber<RecordsRetrieved> s) {


Let's not make the class mutable for tests. The same applies to setFlow.

Why not call subscribe to set the subscriber?

Contributor Author

It requires mocking some other dependencies, hence I resorted to a simple workaround. Will check if this can be avoided.

@ashwing marked this pull request as ready for review August 13, 2019 17:45
@micah-jaffe (Contributor) left a comment

Looks good to me, left a few minor comments. Two general comments:

  1. Can we add a @KinesisClientInternalApi annotation to all the new classes in this PR? That gives us more freedom to modify them (without warning) without impacting customers.

  2. As this change adds resiliency to rejected tasks, can we update the messaging in https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java#L29

if (durationBetweenEnqueueAndAckInMillis > MAX_TIME_BETWEEN_REQUEST_RESPONSE / 3) {
    // The above condition logs the warn msg if the delivery time exceeds 11 seconds.
    log.warn(
            "{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs"
Contributor

Minor: Can we specify what to look for in the ExecutorStateEvent logs and what actions to take? Something maybe like "Check the ExecutorStateEvent logs to see if many threads are running concurrently. Consider using the default configuration."

Also, can we specify where to check for RecordProcessor running time, and what to do if it's too high?

Contributor Author

Hmm, there are different executor states which might lead to this situation. This can happen even with the default executor. I feel it is better to leave it to the customer to evaluate from the state information available.


package software.amazon.kinesis.retrieval;

public interface RecordsDeliveryAck {
Contributor

Can we add a @FunctionalInterface annotation here to make the usage in ShardConsumerNotifyingSubscriber a bit clearer?

Also, a javadoc comment on this interface would be helpful.

Contributor Author

We might add more state information to this interface in the future, which might need more than one abstract method. Added a javadoc comment.
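
(Roughly, the interface with a javadoc comment might look like the sketch below; the accessor and its return type are simplified here and may differ from the merged code.)

package software.amazon.kinesis.retrieval;

/**
 * Acknowledgement sent back by the subscriber once a delivered batch of records
 * has been received, allowing the publisher to evict that batch from its
 * delivery queue and schedule the next one.
 * (Sketch only; additional state may be added to this interface later.)
 */
public interface RecordsDeliveryAck {

    /**
     * @return unique identifier of the record batch whose delivery is being acknowledged
     */
    String batchUniqueIdentifier();
}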

 * @param ack acknowledgement received from the subscriber.
 */
default void notify(RecordsDeliveryAck ack) {
    throw new UnsupportedOperationException("RecordsPublisher does not support acknowledgement from Subscriber");
Contributor

For my understanding: why is the desired default behavior to throw an exception here?

Contributor

> For my understanding: why is the desired default behavior to throw an exception here?

I think this is because the logic for evictAckedEventAndScheduleNextEvent is implemented in FanOutRecordsPublisher, which implements this interface. FanOutRecordsPublisher is the publisher that we are using and requires the Ack mechanism.

Contributor Author (Aug 15, 2019)

This is to inform the NotifyingSubscriber that the Publisher it is subscribing to has not implemented the notify() method. Rather than allowing it to be a no-op, we throw an exception back.
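
(For context, a self-contained sketch of that wrapper pattern with stand-in types: the notifying subscriber forwards the event to the real subscriber and then acknowledges it to the publisher, so a publisher that never overrides notify() fails fast instead of silently dropping acks.)

import java.util.function.Consumer;

class NotifyingSubscriberSketch<T> implements Consumer<T> {

    // Stand-in for a publisher that may or may not support acks.
    interface AckAwarePublisher<E> {
        default void notify(E deliveredEvent) {
            throw new UnsupportedOperationException(
                    "Publisher does not support acknowledgement from Subscriber");
        }
    }

    private final Consumer<T> delegate;            // the real subscriber being wrapped
    private final AckAwarePublisher<T> publisher;  // the publisher to acknowledge

    NotifyingSubscriberSketch(Consumer<T> delegate, AckAwarePublisher<T> publisher) {
        this.delegate = delegate;
        this.publisher = publisher;
    }

    @Override
    public void accept(T event) {
        delegate.accept(event);    // deliver the event to the wrapped subscriber
        publisher.notify(event);   // then acknowledge the delivery back to the publisher
    }
}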

@@ -16,6 +16,8 @@

import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;

import java.util.UUID;
Contributor

It doesn't look like you use this import in this file; maybe a remnant of an earlier commit?

Contributor Author

Thanks. Removed.

shardId, recordsDeliveryQueue.remainingCapacity());
throw e;
} catch (Throwable t) {
log.error("{}: Unable to deliver event to the shard consumer.", shardId, t);
Contributor

Would it be helpful to add some info from the RecordsRetrievedContext to this log and the one above? Seems like when debugging we might want to know more than just the shardId

Contributor Author

Do you have any use case where the record context might be helpful? I feel these two exceptions will be thrown due to a capacity constraint rather than due to the record itself. Let me know if you feel otherwise.

return;
}

// Clear the delivery buffer so that the next subscription doesn't yield duplicate records.
clearRecordsDeliveryQueue();
Contributor

Confirming that clearRecordsDeliveryQueue is a blocking operation, right? We won't start fetching new records until the queue is empty?

Contributor Author

Right. This would just empty the local queue.

@@ -489,11 +612,18 @@ public void onComplete() {

private final ProcessRecordsInput processRecordsInput;
private final String continuationSequenceNumber;
private final String flowIdentifier;
private final String batchUniqueIdentifier = UUID.randomUUID().toString();
Contributor

Is there any risk of generating duplicate batchUniqueIdentifiers by using random instead of sequential generation? If this does happen, will it break anything?

Contributor Author

I liked this analogy from Quora: "If every person on the planet generates a new UUID4 every second, we'd expect a collision to occur after about 10 years." Source: https://www.quora.com/Has-there-ever-been-a-UUID-collision

The only risk I can see is if two identical UUIDs are generated one after another. In that case, a malicious subscriber sending more than one ack for the same record batch would delete the next event in the queue. But this is extremely unlikely to happen.

@ychunxue (Contributor) left a comment

Overall LGTM. Just a minor nit and a question.

@@ -77,7 +77,7 @@ void startSubscriptions() {
             recordsPublisher.restartFrom(lastAccepted);
         }
         Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
-                .subscribe(this);
+                .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher));
Contributor

Minor comment: Maybe add a comment here to explain that the ShardConsumerNotifyingSubscriber is the new subscriber that we introduced for the Ack mechanism.

Contributor Author

This is not a new subscriber, but just a wrapper on top of the existing one.

import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
Contributor

Minor nit: Can we just import the class and use the static method in the notify method?

Contributor Author

We prefer to avoid wildcard imports.

// Note: This does not block wait to enqueue.
recordsDeliveryQueue.add(recordsRetrievedContext);
// If the current batch is the only element in the queue, then try scheduling the event delivery.
if (recordsDeliveryQueue.size() == 1) {
Contributor

A question about this code path: recordsReceived seems to be another code path where we receive the records without the ack? Are we still allowing the subscriber to schedule another record in this case?

Contributor Author

The SDK thread dispatches the records to our Publisher in a blocking manner. That is, event(T) will be dispatched only when the SDK thread sees that the Future for event(T-1) has completed successfully.


@micah-jaffe (Contributor) left a comment

Approved

@micah-jaffe merged commit 3f6afc6 into awslabs:master Aug 16, 2019
@sahilpalvia added this to the v2.2.2 milestone Aug 16, 2019