ClassCastException: io.confluent.parallelconsumer.state.WorkContainer cannot be cast to io.confluent.parallelconsumer.RecordContext #412

Closed
AhmetBasar opened this issue Sep 18, 2022 · 3 comments · Fixed by #417
Assignees: astubbs
Labels: verified bug (Something isn't working)

Comments

@AhmetBasar

Hello,

When I configure retryDelayProvider on ParallelConsumerOptions, a ClassCastException is thrown. Any help would be appreciated.
parallel-consumer-core version: 0.5.2.2

Thanks.

@astubbs
Contributor

astubbs commented Sep 23, 2022

Ah, interesting. Can you show me your code sample?

@astubbs self-assigned this Sep 23, 2022
@AhmetBasar
Author

AhmetBasar commented Sep 24, 2022

public static void main(String[] args) throws Exception {
	Properties props = new Properties();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
	props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
	props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id");
	Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

	ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
			.ordering(io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY)
			.maxConcurrency(5)
			.consumer(kafkaConsumer)
			.retryDelayProvider(recordContext -> {
				return Duration.ofMillis(10000);
			}).build();

	ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);
	eosStreamProcessor.subscribe(pl.tlinkowski.unij.api.UniLists.of("TestTopic"));
	eosStreamProcessor.poll(context -> {
		if (true) {
			throw new RuntimeException();
		}
	});
}

java.lang.ClassCastException: io.confluent.parallelconsumer.state.WorkContainer cannot be cast to io.confluent.parallelconsumer.RecordContext
	at io.confluent.parallelconsumer.state.WorkContainer.getRetryDelayConfig(WorkContainer.java:146) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.state.WorkContainer.tryAgainAt(WorkContainer.java:133) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.state.WorkContainer.getDelayUntilRetryDue(WorkContainer.java:126) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.state.WorkContainer.hasDelayPassed(WorkContainer.java:116) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.state.WorkContainer.isAvailableToTakeAsWork(WorkContainer.java:229) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) ~[?:1.8.0_221]
	at java.util.concurrent.ConcurrentSkipListMap$ValueSpliterator.forEachRemaining(ConcurrentSkipListMap.java:3447) ~[?:1.8.0_221]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_221]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_221]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_221]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_221]
	at java.util.stream.LongPipeline.reduce(LongPipeline.java:439) ~[?:1.8.0_221]
	at java.util.stream.LongPipeline.sum(LongPipeline.java:397) ~[?:1.8.0_221]
	at java.util.stream.ReferencePipeline.count(ReferencePipeline.java:526) ~[?:1.8.0_221]
	at io.confluent.parallelconsumer.state.ProcessingShard.getCountOfWorkAwaitingSelection(ProcessingShard.java:80) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at java.util.stream.ReferencePipeline$5$1.accept(ReferencePipeline.java:227) ~[?:1.8.0_221]
	at java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3566) ~[?:1.8.0_221]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_221]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_221]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_221]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_221]
	at java.util.stream.LongPipeline.reduce(LongPipeline.java:439) ~[?:1.8.0_221]
	at java.util.stream.LongPipeline.sum(LongPipeline.java:397) ~[?:1.8.0_221]
	at io.confluent.parallelconsumer.state.ShardManager.getNumberOfWorkQueuedInShardsAwaitingSelection(ShardManager.java:93) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.state.WorkManager.getNumberOfWorkQueuedInShardsAwaitingSelection(WorkManager.java:252) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.state.WorkManager.isSufficientlyLoaded(WorkManager.java:232) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.state.WorkManager.shouldThrottle(WorkManager.java:224) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.shouldThrottle(BrokerPollSystem.java:296) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.managePauseOfSubscription(BrokerPollSystem.java:271) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:175) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116) ~[parallel-consumer-core-0.5.2.2.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
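
For readers following along, here is a minimal sketch of one way this kind of exception can arise. It is an assumption about the mechanism, not a confirmed diagnosis of the library code: the top frame of the trace is WorkContainer.getRetryDelayConfig, which would be consistent with the internal state object handing itself to a callback that is declared against RecordContext, with generics bypassed somewhere along the way. The classes RecordContextStandIn and WorkContainerStandIn and all method names below are hypothetical stand-ins, not the real parallel-consumer types.

```java
import java.time.Duration;
import java.util.function.Function;

// Illustrative sketch only: these stand-in types are hypothetical and are
// NOT the parallel-consumer classes.
class RetryDelayCastSketch {

    /** Stand-in for the public type the user's callback is declared against. */
    static final class RecordContextStandIn {
        final long offset;
        RecordContextStandIn(long offset) { this.offset = offset; }
    }

    /** Stand-in for an internal state object that tracks one record. */
    static final class WorkContainerStandIn {
        final long offset;
        WorkContainerStandIn(long offset) { this.offset = offset; }

        // Assumed failure mode: the raw type bypasses the generic check, so the
        // compiler accepts passing `this` where a RecordContextStandIn is expected.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Duration retryDelayUnsafe(Function<RecordContextStandIn, Duration> provider) {
            Function raw = provider;
            return (Duration) raw.apply(this); // fails at run time
        }

        // Type-safe alternative: build the expected argument type first.
        Duration retryDelaySafe(Function<RecordContextStandIn, Duration> provider) {
            return provider.apply(new RecordContextStandIn(offset));
        }
    }

    // Same shape as the lambda in the report: the argument is never used.
    static final Function<RecordContextStandIn, Duration> PROVIDER =
            recordContext -> Duration.ofMillis(10000);

    /** Returns true if the unsafe call path throws ClassCastException. */
    static boolean unsafeCallThrows() {
        try {
            new WorkContainerStandIn(0L).retryDelayUnsafe(PROVIDER);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** Returns the delay computed through the type-safe call path. */
    static Duration safeDelay() {
        return new WorkContainerStandIn(0L).retryDelaySafe(PROVIDER);
    }

    public static void main(String[] args) {
        System.out.println("unsafe call throws ClassCastException: " + unsafeCallThrows());
        System.out.println("safe call returns: " + safeDelay());
    }
}
```

Note that the lambda never touches its argument, yet the unsafe path still fails: the cast to the declared parameter type happens when the lambda is invoked, before its body runs. That would also explain why a provider as trivial as the one in the reproduction is enough to trigger the error.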

@astubbs added the "verified bug" (Something isn't working) label Sep 27, 2022
@astubbs
Contributor

astubbs commented Sep 27, 2022

Got it! Thanks, will get this out ASAP.
