Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DefaultAfterRollbackProcessor.isProcessInTransaction() flag should be considered in process() method #2878

Open
l0co opened this issue Nov 3, 2023 · 15 comments

Comments

@l0co
Copy link

l0co commented Nov 3, 2023

In what version(s) of Spring for Apache Kafka are you seeing this issue?

For example:

2.8.11 (but see in the currrent master as well)

Describe the bug

In DefaultAfterRollbackProcessor there's isProcessInTransaction() flag described as:

	/**
	 * Return true to invoke
	 * {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}
	 * in a new transaction. Because the container cannot infer the desired behavior, the
	 * processor is responsible for sending the offset to the transaction if it decides to
	 * skip the failing record.
	 * @return true to run in a transaction; default false.
	 * @since 2.2.5
	 * @see #process(List, Consumer, MessageListenerContainer, Exception, boolean,
	 * ContainerProperties.EOSMode)
	 */

But this is not considered in process() method which is a part of the public interface of this class and one can expect to work. So, if you call this method yourself, in case you use isCommitRecovered() flag plus use transactional kafka template, you will end up with "no transaction" exception. This flag is only considered by default from org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#recordAfterRollback.

To Reproduce

With transactional KafkaTemplate set isCommitRecovered() to true and try to execute process() method yourself. This end with a "no transaction" exception.

Expected behavior

The above invocation should be wrapped properly into transaction.

@artembilan
Copy link
Member

Any chances that you can contribute that fix: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc?
Thanks

@l0co
Copy link
Author

l0co commented Nov 3, 2023

Might try to look at this, but for that I'd have to know one thing. This is called non-transactionally from KafkaMessageListenerContainer.ListenerConsumer.batchAfterRollback() and transactionally from KafkaMessageListenerContainer.ListenerConsumer.recordAfterRollback(). Is there any reason for this non-transactional call and should it be preserved? I mean, in case batch processing is used in non-transactional processing, KafkaTemplate would also be non-transactional and in the related code of DefaultAfterRollbackProcessor there will be no transaction, and finally the related code would not be executed (as in the current behavior).

In other words, can we call process() from KafkaMessageListenerContainer.ListenerConsumer.batchAfterRollback() with the same transaction conditions as KafkaMessageListenerContainer.ListenerConsumer.recordAfterRollback()?

That's probably a question to @garyrussell

@garyrussell
Copy link
Contributor

Is there any reason for this non-transactional call

Currently, with a record listener, the failed record can be recovered (e.g. sent to a DLT), in which case, its offset is committed by the DARP by sending the offset to the (new) transaction (we can't mix transactional and non-transactional offset commits).

Recovery is not (currently) possible with a batch listener so there is no need to start a new transaction in that case.

@sobychacko sobychacko self-assigned this Dec 4, 2023
@sobychacko
Copy link
Contributor

@l0co, I can look at this issue (unless you are still considering sending a fix). Could you paste here the minimum required to reproduce this error from an application? Did you create your own AfterRollbackProcessor in the application and then call process on it? Please provide some code snippets; that would speed things up. Thanks!

@l0co
Copy link
Author

l0co commented Dec 5, 2023

@sobychacko Hello. I planned to submit something here but the end of the year is a bit hot in my company and I haven't found time yet, so feel free to provide a fix.

Yes, I have my own AfterRollbackProcessor with the following code:

Runnable process = () -> strategy.processor().process(consumerRecords, consumer, container, exception, recoverable, eosMode);
if (strategy.processor().isProcessInTransaction())
	kafkaEventTemplate.doTransactionally(process);
else
	process.run();

So, the processing is delegated to a strategy, but I have to manually wrap it in kafka transactional code. While it should work out-of-the-box. I think the fix can be adding own transactional wrapping in org.springframework.kafka.listener.DefaultAfterRollbackProcessor.process():

	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
			@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

		if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
				getRecoveryStrategy((List) records, exception), container, this.logger)
					&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
			// do manual transactional wrapping here --->
				ConsumerRecord<K, V> skipped = records.get(0);
				if (EOSMode.V1.equals(eosMode.getMode())) {
					this.kafkaTemplate.sendOffsetsToTransaction(
							Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
									createOffsetAndMetadata(container, skipped.offset() + 1)));
				}
				else {
					this.kafkaTemplate.sendOffsetsToTransaction(
							Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
									createOffsetAndMetadata(container, skipped.offset() + 1)), consumer.groupMetadata());
				}
		// end transactional wrapping here
		}

		if (!recoverable && this.backOff != null) {
			try {
				ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals, container);
			}
			catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}

	}

And I think we are protected from non-transactional (batch) processing by this.kafkaTemplate.isTransactional() condition.

@l0co
Copy link
Author

l0co commented Dec 5, 2023

To bo more specific: I use DefaultAfterRollbackProcessor but as a delegate through CompositeKafkaAfterRollbackProcessor which implements composite pattern, to allow to define different rollback strategies by topic.

@sobychacko
Copy link
Contributor

@l0co Thanks for the explanation! I see the issue but cannot reproduce it with my setup. Can you provide a simple standalone app where I can quickly reproduce it? I would like to see it failing on our end before adding the fix. Thanks!

@l0co
Copy link
Author

l0co commented Dec 8, 2023

@sobychacko not in this year probably :(

@sobychacko
Copy link
Contributor

@l0co Any updates on the sample application? Thanks.

@l0co
Copy link
Author

l0co commented Jan 22, 2024

I will try to prepare something this week

@l0co
Copy link
Author

l0co commented Jan 30, 2024

Hello, that was difficult to return to this subject after longer pause, but tried to check what's going on and prepared small tester https://github.com/l0co/spring-kafka-2878-testcase. This is not a bug, though, just some issue with architecture I had with the related code. If you run DemoApplicationTests.shouldProcessEvent() it works, but if you stop with breakpoint here you will see that this is only transactional, because the calling code takes care of it here. So, if you just used DefaultAfterRolllbackProcessor.process() somewhere in your code with transactional Kafka, you'd have an exception, and you have to wrap all calls to your AfterRolllbackProcessor into the same code:

			if (afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null) {
				this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

					@Override
					protected void doInTransactionWithoutResult(TransactionStatus status) {
						afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer,
								KafkaMessageListenerContainer.this.thisOrParentContainer, e, true,
								ListenerConsumer.this.eosMode);
					}

				});
			}

We have a scenario where we call it by hand, and we had to replace it with the code above. While I think this could be just a part of the rollback processor logic, something like this:

	@Override
	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
			@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

		if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable,
				getFailureTracker()::recovered, container, this.logger)
					&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
				// NEW CODE HERE ------------------------------------------------------>
				this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

					@Override
					protected void doInTransactionWithoutResult(TransactionStatus status) {
						ConsumerRecord<K, V> skipped = records.get(0);
						this.kafkaTemplate.sendOffsetsToTransaction(
								Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
										createOffsetAndMetadata(container, skipped.offset() + 1)
								), consumer.groupMetadata());
					}

				});
		}

		if (!recoverable && this.backOff != null) {
			try {
				ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals, container);
			}
			catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}

	}

@Wzy19930507
Copy link
Contributor

Wzy19930507 commented Feb 28, 2024

If set the transactionTemplate to DefaultAfterRolllbackProcessor, maybe first that we need ensure DefaultAfterRolllbackProcessor's KafkaAwareTransactionManager same as upstream components.

Such as KafkaMessageContainer's use this.transactionTemplate call DefaultAfterRolllbackProcessor.

Please correct me if there is any problem with what I say.

@l0co
Copy link
Author

l0co commented Mar 5, 2024

Do you mean this.transactionTemplate? Yes, this component is not available here but I think we could use kafkaTemplate.executeInTransaction(OperationsCallback<K, V, T> callback) instead.

@Wzy19930507
Copy link
Contributor

Do you mean this.transactionTemplate?

Yes, and this.transactionTemplate is create by KafkaAwareTransactionManager, maybe we need ensure DefaultAfterRolllbackProcessor and KafkaMessageContainer use the same KafkaAwareTransactionManager?

@l0co
Copy link
Author

l0co commented Mar 6, 2024

I'm not so deeply currently in this config, but I'd just use existing kafkaTemplate as I described above. But you might be right as well

@sobychacko sobychacko added this to the 3.2.0-RC1 milestone Mar 21, 2024
@sobychacko sobychacko removed this from the 3.2.0-RC1 milestone Apr 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants