
Application didn't checkpoint at end of shard #211

Open
posac opened this issue Sep 15, 2017 · 3 comments

Comments


posac commented Sep 15, 2017

Hi,

I have an issue with checkpointing at the end of shards.
In my record processor, I call checkpoint with the sequence number after each processRecords call. The sequence number is also stored in a field of the processor.

On shutdown I also call:
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
    checkpoint(shutdownInput.getCheckpointer());
}

However, after the "Reached end of shard" message, I get this exception:
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-00000001505383995695-aa7bbe9c
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:107) ~[amazon-kinesis-client-1.8.1.jar:?]

I debugged your code, and I think there is a bug in the advancePosition method of RecordProcessorCheckpointer.java.

I believe the condition:
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {

should be:
if (checkpointToRecord != null && !checkpointToRecord.equals(lastCheckpointValue)) {

Otherwise I cannot store SHARD_END. When checkpoint is called from processRecords, sequenceNumberAtShardEnd is not set yet. It is set by the time checkpoint is called from shutdown, but then the condition above is not satisfied, because lastCheckpointValue is already equal to extendedSequenceNumber.
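To make the reasoning above concrete, here is a simplified, self-contained model of the advancePosition logic as described in this thread. It is a sketch, not the KCL source: sequence numbers are plain Strings, SHARD_END is a sentinel string, the persisted checkpoint is just a field, and the class name CheckpointerModel and the proposedFix flag are hypothetical.

```java
// Hypothetical, simplified model of RecordProcessorCheckpointer.advancePosition.
// Assumptions: String sequence numbers, a "SHARD_END" sentinel, and a field
// (storedCheckpoint) standing in for the lease-table write.
final class CheckpointerModel {
    static final String SHARD_END = "SHARD_END";

    String lastCheckpointValue;      // last value successfully written
    String sequenceNumberAtShardEnd; // set only once the shard end is reached
    String storedCheckpoint;         // stands in for the persisted checkpoint
    private final boolean proposedFix;

    CheckpointerModel(boolean proposedFix) {
        this.proposedFix = proposedFix;
    }

    void advancePosition(String extendedSequenceNumber) {
        String checkpointToRecord = extendedSequenceNumber;
        if (sequenceNumberAtShardEnd != null
                && sequenceNumberAtShardEnd.equals(extendedSequenceNumber)) {
            // Checkpointing the last record of a closed shard is promoted to SHARD_END.
            checkpointToRecord = SHARD_END;
        }
        // Original guard compares the requested number; the proposed fix
        // compares the value that would actually be recorded.
        String guarded = proposedFix ? checkpointToRecord : extendedSequenceNumber;
        if (guarded != null && !guarded.equals(lastCheckpointValue)) {
            storedCheckpoint = checkpointToRecord;
            lastCheckpointValue = checkpointToRecord;
        }
    }
}
```

For example, checkpointing "100" from processRecords, then setting sequenceNumberAtShardEnd to "100" and checkpointing "100" again from shutdown: with the original guard the second call computes SHARD_END but skips the write (the requested number equals lastCheckpointValue), so "100" stays stored; with the proposed guard SHARD_END is written.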

@lskrajny

+1

@versemonger

+1


klesniewski commented Jul 21, 2018

I have also encountered this problem. It is annoying, because when it happens the worker cannot make progress, which causes service downtime (the worker doesn't crash, but it stops working as expected). It can happen when an IRecordProcessor implementation checkpoints specific records (sequence numbers) instead of calling IRecordProcessorCheckpointer.checkpoint().

Here is how it shows up in the logs:

2018-07-20 22:00:20,469 DEBUG      StreamsRecordProcessor> Processing 1 records
2018-07-20 22:00:20,480  INFO KafkaStreamsRecordProcessor> KafkaSendCompleted to 'devices-ingress' at 14:a4
2018-07-20 22:00:20,486  INFO     SequenceNumberValidator> Validated sequence number 1434700000000005227153817 with shard id shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:25,464  INFO                      Worker> Current stream shard assignments: shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:25,464  INFO                      Worker> Sleeping ...
2018-07-20 22:00:35,466  INFO                 ProcessTask> Reached end of shard shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:36,466  INFO      StreamsRecordProcessor> Shutting down with status TERMINATE
2018-07-20 22:00:36,471  INFO     SequenceNumberValidator> Validated sequence number 1434700000000005227153817 with shard id shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:36,471 ERROR                ShutdownTask> Application exception.
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-00000001532110040782-92c71e5b
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:113) [service.jar:?]
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [service.jar:?]
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) [service.jar:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.lang.Thread.run(Thread.java:844) [?:?]

Here is what happened:

  1. At second 20, a new record was picked up from the stream and processed. The record was checkpointed. The shard was still active, so RecordProcessorCheckpointer.sequenceNumberAtShardEnd was null.
  2. The next 14 runs (polling every second) returned no new records. The shard was still active.
  3. On the 15th run, the library detected that the shard was closed and that we had reached the shard end. It called IRecordProcessor.shutdown(ShutdownInput) with ShutdownReason.TERMINATE. I flushed processing and checkpointed the last record I knew about: the record processed at second 20, which was also the last record in the shard. RecordProcessorCheckpointer.advancePosition(ExtendedSequenceNumber) set checkpointToRecord to SHARD_END, but the condition mentioned by @posac was false (this sequence number had already been checkpointed), so SHARD_END was not checkpointed. The KCL threw IllegalArgumentException and retried a second later. This repeated every second, and the application made no progress.
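The three steps can be replayed with a small hypothetical model (not KCL code) to see why the retries never succeed. It assumes String sequence numbers, a SHARD_END sentinel, the original guard in advancePosition, and a verifyShardEnd method (a hypothetical name) standing in for the check in ShutdownTask that raises the IllegalArgumentException; the sequence number and shard id are taken from the logs above.

```java
// Hypothetical replay of the timeline above; not the actual KCL implementation.
final class ShardEndTimeline {
    static final String SHARD_END = "SHARD_END";

    String lastCheckpointValue;
    String sequenceNumberAtShardEnd;
    String storedCheckpoint;

    // Mirrors the original guard discussed in this issue.
    void advancePosition(String seq) {
        String toRecord = seq;
        if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(seq)) {
            toRecord = SHARD_END;
        }
        if (seq != null && !seq.equals(lastCheckpointValue)) {
            storedCheckpoint = toRecord;
            lastCheckpointValue = toRecord;
        }
    }

    // Stand-in for the check performed after shutdown(TERMINATE) returns.
    void verifyShardEnd(String shardId) {
        if (!SHARD_END.equals(storedCheckpoint)) {
            throw new IllegalArgumentException(
                "Application didn't checkpoint at end of shard " + shardId);
        }
    }

    // Runs steps 1-3, then `attempts` shutdown retries; returns the failure count.
    static int run(int attempts) {
        ShardEndTimeline t = new ShardEndTimeline();
        String last = "1434700000000005227153817";
        t.advancePosition(last);           // step 1: record processed and checkpointed
        // step 2: empty polls change no checkpointer state
        t.sequenceNumberAtShardEnd = last; // step 3: shard end detected
        int failures = 0;
        for (int i = 0; i < attempts; i++) {
            t.advancePosition(last);       // processor re-checkpoints its last record
            try {
                t.verifyShardEnd("shardId-00000001532110040782-92c71e5b");
            } catch (IllegalArgumentException e) {
                failures++;                // the KCL retries on the next cycle
            }
        }
        return failures;
    }
}
```

Every attempt fails in this model: each shutdown call asks for the same, already checkpointed sequence number, so SHARD_END is computed but never stored.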

This can be worked around by calling IRecordProcessorCheckpointer.checkpoint() (without arguments) in IRecordProcessor.shutdown(). It checkpoints RecordProcessorCheckpointer.largestPermittedCheckpointValue, which is set to SHARD_END once we are at the shard end. If you process your records asynchronously, make sure that all records have been processed before you checkpoint this way.
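A hypothetical model of the workaround, under the same simplifications as a sketch (String sequence numbers, a SHARD_END sentinel, and the original guard left unchanged). It assumes, as the comment above describes, that at shard end largestPermittedCheckpointValue becomes SHARD_END and that the no-argument checkpoint() passes that value to advancePosition; reachShardEnd is a hypothetical name for that state change.

```java
// Hypothetical model: why checkpoint() with no arguments reaches SHARD_END
// even with the original guard. Not the actual KCL implementation.
final class NoArgCheckpointWorkaround {
    static final String SHARD_END = "SHARD_END";

    String lastCheckpointValue;
    String sequenceNumberAtShardEnd;
    String largestPermittedCheckpointValue;
    String storedCheckpoint;

    void advancePosition(String seq) {
        String toRecord = seq;
        if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(seq)) {
            toRecord = SHARD_END;
        }
        // Original guard, unchanged.
        if (seq != null && !seq.equals(lastCheckpointValue)) {
            storedCheckpoint = toRecord;
            lastCheckpointValue = toRecord;
        }
    }

    // Checkpoint a specific sequence number (the pattern that triggers the bug).
    void checkpoint(String sequenceNumber) {
        advancePosition(sequenceNumber);
    }

    // Checkpoint the largest permitted value (the workaround).
    void checkpoint() {
        advancePosition(largestPermittedCheckpointValue);
    }

    // Assumed state change made before shutdown(TERMINATE) is called.
    void reachShardEnd() {
        sequenceNumberAtShardEnd = largestPermittedCheckpointValue;
        largestPermittedCheckpointValue = SHARD_END;
    }
}
```

In a real processor this corresponds to calling `shutdownInput.getCheckpointer().checkpoint()` when `shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE`. Because SHARD_END differs from the last checkpointed sequence number, the existing guard lets the write through.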

The correction suggested by @posac would solve the problem.


4 participants