
Commit 85cd556

Add concurrency support to ShareKafkaMessageListenerContainer
Share consumers (KIP-932) enable record-level load balancing, where multiple consumers can cooperatively process records from the same partitions. Unlike traditional consumer groups with exclusive partition ownership, share groups distribute work at the broker level via the share group coordinator.

This commit adds native concurrency support to the existing `ShareKafkaMessageListenerContainer` rather than creating a separate `ConcurrentShareKafkaMessageListenerContainer`. This design choice avoids the parent/child container complexity that exists in the regular consumer model, since share consumers fundamentally operate differently:

- Work distribution happens at the broker level, not at the Spring layer
- Multiple threads simply provide more capacity for the broker to distribute records across
- There is no partition ownership model to coordinate between child containers

This approach provides:

- A simpler architecture, with a single container managing multiple threads
- No parent/child context propagation concerns
- Better alignment with share consumer semantics (record-level vs. partition-level distribution)
- Increased throughput for high-volume workloads
- Better resource utilization across consumer threads

Users can configure concurrency at three levels:

1. Per-listener via `@KafkaListener(concurrency = N)`
2. Factory-level default via `factory.setConcurrency(N)`
3. Programmatically via `container.setConcurrency(N)`

The feature works seamlessly with both implicit (auto-acknowledge) and explicit (manual acknowledge/release/reject) acknowledgment modes, with each consumer thread independently managing its own acknowledgments.
* Address PR feedback on concurrency implementation

- Use primitive int for concurrency in the factory (consistent with the phase field)
- Remove the unnecessary `getConcurrency()` getter (only used in trivial tests)
- Use `HashMap` instead of `ConcurrentHashMap` in metrics() (already inside a lock)
- Use `CompletableFuture.allOf()` for cleaner shutdown coordination
- Remove debug logging from tests (unnecessary noise in CI/CD)
- Remove thread tracking from concurrency tests (over-complicates assertions)

Clarify documentation based on KIP-932 specifications:

- Add an explicit note that concurrency is additive across application instances
- Replace the high-level distribution description with precise KIP-932 details
- Document the pull-based model, acquisition locks, and batch behavior
- Explain `max.poll.records` as a soft limit with complete-batch preference
- Set accurate expectations about broker-controlled record distribution

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
1 parent 1972769 commit 85cd556

5 files changed (+551, -24 lines)

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 168 additions & 3 deletions
@@ -107,7 +107,7 @@ factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
 [[share-kafka-message-listener-container]]
 === ShareKafkaMessageListenerContainer
 
-The `ShareKafkaMessageListenerContainer` provides a simple, single-threaded container for share consumers:
+The `ShareKafkaMessageListenerContainer` provides a container for share consumers with support for concurrent processing:
 
 [source,java]
 ----
@@ -151,6 +151,172 @@ Share consumers do not support:
 * Manual offset management
 ====
 
+[[share-container-concurrency]]
+=== Concurrency
+
+The `ShareKafkaMessageListenerContainer` supports concurrent processing by creating multiple consumer threads within a single container.
+Each thread runs its own `ShareConsumer` instance that participates in the same share group.
+
+Unlike traditional consumer groups, where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker.
+This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances.
+
+[IMPORTANT]
+====
+**Concurrency is Additive Across Application Instances**
+
+From the share group's perspective, each `ShareConsumer` instance is an independent member, regardless of where it runs.
+Setting `concurrency=3` in a single container creates 3 share group members.
+If you run multiple application instances with the same share group ID, all their consumer threads combine into one pool.
+
+For example:
+* Application Instance 1: `concurrency=3` → 3 share group members
+* Application Instance 2: `concurrency=3` → 3 share group members
+* **Total**: 6 share group members available for the broker to distribute records to
+
+This means that setting `concurrency=5` in a single container is operationally equivalent to running 5 separate application instances with `concurrency=1` each (all using the same `group.id`).
+The Kafka broker treats all consumer instances equally and distributes records across the entire pool.
+====
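The additive membership arithmetic in the note above can be expressed as a trivial plain-Java sketch. `ShareGroupMembership` and `totalMembers` are illustrative names of ours, not Spring Kafka API:

```java
// Hypothetical helper illustrating the example above: the share group's
// member count is the sum of the concurrency settings across all
// application instances that use the same group.id.
public class ShareGroupMembership {

    static int totalMembers(int... concurrencyPerInstance) {
        int total = 0;
        for (int concurrency : concurrencyPerInstance) {
            total += concurrency; // each consumer thread is one independent group member
        }
        return total;
    }

    public static void main(String[] args) {
        // Instance 1 and Instance 2 each run with concurrency=3
        System.out.println(totalMembers(3, 3)); // prints 6
    }
}
```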
+==== Configuring Concurrency Programmatically
+
+[source,java]
+----
+@Bean
+public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
+        ShareConsumerFactory<String, String> shareConsumerFactory) {
+
+    ContainerProperties containerProps = new ContainerProperties("my-topic");
+    containerProps.setGroupId("my-share-group");
+
+    ShareKafkaMessageListenerContainer<String, String> container =
+            new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
+
+    // Set concurrency to create 5 consumer threads
+    container.setConcurrency(5);
+
+    container.setupMessageListener(new MessageListener<String, String>() {
+
+        @Override
+        public void onMessage(ConsumerRecord<String, String> record) {
+            System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
+        }
+
+    });
+
+    return container;
+}
+----
+==== Configuring Concurrency via Factory
+
+You can set default concurrency at the factory level, which applies to all containers created by that factory:
+
+[source,java]
+----
+@Bean
+public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
+        ShareConsumerFactory<String, String> shareConsumerFactory) {
+
+    ShareKafkaListenerContainerFactory<String, String> factory =
+            new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
+
+    // Set default concurrency for all containers created by this factory
+    factory.setConcurrency(3);
+
+    return factory;
+}
+----
+==== Per-Listener Concurrency
+
+The concurrency setting can be overridden per listener using the `concurrency` attribute:
+
+[source,java]
+----
+@Component
+public class ConcurrentShareListener {
+
+    @KafkaListener(
+            topics = "high-throughput-topic",
+            containerFactory = "shareKafkaListenerContainerFactory",
+            groupId = "my-share-group",
+            concurrency = "10" // Override factory default
+    )
+    public void listen(ConsumerRecord<String, String> record) {
+        // This listener will use 10 consumer threads
+        System.out.println("Processing: " + record.value());
+    }
+
+}
+----
+==== Concurrency Considerations
+
+* **Thread Safety**: Each consumer thread has its own `ShareConsumer` instance and manages its own acknowledgments independently
+* **Client IDs**: Each consumer thread receives a unique client ID with a numeric suffix (e.g., `myContainer-0`, `myContainer-1`, etc.)
+* **Metrics**: Metrics from all consumer threads are aggregated and accessible via `container.metrics()`
+* **Lifecycle**: All consumer threads start and stop together as a unit
+* **Work Distribution**: The Kafka broker handles record distribution across all consumer instances in the share group
+* **Explicit Acknowledgment**: Each thread independently manages acknowledgments for its records; unacknowledged records in one thread don't block other threads
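To make the metrics bullet concrete, here is a minimal plain-Java sketch of aggregating per-thread metric maps under one view keyed by client id. The `String`/`Double` maps stand in for Kafka's `MetricName`/`Metric` types, and `MetricsAggregationSketch` is a hypothetical illustration, not the container's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: metrics from each consumer thread are kept under that thread's
// unique client id, so merging the per-thread maps never collides.
public class MetricsAggregationSketch {

    @SafeVarargs
    static Map<String, Map<String, Double>> aggregate(
            Map<String, Map<String, Double>>... perThreadMetrics) {
        Map<String, Map<String, Double>> all = new LinkedHashMap<>();
        for (Map<String, Map<String, Double>> metrics : perThreadMetrics) {
            all.putAll(metrics); // client ids are unique, so entries never overwrite each other
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> thread0 =
                Map.of("myContainer-0", Map.of("records-consumed-total", 42.0));
        Map<String, Map<String, Double>> thread1 =
                Map.of("myContainer-1", Map.of("records-consumed-total", 58.0));

        Map<String, Map<String, Double>> all = aggregate(thread0, thread1);
        System.out.println(all.keySet()); // both client ids appear in the aggregated view
    }
}
```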
259+
+==== Concurrency with Explicit Acknowledgment
+
+Concurrency works seamlessly with explicit acknowledgment mode.
+Each consumer thread independently tracks and acknowledges its own records:
+
+[source,java]
+----
+@KafkaListener(
+        topics = "order-queue",
+        containerFactory = "explicitShareKafkaListenerContainerFactory",
+        groupId = "order-processors",
+        concurrency = "5"
+)
+public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
+    try {
+        // Process the order
+        processOrderLogic(record.value());
+        acknowledgment.acknowledge(); // ACCEPT
+    }
+    catch (RetryableException e) {
+        acknowledgment.release(); // Will be redelivered
+    }
+    catch (Exception e) {
+        acknowledgment.reject(); // Permanent failure
+    }
+}
+----
+[NOTE]
+====
+**Record Acquisition and Distribution Behavior:**
+
+Share consumers use a pull-based model where each consumer thread calls `poll()` to fetch records from the broker.
+When a consumer polls, the broker's share-partition leader:
+
+* Selects records in the "Available" state
+* Moves them to the "Acquired" state with a time-limited acquisition lock (default 30 seconds, configurable via `group.share.record.lock.duration.ms`)
+* Prefers to return complete record batches for efficiency
+* Applies `max.poll.records` as a soft limit, meaning complete record batches will be acquired even if doing so exceeds this value
+
+While records are acquired by one consumer, they are not available to other consumers.
+When the acquisition lock expires, unacknowledged records automatically return to the "Available" state and can be delivered to another consumer.
+
+The broker limits the number of records that can be acquired per partition using `group.share.partition.max.record.locks`.
+Once this limit is reached, subsequent polls temporarily return no records until locks expire.
+
+**Implications for Concurrency:**
+
+* Each consumer thread polls independently and may acquire a different number of records per poll
+* Record distribution across threads depends on polling timing and batch availability
+* Multiple threads increase the pool of consumers available to acquire records
+* With low message volume or a single partition, records may concentrate on fewer threads
+* For long-running workloads, distribution tends to be more even
+
+**Configuration:**
+
+* Each thread polls and processes records independently
+* Acknowledgment constraints apply per thread (one thread's unacknowledged records do not block other threads)
+* The concurrency setting must be greater than 0 and cannot be changed while the container is running
+====
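The record lifecycle described in this note can be modeled as a toy state machine: Available → Acquired → Acknowledged/Released, with lock expiry returning an unacknowledged record to Available. This is a sketch under the stated defaults; the class and method names are ours, and the real state transitions happen in the broker's share-partition leader:

```java
// Toy model of the KIP-932 record states described above. Illustrative only;
// timing and names are assumptions, not broker code.
public class AcquisitionLockSketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, REJECTED }

    static final long LOCK_DURATION_MS = 30_000; // default group.share.record.lock.duration.ms

    State state = State.AVAILABLE;
    long lockDeadline;

    // A poll() from some consumer acquires an available record under a time-limited lock.
    boolean tryAcquire(long nowMs) {
        if (state == State.AVAILABLE) {
            state = State.ACQUIRED;
            lockDeadline = nowMs + LOCK_DURATION_MS;
            return true;
        }
        return false; // acquired records are invisible to other consumers
    }

    // An expired lock on an unacknowledged record makes it deliverable again.
    void onClockTick(long nowMs) {
        if (state == State.ACQUIRED && nowMs >= lockDeadline) {
            state = State.AVAILABLE;
        }
    }

    void acknowledge() { state = State.ACKNOWLEDGED; } // ACCEPT
    void release()     { state = State.AVAILABLE; }    // explicit redelivery
    void reject()      { state = State.REJECTED; }     // permanent failure

    public static void main(String[] args) {
        AcquisitionLockSketch record = new AcquisitionLockSketch();
        record.tryAcquire(0);                 // consumer thread A acquires the record
        record.onClockTick(LOCK_DURATION_MS); // lock expires without an ack
        System.out.println(record.state);     // back to AVAILABLE for another consumer
    }
}
```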
 [[share-annotation-driven-listeners]]
 == Annotation-Driven Listeners
 
@@ -520,8 +686,7 @@ Share consumers differ from regular consumers in several key ways:
 === Current Limitations
 
 * **In preview**: This feature is in preview mode and may change in future versions
-* **Single-Threaded**: Share consumer containers currently run in single-threaded mode
 * **No Message Converters**: Message converters are not yet supported for share consumers
 * **No Batch Listeners**: Batch processing is not supported with share consumers
-* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls
+* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 27 additions & 0 deletions
@@ -63,6 +63,8 @@ public class ShareKafkaListenerContainerFactory<K, V>
 
 	private int phase = 0;
 
+	private int concurrency = 1;
+
 	@SuppressWarnings("NullAway.Init")
 	private ApplicationEventPublisher applicationEventPublisher;
 
@@ -98,6 +100,22 @@ public void setPhase(int phase) {
 		this.phase = phase;
 	}
 
+	/**
+	 * Set the concurrency for containers created by this factory.
+	 * <p>
+	 * This specifies the number of consumer threads to create within each container.
+	 * Each thread creates its own {@link org.apache.kafka.clients.consumer.ShareConsumer}
+	 * instance and participates in the same share group. The Kafka broker distributes
+	 * records across all consumer instances, providing record-level load balancing.
+	 * <p>
+	 * This can be overridden per listener endpoint using the {@code concurrency}
+	 * attribute on {@code @KafkaListener}.
+	 * @param concurrency the number of consumer threads (must be greater than 0)
+	 */
+	public void setConcurrency(int concurrency) {
+		this.concurrency = concurrency;
+	}
+
 	@Override
 	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
 		this.applicationEventPublisher = applicationEventPublisher;
@@ -138,6 +156,15 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
 		boolean explicitAck = determineExplicitAcknowledgment(properties);
 		properties.setExplicitShareAcknowledgment(explicitAck);
 
+		// Set concurrency - endpoint setting takes precedence over factory setting
+		Integer conc = endpoint.getConcurrency();
+		if (conc != null) {
+			instance.setConcurrency(conc);
+		}
+		else {
+			instance.setConcurrency(this.concurrency);
+		}
+
 		instance.setAutoStartup(effectiveAutoStartup);
 		instance.setPhase(this.phase);
 		instance.setApplicationContext(this.applicationContext);
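The precedence rule added to `initializeContainer` above reduces to a null check: a per-endpoint concurrency (from `@KafkaListener`) wins over the factory default. As a standalone sketch (class and method names are hypothetical, not Spring Kafka API):

```java
// Hypothetical isolation of the precedence logic in the diff above:
// endpoint-level concurrency, when present, overrides the factory default.
public class ConcurrencyResolutionSketch {

    static int resolveConcurrency(Integer endpointConcurrency, int factoryDefault) {
        // null means the listener endpoint did not specify a concurrency
        return (endpointConcurrency != null) ? endpointConcurrency : factoryDefault;
    }

    public static void main(String[] args) {
        System.out.println(resolveConcurrency(10, 3));   // endpoint override wins -> 10
        System.out.println(resolveConcurrency(null, 3)); // falls back to factory default -> 3
    }
}
```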
