Skip to content

Commit

Permalink
Depends on general type on KafkaMessageListenerContainer.
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Oct 11, 2024
1 parent 0c89c92 commit 3f6b447
Showing 1 changed file with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
Expand Down Expand Up @@ -903,19 +903,12 @@ else if (listener instanceof MessageListener) {
this.observationEnabled = this.containerProperties.isObservationEnabled();

if (!AopUtils.isAopProxy(this.genericListener) &&
this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?>) {
KafkaBackoffAwareMessageListenerAdapter<K, V> genListener =
(KafkaBackoffAwareMessageListenerAdapter<K, V>) this.genericListener;
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V>) {

RecordMessagingMessageListenerAdapter<K, V> recordAdapterListener =
(RecordMessagingMessageListenerAdapter<K, V>) genListener.getDelegate();

BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailure =
(cRecord, ex) -> this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
recordAdapterListener.setCallbackForAsyncFailure(callbackForAsyncFailure);
this.genericListener instanceof AbstractDelegatingMessageListenerAdapter<?>) {
AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> genListener =
(AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>>) this.genericListener;
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V> adapterListener) {
adapterListener.setCallbackForAsyncFailure(getCallbackForAsyncFailure());
}

}
}
else {
Expand Down Expand Up @@ -3391,6 +3384,10 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
.values();
}

private BiConsumer<ConsumerRecord<K, V>, RuntimeException> getCallbackForAsyncFailure() {
return (cRecord, ex) -> this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
}

@Override
public void seek(String topic, int partition, long offset) {
this.seeks.add(new TopicPartitionOffset(topic, partition, offset));
Expand Down

0 comments on commit 3f6b447

Please sign in to comment.