Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drain delivery queue to make slow consumers consume events at their pace #607

Merged
merged 4 commits into from
Sep 14, 2019

Conversation

ashwing
Copy link
Contributor

@ashwing ashwing commented Sep 9, 2019

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Copy link

@yasemin-amzn yasemin-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good to me, leaving minor feedback.

@@ -158,20 +151,26 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver
// RecordFlow of the current event that needs to be returned
RecordFlow flowToBeReturned = null;

final RecordsRetrieved recordsRetrieved = recordsRetrievedContext != null ?
recordsRetrievedContext.getRecordsOrShutDownEvent()
.map(recordsEvent -> recordsEvent, shutDownEvent -> null) : null;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move this logic out into RecordsRetrievedContext. getRecordsRetrieved either returns null or records depending on the event type stored in the context.

if (!recordsDeliveryQueue.isEmpty()) {
scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved());
recordsDeliveryQueue.peek().getRecordsOrShutDownEvent()
.apply(recordsEvent -> scheduleNextEvent(recordsEvent),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this also move out into the RecordsRetrievedContext? each event would define execution action, similar to the shutdownEvent's getSubscriptionShutdownAction, and this flow would just call executeEvenAction independent of the event type.

private final RecordFlow recordFlow;
private final Instant enqueueTimestamp;
}

@Getter
private static final class SubscriptionShutDownEvent {
private final Runnable subscriptionShutDownAction;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: s/ShutDown/Shutdown

@micah-jaffe micah-jaffe merged commit a94dc7d into awslabs:master Sep 14, 2019
@ashwing ashwing mentioned this pull request Sep 23, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants