Fix to prevent data loss and stuck shards in the event of failed records delivery in Polling readers #603
Conversation
    return result == null ? result : result.prepareForPublish();
}

RecordsRetrieved evictNextResult() {
nit: pollNextResult? just to be in sync with Queue API?
Hmmm no strong preference. Will change
Updated to pollNextResultAndUpdatePrefetchCounters
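For readers following along, a rough sketch of what the renamed method does, under stated assumptions: only the names already visible in this diff (getRecordsResultQueue, prepareForPublish, RecordsRetrieved) are real; PrefetchRecordsRetrieved and prefetchCounters.removed are placeholders for illustration.

```java
// Sketch only: remove the head of the prefetch queue, update the counters that gate
// further prefetching, then prepare the result for publishing.
private synchronized RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() {
    PrefetchRecordsRetrieved result = getRecordsResultQueue.poll();
    if (result == null) {
        return null;
    }
    prefetchCounters.removed(result); // assumed helper that decrements byte/record counters
    return result.prepareForPublish();
}
```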
private Instant lastEventDeliveryTime = Instant.EPOCH;
// This flag controls who should drain the next request in the prefetch queue.
// When set to false, the publisher and demand-notifier thread would have the control.
// When set to true, the event-notifier thread would have the control.
Better if we can define what each thread is doing. Maybe we can add a class comment for that.
Okay
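For reference, a class-level comment along these lines could capture the thread roles discussed in this thread. The wording is only a sketch based on this conversation, not the final comment:

```java
/**
 * Sketch of a possible class comment (wording assumed):
 *
 * Three threads interact with the prefetch queue:
 *  - publisher thread: fetches records and offers them to the prefetch queue; drains the
 *    queue when the event-notifier thread does not have drain control.
 *  - demand-notifier thread: reacts to new demand from the subscriber; may also drain the
 *    queue when the event-notifier thread does not have drain control.
 *  - event-notifier (ack) thread: acknowledges the previously delivered event and drains
 *    the next event when shouldDrainEventOnAck is true.
 */
```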
// This flag controls who should drain the next request in the prefetch queue.
// When set to false, the publisher and demand-notifier thread would have the control.
// When set to true, the event-notifier thread would have the control.
private AtomicBoolean shouldDrainEventOnAck = new AtomicBoolean(false);
shouldDrainEventOnlyOnAck?
It looks like the reason for shards getting stuck in the previous semaphore solution was that we were not releasing the semaphore when the health checker calls restartFrom. If we do that, shouldn't it solve the entire problem of shards getting stuck forever?
This solution fixes the above problem by handing control over to the publisher thread when restartFrom is called. But what do we gain by letting the ack-notifier thread drain the queue?
- The publisher thread blocking on the semaphore would have acquired the read lock. In order to release the semaphore, the reset thread would need to acquire the read lock as well. Even if the reset thread acquires the read lock and releases the semaphore, we still need to prevent the publisher thread from publishing further until the reset thread acquires the write lock.
- Also, if a stale notification arrives after the queue is cleared in reset (say, after 60 seconds), it would go ahead and release the semaphore, which means the publisher could publish two events without receiving an ack for the first one, causing durability issues.
Considering these factors among others, we decided it would be better to validate the ack before publishing the next event, which prevents the durability issue. Also, to ensure that an event is scheduled as soon as possible when there are prefetched events, and to avoid any blocking calls, we introduced this concept of scheduling from different threads and exchanging control based on the queue state and demand.
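To make that concrete, here is a minimal sketch of the ack path being described. Only the names visible in this diff (shouldDrainEventOnAck, drainQueueForRequests, pollNextResultAndUpdatePrefetchCounters, shardId) are real; peekNextResult and the method shape are assumptions:

```java
// Sketch only: runs on the event-notifier (ack) thread when an ack arrives.
private synchronized void handleAck(RecordsRetrieved ackedEvent) {
    RecordsRetrieved head = peekNextResult(); // assumed accessor for the queue head
    if (head != null && head.equals(ackedEvent)) {
        // The ack matches the head of the queue, so the previous delivery is confirmed.
        pollNextResultAndUpdatePrefetchCounters();
        if (shouldDrainEventOnAck.get()) {
            // This thread currently owns drain control: schedule the next event right away.
            drainQueueForRequests();
        }
    } else {
        // Stale ack, e.g. one that arrives after the queue was cleared by restartFrom.
        // Ignoring it means we never publish a second event without a valid ack for the
        // current head, which is what prevents the durability issue described above.
        log.debug("{} : Ignoring stale ack {}", shardId, ackedEvent);
    }
}
```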
Aha.. yes. Makes sense. Thanks for the details.
But I still think we can achieve this without letting the ack thread drain the queue. We can discuss this later. :)
The publisher thread has this logic of spinning in a while loop to offer the prefetched element to the queue. That's the reason why the ack-thread (now) and demand-thread (earlier) are calling the drainQueue directly. Let's discuss this offline.
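For context, the spinning offer loop mentioned here looks roughly like this. This is a sketch, not the exact code: it assumes getRecordsResultQueue is a BlockingQueue, and MAX_QUEUE_WAIT_MILLIS plus the method name are placeholders; drainQueueForRequestsIfAllowed is the rename proposed above.

```java
// Sketch only: the publisher thread blocks in a bounded-offer loop until the prefetched
// result fits into the queue, which is why it cannot also react promptly to acks, and why
// the ack thread (now) and the demand thread (earlier) call the drain method directly.
private void addToPrefetchQueue(RecordsRetrieved result) throws InterruptedException {
    while (!getRecordsResultQueue.offer(result, MAX_QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
        // keep retrying; another thread drains the queue in the meantime
    }
    drainQueueForRequestsIfAllowed(); // attempt a drain once the offer succeeds
}
```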
 * Method that will be called by the 'publisher thread' and the 'demand notifying thread',
 * to drain the events if the 'event notifying thread' do not have the control.
 */
private synchronized void initiateDrainQueueForRequests() {
initiate may not be the best word here. maybe drainQueueForRequestsIfAllowed?
Yep
if (requestedResponses.get() > 0 && recordsToDeliver != null) {
    lastEventDeliveryTime = Instant.now();
    subscriber.onNext(recordsToDeliver);
    if(!shouldDrainEventOnAck.get()) {
nit: missing space between if and (
will address
} else {
    // Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
    // thread.
    if(shouldDrainEventOnAck.get()){
nit: missing space between if and (. check all such places.
        // Give the drain control to publisher/demand-notifier thread.
        log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
                getRecordsResultQueue.size(), requestedResponses.get());
        shouldDrainEventOnAck.set(false);
Before this change, we had the eventDeliveryLock semaphore which was only released upon receiving the ack. So if the ack is lost somewhere, the shard gets stuck forever. Was there any reason why we didn't release it within restartFrom here?
Looks like we are fixing that issue in this change. Shouldn't adding this step to the semaphore solution fix the issue? What do we gain on top of that from this new solution?
answered above
Yep, sorry about the duplicate. :)
        shouldDrainEventOnAck.set(false);
    } else {
        // Else attempt to drain the queue.
        drainQueueForRequests();
In the happy case where we continue to receive acks for each event delivered, the ack-notifier thread will always be the one draining the queue. The publisher thread will only do the fetch part. I think this conflicts a bit with the design of this class and the purpose of the publisher thread.
The publisher thread acts as a kick-starter in the event of paused deliveries. In the happy case, yes, the ack-notifier thread will do the scheduling, to minimize the delay between deliveries and for simplicity.
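The check on the publisher/demand-notifier side is essentially this, a sketch of the method discussed above using the proposed drainQueueForRequestsIfAllowed name (the body is assumed; only the field and drainQueueForRequests appear in the diff):

```java
// Sketch: the publisher and demand-notifier threads drain only when the ack thread does
// not currently own drain control, so they mainly kick-start delivery after a pause.
private synchronized void drainQueueForRequestsIfAllowed() {
    if (!shouldDrainEventOnAck.get()) {
        drainQueueForRequests();
    }
}
```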
I agree that it'll help a bit with the delay. But not with the simplicity. :) We have 3 different threads draining the queue. I think it would be better and simpler if we had only one thread publishing and the other two just updating the demand and registering the ack. Anyway that's a bigger change and we can consider it in the future. :)
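Purely for illustration, the single-drainer design hinted at here might look something like the following. This is hypothetical and not part of this PR; all names other than requestedResponses are assumptions, and requestedResponses is assumed to be an AtomicLong.

```java
// Hypothetical alternative: only the publisher thread drains and publishes; the ack and
// demand threads just record state and wake it up.
private final Object drainSignal = new Object();
private RecordsRetrieved lastAckedEvent; // assumed field

void onAck(RecordsRetrieved ackedEvent) {
    synchronized (drainSignal) {
        lastAckedEvent = ackedEvent;
        drainSignal.notifyAll();
    }
}

void onRequest(long n) {
    synchronized (drainSignal) {
        requestedResponses.addAndGet(n);
        drainSignal.notifyAll();
    }
}

// The publisher thread would wait on drainSignal and publish the next event only after
// verifying that lastAckedEvent matches the head of the queue and demand is positive.
```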
} else {
    // Log and ignore any other ack received. As long as an ack is received for head of the queue
    // we are good. Any stale or future ack received can be ignored, though the latter is not feasible
    // to happen.
Another reason to come here is when the queue is reset before the ack comes back. Add this too in the comment?
That would be considered a stale ack.
Looks good.
Fix to prevent data loss and stuck shards in the event of failed records delivery in Polling readers

Issue #, if available:
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.