Skip to content

Commit 11bf13e

Browse files
committed
Refactor share consumer acknowledgment interfaces and improve code quality
- Simplify to single AcknowledgingShareConsumerAwareMessageListener interface with nullable acknowledgment - Remove redundant ShareConsumerAwareMessageListener interface - Clean up ShareAcknowledgmentMode enum by removing unnecessary string property - Replace verbose null checks with Boolean.TRUE.equals() pattern - Use enum values for acknowledgment mode validation instead of hardcoded strings - Update tests and documentation to use unified interface - Fix spacing issues in @nullable annotations - Fixing other formatting issues Improve share consumer polling behavior and add isShareConsumer() helper - Add isShareConsumer() helper method to AbstractKafkaListenerEndpoint for cleaner boolean checks - Replace verbose Boolean.TRUE.equals() pattern with cleaner isShareConsumer() call - Handle KIP-932 IllegalStateException when polling with unacknowledged records in explicit mode - Add minimal 10ms delay to prevent tight loop while maintaining responsiveness - Remove problematic pre-poll blocking logic that prevented proper exception handling The share consumer now properly handles the broker's IllegalStateException when attempting to poll with unacknowledged records, as specified in KIP-932. This maintains heartbeat while waiting for acknowledgments and prevents CPU-intensive tight loops. Fix thread safety in ShareConsumer acknowledgments ShareConsumer is not thread-safe and requires all access to happen on the consumer thread. The previous implementation allowed acknowledgment calls from listener threads to directly access the consumer, causing ConcurrentModificationException. Changes: - Add PendingAcknowledgment queue to safely pass acknowledgments between threads - Process queued acknowledgments on the consumer thread during poll loop - Remove direct consumer access from ShareConsumerAcknowledgment.acknowledgeInternal() - Add notifyAcknowledged() callback for acknowledgment completion This ensures all ShareConsumer interactions happen on the owning consumer thread, eliminating race conditions between polling and acknowledgment operations. The thread safety issue was exposed by removing the pre-poll sleep, which previously masked the concurrent access by creating timing windows where the consumer was dormant. Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
1 parent 3840f3a commit 11bf13e

14 files changed

+470
-251
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 104 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,8 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws
231231

232232
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
233233

234-
[[share-acknowledgment-modes]]
235-
=== Acknowledgment Modes
236-
237-
Share containers support two acknowledgment modes:
238234
[[share-implicit-acknowledgment]]
239-
240-
==== Implicit Acknowledgment (Default)
235+
=== Implicit Acknowledgment (Default)
241236
In implicit mode, records are automatically acknowledged based on processing outcome:
242237

243238
Successful processing: Records are acknowledged as `ACCEPT`
@@ -259,9 +254,13 @@ public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerCont
259254
----
260255

261256
[[share-explicit-acknowledgment]]
262-
==== Explicit Acknowledgment
257+
=== Explicit Acknowledgment
258+
259+
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.
263260

264-
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment:
261+
There are two ways to configure explicit acknowledgment mode:
262+
263+
==== Option 1: Using Kafka Client Configuration
265264

266265
[source,java]
267266
----
@@ -271,25 +270,46 @@ public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
271270
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
272271
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
273272
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
274-
props.put("share.acknowledgement.mode", "explicit");
273+
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
275274
return new DefaultShareConsumerFactory<>(props);
276275
}
276+
----
277+
278+
==== Option 2: Using Spring Container Configuration
277279

280+
[source,java]
281+
----
278282
@Bean
279283
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
280-
ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
281-
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
284+
ShareConsumerFactory<String, String> shareConsumerFactory) {
285+
286+
ShareKafkaListenerContainerFactory<String, String> factory =
287+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
288+
289+
// Configure acknowledgment mode at container factory level
290+
factory.getContainerProperties()
291+
.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT);
292+
293+
return factory;
282294
}
283295
----
284296

297+
==== Configuration Precedence
298+
299+
When both configuration methods are used, Spring Kafka follows this precedence order (highest to lowest):
300+
301+
1. **Container Properties**: `containerProperties.setShareAcknowledgmentMode()`
302+
2. **Consumer Config**: `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG`
303+
3. **Default**: `ShareAcknowledgmentMode.IMPLICIT`
304+
285305
[[share-acknowledgment-types]]
286306
=== Acknowledgment Types
287307

288308
Share consumers support three acknowledgment types:
289309

290-
`ACCEPT`: Record processed successfully, mark as completed
291-
`RELEASE`: Temporary failure, make record available for redelivery
292-
`REJECT`: Permanent failure, do not retry
310+
ACCEPT: Record processed successfully, mark as completed
311+
RELEASE: Temporary failure, make record available for redelivery
312+
REJECT: Permanent failure, do not retry
293313

294314
[[share-acknowledgment-api]]
295315
=== ShareAcknowledgment API
@@ -326,46 +346,60 @@ public void listen(ConsumerRecord<String, String> record) {
326346
}
327347
----
328348

329-
[[share-consumer-aware-listener]]
330-
==== ShareConsumerAwareMessageListener
349+
[[share-acknowledging-listener]]
350+
==== AcknowledgingShareConsumerAwareMessageListener
351+
352+
This interface provides access to the ShareConsumer instance with optional acknowledgment support.
353+
The acknowledgment parameter is nullable and depends on the container's acknowledgment mode:
331354

332-
Access the ShareConsumer instance for advanced operations:
355+
===== Implicit Mode Example (acknowledgment is null)
333356

334357
[source,java]
335358
----
336-
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
337-
public void listen(ConsumerRecord<String, String> record, ShareConsumer<?, ?> consumer) {
359+
@KafkaListener(
360+
topics = "my-topic",
361+
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
362+
)
363+
public void listen(ConsumerRecord<String, String> record,
364+
@Nullable ShareAcknowledgment acknowledgment,
365+
ShareConsumer<?, ?> consumer) {
366+
367+
// In implicit mode, acknowledgment is null
338368
System.out.println("Received: " + record.value());
339-
// Access consumer metrics, etc.
369+
370+
// Access consumer metrics if needed
371+
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
372+
373+
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
340374
}
341375
----
342376

343-
[[share-acknowledging-listener]]
344-
==== AcknowledgingShareConsumerAwareMessageListener
345-
346-
Use explicit acknowledgment with full consumer access:
377+
===== Explicit Mode Example (acknowledgment is non-null)
347378

348379
[source,java]
349380
----
350381
@Component
351382
public class ExplicitAckListener {
352-
@KafkaListener(
353-
topics = "my-topic",
354-
containerFactory = "explicitShareKafkaListenerContainerFactory"
355-
)
356-
public void listen(ConsumerRecord<String, String> record,
357-
ShareAcknowledgment acknowledgment,
358-
ShareConsumer<?, ?> consumer) {
383+
@KafkaListener(
384+
topics = "my-topic",
385+
containerFactory = "explicitShareKafkaListenerContainerFactory"
386+
)
387+
public void listen(ConsumerRecord<String, String> record,
388+
@Nullable ShareAcknowledgment acknowledgment,
389+
ShareConsumer<?, ?> consumer) {
359390
360-
try {
361-
processRecord(record);
362-
acknowledgment.acknowledge(); // ACCEPT
363-
} catch (RetryableException e) {
364-
acknowledgment.release(); // Will be redelivered
365-
} catch (Exception e) {
366-
acknowledgment.reject(); // Permanent failure
391+
// In explicit mode, acknowledgment is non-null
392+
try {
393+
processRecord(record);
394+
acknowledgment.acknowledge(); // ACCEPT
395+
}
396+
catch (RetryableException e) {
397+
acknowledgment.release(); // Will be redelivered
398+
}
399+
catch (Exception e) {
400+
acknowledgment.reject(); // Permanent failure
401+
}
367402
}
368-
}
369403
370404
private void processRecord(ConsumerRecord<String, String> record) {
371405
// Business logic here
@@ -386,6 +420,37 @@ Error Handling: If processing throws an exception, the record is automatically a
386420
In explicit mode, failing to acknowledge records will block further message processing.
387421
Always ensure records are acknowledged in all code paths.
388422

423+
[[share-acknowledgment-timeout]]
424+
==== Acknowledgment Timeout Detection
425+
426+
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection.
427+
When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record.
428+
429+
[source,java]
430+
----
431+
@Bean
432+
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
433+
ShareConsumerFactory<String, String> shareConsumerFactory) {
434+
ShareKafkaListenerContainerFactory<String, String> factory =
435+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
436+
437+
// Set acknowledgment timeout (default is 60 seconds)
438+
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
439+
440+
return factory;
441+
}
442+
----
443+
444+
When a record exceeds the timeout, you'll see a warning like:
445+
----
446+
WARN: Record not acknowledged within timeout (30 seconds).
447+
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
448+
or ack.reject() for every record.
449+
Unacknowledged record: topic='my-topic', partition=0, offset=123
450+
----
451+
452+
This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments.
453+
389454
[[share-acknowledgment-examples]]
390455
=== Acknowledgment Examples
391456

@@ -406,7 +471,6 @@ Always ensure records are acknowledged in all code paths.
406471
else {
407472
acknowledgment.release(); // Temporary failure - retry later
408473
}
409-
}
410474
else {
411475
acknowledgment.reject(); // Invalid order - don't retry
412476
}

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
9898

9999
private @Nullable Boolean batchListener;
100100

101-
private @Nullable Boolean shareConsumer;
101+
private @Nullable Boolean shareConsumer;
102102

103103
private @Nullable KafkaTemplate<?, ?> replyTemplate;
104104

@@ -297,8 +297,13 @@ public void setShareConsumer(Boolean shareConsumer) {
297297
this.shareConsumer = shareConsumer;
298298
}
299299

300-
public @Nullable Boolean getShareConsumer() {
301-
return this.shareConsumer;
300+
/**
301+
* Return true if this endpoint is for a share consumer.
302+
* @return true for a share consumer endpoint.
303+
* @since 4.0
304+
*/
305+
public boolean isShareConsumer() {
306+
return this.shareConsumer != null && this.shareConsumer;
302307
}
303308

304309
/**

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
211211
@Nullable MessageConverter messageConverter) {
212212

213213
MessagingMessageListenerAdapter<K, V> listener;
214-
if (getShareConsumer() != null && getShareConsumer()) {
214+
if (isShareConsumer()) {
215215
ShareRecordMessagingMessageListenerAdapter<K, V> messageListener = new ShareRecordMessagingMessageListenerAdapter<>(
216216
this.bean, this.method, this.errorHandler);
217217
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collection;
2121
import java.util.regex.Pattern;
2222

23+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2324
import org.jspecify.annotations.Nullable;
2425

2526
import org.springframework.context.ApplicationContext;
@@ -129,38 +130,85 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
129130
// Validate share group configuration
130131
validateShareConfiguration(endpoint);
131132

132-
Object o = this.shareConsumerFactory.getConfigurationProperties().get("share.acknowledgement.mode");
133-
String explicitAck = null;
134-
if (o != null) {
135-
explicitAck = (String) o;
136-
}
133+
// Determine acknowledgment mode following Spring Kafka's configuration precedence patterns
134+
ContainerProperties.ShareAcknowledgmentMode ackMode = determineAcknowledgmentMode(properties);
135+
properties.setShareAcknowledgmentMode(ackMode);
136+
137137
JavaUtils.INSTANCE
138138
.acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup)
139139
.acceptIfNotNull(this.phase, instance::setPhase)
140140
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
141141
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
142142
.acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId)
143-
.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId)
144-
.acceptIfCondition(explicitAck != null && explicitAck.equals("explicit"),
145-
ContainerProperties.ShareAcknowledgmentMode.EXPLICIT,
146-
properties::setShareAcknowledgmentMode);
143+
.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId);
144+
}
145+
146+
/**
147+
* Determine the acknowledgment mode following Spring Kafka's configuration precedence patterns.
148+
* <p>
149+
* Configuration precedence (highest to lowest):
150+
* <ol>
151+
* <li>Container Properties: {@code containerProperties.getShareAcknowledgmentMode()} (if explicitly set)</li>
152+
* <li>Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}</li>
153+
* <li>Default: {@code ShareAcknowledgmentMode.IMPLICIT}</li>
154+
* </ol>
155+
*
156+
* @param containerProperties the container properties to check
157+
* @return the resolved acknowledgment mode
158+
* @throws IllegalArgumentException if an invalid acknowledgment mode is configured
159+
*/
160+
private ContainerProperties.ShareAcknowledgmentMode determineAcknowledgmentMode(ContainerProperties containerProperties) {
161+
// 1. Check if explicitly set at container level (highest priority)
162+
// Note: We need to check if it was explicitly set vs using the default
163+
// For now, we assume if it's not the default, it was explicitly set
164+
ContainerProperties.ShareAcknowledgmentMode containerMode = containerProperties.getShareAcknowledgmentMode();
165+
if (containerMode != ContainerProperties.ShareAcknowledgmentMode.IMPLICIT) {
166+
// Container level setting takes precedence
167+
return containerMode;
168+
}
169+
170+
// 2. Check Kafka client configuration (middle priority)
171+
Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties()
172+
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
173+
174+
if (clientAckMode != null) {
175+
String mode = clientAckMode.toString();
176+
if ("explicit".equals(mode)) {
177+
return ContainerProperties.ShareAcknowledgmentMode.EXPLICIT;
178+
}
179+
else if ("implicit".equals(mode)) {
180+
return ContainerProperties.ShareAcknowledgmentMode.IMPLICIT;
181+
}
182+
else {
183+
throw new IllegalArgumentException(
184+
"Invalid " + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + ": " + mode +
185+
". Must be 'implicit' or 'explicit'");
186+
}
187+
}
188+
// 3. Default (lowest priority)
189+
return ContainerProperties.ShareAcknowledgmentMode.IMPLICIT;
147190
}
148191

149192
private void validateShareConfiguration(KafkaListenerEndpoint endpoint) {
150193
// Validate that batch listeners aren't used with share consumers
151-
if (endpoint.getBatchListener() != null && endpoint.getBatchListener()) {
194+
if (Boolean.TRUE.equals(endpoint.getBatchListener())) {
152195
throw new IllegalArgumentException(
153196
"Batch listeners are not supported with share consumers. " +
154197
"Share groups operate at the record level.");
155198
}
156199

157-
// Validate acknowledgment mode consistency
200+
// Validate acknowledgment mode consistency using official Kafka client configuration
158201
Object ackMode = this.shareConsumerFactory.getConfigurationProperties()
159-
.get("share.acknowledgement.mode");
160-
if (ackMode != null && !Arrays.asList("implicit", "explicit").contains(ackMode)) {
161-
throw new IllegalArgumentException(
162-
"Invalid share.acknowledgement.mode: " + ackMode +
202+
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
203+
if (ackMode != null) {
204+
String ackModeStr = ackMode.toString().toLowerCase();
205+
boolean isValid = Arrays.stream(ContainerProperties.ShareAcknowledgmentMode.values())
206+
.anyMatch(mode -> mode.name().toLowerCase().equals(ackModeStr));
207+
if (!isValid) {
208+
throw new IllegalArgumentException(
209+
"Invalid " + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + ": " + ackMode +
163210
". Must be 'implicit' or 'explicit'");
211+
}
164212
}
165213
}
166214

0 commit comments

Comments
 (0)