File tree Expand file tree Collapse file tree 1 file changed +9
-5
lines changed
spring-kafka/src/main/java/org/springframework/kafka/listener Expand file tree Collapse file tree 1 file changed +9
-5
lines changed Original file line number Diff line number Diff line change @@ -1664,11 +1664,15 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
16641664 @ SuppressWarnings ({ "unchecked" , RAWTYPES })
16651665 @ Override
16661666 protected void doInTransactionWithoutResult (TransactionStatus status ) {
1667- ((KafkaResourceHolder ) TransactionSynchronizationManager
1668- .getResource (ListenerConsumer .this .kafkaTxManager .getProducerFactory ()))
1669- .getProducer ().sendOffsetsToTransaction (// NODSONAR never null
1670- Collections .singletonMap (partition , offsetAndMetadata ),
1671- ListenerConsumer .this .consumerGroupId );
1667+ KafkaResourceHolder holder =
1668+ (KafkaResourceHolder ) TransactionSynchronizationManager
1669+ .getResource (
1670+ ListenerConsumer .this .kafkaTxManager .getProducerFactory ());
1671+ if (holder != null ) {
1672+ holder .getProducer ().sendOffsetsToTransaction (
1673+ Collections .singletonMap (partition , offsetAndMetadata ),
1674+ ListenerConsumer .this .consumerGroupId );
1675+ }
16721676 }
16731677
16741678 });
You can’t perform that action at this time.
0 commit comments