
Stop receiving records with error "Last request was dispatched at...but no response as of ...Cancelling subscription, and restarting." (KCL 2.0) #448

Closed
horaceto opened this issue Oct 19, 2018 · 6 comments · Fixed by #462
Labels
bug v2.x Issues related to the 2.x version
Milestone

Comments

@horaceto

horaceto commented Oct 19, 2018

I have a Kinesis stream with 2 shards, with data published to it continuously. I use KCL 2.0.1 for Java to connect to the stream with polling (by populating retrievalConfig.retrievalSpecificConfig with a PollingConfig object). It works fine and keeps receiving messages from both shards for the first 10 minutes. After that, it stops receiving any messages, even though data is still being published to the stream. I left the process running for 5 more minutes and the issue persisted. I then restarted the process, and it started receiving messages again from both shards, but stopped again after running for 10 minutes. The issue happens repeatedly.

No throttling errors are seen in the logs. Instead, the following errors appear:

2018-10-19 14:12:43,531 ERROR [main] shardId-000000000000: Last request was dispatched at 2018-10-19T03:12:07.772Z, but no response as of 2018-10-19T03:12:43.531Z (PT35.759S). Cancelling subscription, and restarting.

This kind of log line appears once every 35 seconds for each shard. It first appeared for shardId-000000000000, after which no more messages were received from that shard. It then appeared for shardId-000000000001 as well, and no more messages were received from that shard either.
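For readers unfamiliar with the notation, the `PT35.759S` at the end of the log line is an ISO-8601 duration: the gap between the two timestamps in the message. A minimal Java sketch that reproduces it from the values logged above follows. The class and method names are hypothetical, and the 35-second threshold is only inferred from the cadence described in this thread, not taken from the KCL source.

```java
import java.time.Duration;
import java.time.Instant;

public class StallDuration {

    // Assumption: threshold inferred from the "every 35 seconds" cadence seen in the logs.
    static final Duration ASSUMED_THRESHOLD = Duration.ofSeconds(35);

    /** Time elapsed between the last dispatched request and the moment of the check. */
    static Duration sinceLastRequest(String dispatchedAt, String checkedAt) {
        return Duration.between(Instant.parse(dispatchedAt), Instant.parse(checkedAt));
    }

    /** True when the gap exceeds the assumed threshold. */
    static boolean looksStalled(Duration gap) {
        return gap.compareTo(ASSUMED_THRESHOLD) > 0;
    }

    public static void main(String[] args) {
        // Timestamps copied from the error line in this issue.
        Duration gap = sinceLastRequest("2018-10-19T03:12:07.772Z", "2018-10-19T03:12:43.531Z");
        System.out.println(gap);               // PT35.759S
        System.out.println(looksStalled(gap)); // true
    }
}
```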

To rule out publishing as a factor, I ran another test: I first published a large amount of data to the stream without consuming it, then stopped publishing and started the consumer application. The same behaviour was observed.

Same behaviours are observed with KCL 2.0.3.

I've extracted and attached the relevant application logs and error logs for reference.

app.log
error.log

Any idea?

@da14purc

da14purc commented Nov 1, 2018

I have seen this behavior as well when using PollingConfig.
After a bit of digging I found that the PrefetchRecordsPublisher used when polling is configured maintains a blocking queue of prefetched record inputs, and the size of the queue is limited. It looks like the pre-fetcher can get into a state where reads/writes to the queue get locked up.

I was able to consistently reproduce the issue with a processRecords implementation that did nothing but sleep for around 40 seconds. If the prefetch queue fills up, the ShardConsumer doesn't get sent any more record inputs (not even empty ones). The shard consumer continues to cancel subscriptions every 35 seconds because the lastRequestTime isn't being updated in response to new record inputs.

To work around this, I configured the pre-fetcher with a much larger max queue size and a slightly higher idle time:

// Example in Groovy
def pollingConfig = new PollingConfig(streamName, kinesisClient)
pollingConfig.recordsFetcherFactory().maxPendingProcessRecordsInput(MAX_PENDING_PROCESS_RECORDS_INPUT)
pollingConfig.recordsFetcherFactory().idleMillisBetweenCalls(WAIT_TIME_BETWEEN_RECORD_POLLS.toMillis())

The workaround should make the issue much less likely if record processing is slow occasionally. It would probably only delay the issue if record processing is consistently slow.

@pfifer pfifer added bug v2.x Issues related to the 2.x version labels Nov 2, 2018
@pfifer
Contributor

pfifer commented Nov 2, 2018

Sorry for the delayed response.

This is a bug, and we are investigating a fix for it.

pfifer added a commit to pfifer/amazon-kinesis-client that referenced this issue Nov 7, 2018
Adding new items to the receive queue for the PrefetchRecordsPublisher
when at capacity would deadlock retrievals as it was already holding
a lock on this.

The method addArrivedRecordsInput did not need to be synchronized on
this as it didn't change any of the protected
state (requestedResponses).  There is a call to drainQueueForRequests
immediately after the addArrivedRecordsInput that will ensure newly
arrived data is dispatched.

This fixes awslabs#448
sahilpalvia pushed a commit that referenced this issue Nov 8, 2018
* Remove a possible deadlock on polling queue fill

Adding new items to the receive queue for the PrefetchRecordsPublisher
when at capacity would deadlock retrievals as it was already holding
a lock on this.

The method addArrivedRecordsInput did not need to be synchronized on
this as it didn't change any of the protected
state (requestedResponses).  There is a call to drainQueueForRequests
immediately after the addArrivedRecordsInput that will ensure newly
arrived data is dispatched.

This fixes #448

* Small fix on the reasoning comment

* Adjust the test to act more like the ShardConsumer

The ShardConsumer, which is the principal user of the
PrefetchRecordsPublisher, uses RxJava to consume from publisher. This
test uses RxJava to consume, and notifies the test thread once
MAX_ITEMS * 3 have been received. This ensures that we cycle through
the queue at least 3 times.

* Removed the upper limit on the retrievals

Because of the way RxJava manages requests, more requests can
occur than we might expect.
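
The lock-up described in the commit message above can be illustrated with a simplified model. This is only a sketch under stated assumptions, not the actual PrefetchRecordsPublisher code: `SketchPublisher`, `prefill`, `addArrived`, `drainOne`, and `drains` are hypothetical names, and the queue capacity of 1 is chosen only to make the effect easy to see. When the add path holds the object's monitor while blocking on a full bounded queue, the drain path (which needs the same monitor) can never free a slot. Dropping the lock around the blocking `put` lets the queue drain.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PrefetchDeadlockSketch {

    /** Hypothetical stand-in for a publisher with a bounded prefetch queue. */
    static final class SketchPublisher {
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        private final boolean holdLockWhileAdding;
        final CountDownLatch aboutToAdd = new CountDownLatch(1);

        SketchPublisher(boolean holdLockWhileAdding) {
            this.holdLockWhileAdding = holdLockWhileAdding;
        }

        void prefill(String item) throws InterruptedException {
            queue.put(item);
        }

        void addArrived(String item) throws InterruptedException {
            if (holdLockWhileAdding) {
                synchronized (this) {
                    aboutToAdd.countDown();
                    queue.put(item); // blocks while the queue is full, still holding the monitor
                }
            } else {
                aboutToAdd.countDown();
                queue.put(item); // blocks while full, but drainOne() can still run
            }
        }

        synchronized String drainOne() {
            return queue.poll();
        }
    }

    /** Returns true if a consumer manages to drain both items within one second. */
    static boolean drains(boolean holdLockWhileAdding) {
        SketchPublisher publisher = new SketchPublisher(holdLockWhileAdding);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            publisher.prefill("first"); // the queue is now at capacity
            pool.submit(() -> {
                publisher.addArrived("second");
                return null;
            });
            publisher.aboutToAdd.await(); // the producer is at (or inside) the blocking put
            Future<Integer> consumer = pool.submit(() -> {
                int received = 0;
                while (received < 2) {
                    if (publisher.drainOne() != null) {
                        received++;
                    } else {
                        Thread.sleep(5);
                    }
                }
                return received;
            });
            return consumer.get(1, TimeUnit.SECONDS) == 2;
        } catch (TimeoutException e) {
            return false; // the consumer never got the monitor
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts the blocked put so the threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println("lock held while adding -> drained: " + drains(true));
        System.out.println("no lock while adding   -> drained: " + drains(false));
    }
}
```

In this model the first call is expected to time out and the second to complete, which mirrors the reasoning in the commit message: the add path did not need the lock, so removing it unblocks the drain path.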
@pfifer pfifer added this to the v2.0.5 milestone Nov 8, 2018
@knap1930

Any idea when 2.0.5 will be released? Running into this issue with 2.0.4.

@pfifer
Contributor

pfifer commented Nov 12, 2018

It will be released soon

@kiransahoo1980

We are using 2.0.5 and still seeing these error messages in the logs

@ashwing
Contributor

ashwing commented Aug 22, 2019

What to expect from KCL release 2.2.2?

Case 1: If you are using a KCL 2.x version (prior to KCL 2.2.2), you replaced or 
modified the thread/queue configuration of SchedulerThreadPoolExecutor, and you faced situations 
like data loss, shards getting stuck, or intermittent pauses in data consumption, please 
continue reading; otherwise go to the next case. These issues are potentially due to undelivered 
executor service events.
 	a. 	This release ensures that data loss does not happen, by preventing subsequent events 
 		from being delivered after an event delivery failure and later restarting the subscription. 
 		This may, however, lead to intermittent pauses in data consumption, as KCL checks for 
 		exceptionally completed subscriptions every 35 seconds. Refer to section d for mitigation. 
 		This scenario is identified by the log messages,
 			- ERROR s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000035: Received unexpected ack for the active subscription shardId-000000000077-005. Throwing.
 			- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
 	b. 	This release prevents the shard stuck situations that happened due to undelivered 
 		control plane messages. [NOTE : Control plane messages are responsible for fetching 
 		further events from KDS service. This includes subscribeToShard() API call and reactive 
 		layer's request(N) calls]
 	c. 	This release has improved logging that captures undelivered control plane 
 		messages for troubleshooting. Any undelivered control plane message might still lead 
 		to a temporary pause in data consumption. Refer to section d for mitigation. This is identified 
 		by the log message,
 			- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
 	d. 	Mitigation for undelivered messages: The undelivered messages are primarily due to 
 		reduced SchedulerThreadPoolExecutor capacity. Customers are expected to assess 
 		the state of the SchedulerThreadPoolExecutor using the following diagnostic log messages 
 		and take appropriate actions such as reducing the RecordProcessor.processRecords time, 
 		scaling out the application, or increasing the number of threads in the executor.
 			i. 	ExecutorStateEvent INFO log emitted every 30 seconds to check if activeThreads and 
 				queueSize are consistently high.
 					- INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=100, coreThreads=0, leasesOwned=40, largestPoolSize=2, maximumPoolSize=2147483647)
 			ii.	RejectedTaskEvent ERROR log emitted when SchedulerThreadPoolExecutor fails to 
 				execute any event
 					- ERROR s.a.k.c.DiagnosticEventLogger [NONE] - Review your thread configuration to prevent task rejections. Task rejections will slow down your application and some shards may stop processing. Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=0, largestPoolSize=0, maximumPoolSize=2147483647) - io.reactivex.exceptions.UndeliverableException: java.util.concurrent.RejectedExecutionException: Test exception.

Case 2: If you are using a KCL 2.x version (prior to KCL 2.2.2), you did NOT modify 
or replace SchedulerThreadPoolExecutor, and you faced stuck shards or frequent 
intermittent pauses in data consumption, please continue reading; otherwise go to the next case. 
These issues are potentially due to submitting more tasks to SchedulerThreadPoolExecutor 
than it can handle, leading to delayed execution of submitted tasks.
	a. 	The common symptom of this situation is the following log message,
			- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000077: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
	b. 	This release has more diagnostic log messages to identify the issues around the 
		congested SchedulerThreadPoolExecutor
			i. 	FanOutRecordsPublisher WARN log indicating a high time (over 11 seconds) taken 
				by SchedulerThreadPoolExecutor to deliver an event to ShardConsumer. 
				Ideally delivery time should be less than a second. If this is consistently 
				high, refer to section c.
					- WARN  s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is high at 14534 millis. Check the ExecutorStateEvent logs to see the state of the executor service. Also check if the RecordProcessor's processing time is high.
			ii. FanOutRecordsPublisher DEBUG log to check the current event delivery time 
				by SchedulerThreadPoolExecutor. Ideally this should be less than a second. 
				If this is consistently high, refer to section c.
					- DEBUG  s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is at 401 millis
			iii. ExecutorStateEvent INFO log emitted every 30 seconds to check if activeThreads 
				is consistently high. activeThreads is considered high if it is more than 2X 
				the number of worker leases. If this is consistently high, refer to section c.
 					- INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=450, coreThreads=0, leasesOwned=64, largestPoolSize=520, maximumPoolSize=2147483647)
	c. 	Customers are expected to assess the state of the SchedulerThreadPoolExecutor using 
		the above diagnostic log messages and take appropriate mitigations such as reducing the 
		RecordProcessor.processRecords time or scaling out the application. 
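
As a rough illustration of the "more than 2X the number of worker leases" rule from section b.iii, the check can be scripted against ExecutorStateEvent log lines. This is a minimal sketch, not part of KCL: the class and method names are hypothetical, it assumes the `activeThreads=` and `leasesOwned=` fields appear in the line as in the sample above, and it evaluates a single line only, whereas the guidance is about values that stay high across consecutive 30-second samples.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExecutorStateCheck {

    private static final Pattern ACTIVE_THREADS = Pattern.compile("activeThreads=(\\d+)");
    private static final Pattern LEASES_OWNED = Pattern.compile("leasesOwned=(\\d+)");

    /** Extracts a numeric field such as activeThreads=450 from a log line. */
    static long field(Pattern pattern, String logLine) {
        Matcher matcher = pattern.matcher(logLine);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Field not found: " + pattern.pattern());
        }
        return Long.parseLong(matcher.group(1));
    }

    /** True when activeThreads is more than twice leasesOwned for this one sample. */
    static boolean activeThreadsHigh(String logLine) {
        long activeThreads = field(ACTIVE_THREADS, logLine);
        long leasesOwned = field(LEASES_OWNED, logLine);
        return activeThreads > 2 * leasesOwned;
    }

    public static void main(String[] args) {
        // Sample line copied from section b.iii above.
        String sample = "INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: "
                + "ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, "
                + "activeThreads=450, coreThreads=0, leasesOwned=64, largestPoolSize=520, "
                + "maximumPoolSize=2147483647)";
        System.out.println(activeThreadsHigh(sample)); // true, since 450 > 2 * 64
    }
}
```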
		

Case 3: All customers of KCL 2.x, prior to the 2.2.2 release, had no visibility into throttling or 
any other exceptions from CloudWatch metrics publish calls. These exceptions are now made visible, 
and we expect customers to take appropriate actions such as increasing the CloudWatch Put API TPS 
to fix the throttling issue or increasing the concurrent connections of the CloudWatch client 
to fix the limited connections issue.
	a. 	Increasing the concurrency of client - CloudWatchAsyncClient.builder().region(region).httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE)).build();
	b. 	CloudWatch limit increase - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html
