Merge branch 'main' into 241006-batch-attempt
chickenchickenlove authored Oct 8, 2024
2 parents 6adf7f8 + 3b64536 commit 90bc27a
Showing 15 changed files with 79 additions and 74 deletions.
8 changes: 4 additions & 4 deletions build.gradle
@@ -20,7 +20,7 @@ plugins {
id 'org.ajoberstar.grgit' version '5.2.2'
id 'io.spring.nohttp' version '0.0.11'
id 'io.spring.dependency-management' version '1.1.6' apply false
-id 'com.github.spotbugs' version '6.0.23'
+id 'com.github.spotbugs' version '6.0.24'
id 'io.freefair.aggregate-javadoc' version '8.6'
}

@@ -54,13 +54,13 @@ ext {
awaitilityVersion = '4.2.2'
hamcrestVersion = '2.2'
hibernateValidationVersion = '8.0.1.Final'
-jacksonBomVersion = '2.17.2'
+jacksonBomVersion = '2.18.0'
jaywayJsonPathVersion = '2.9.0'
junit4Version = '4.13.2'
-junitJupiterVersion = '5.11.1'
+junitJupiterVersion = '5.11.2'
kafkaVersion = '3.8.0'
kotlinCoroutinesVersion = '1.8.1'
-log4jVersion = '2.24.0'
+log4jVersion = '2.24.1'
micrometerDocsVersion = '1.0.4'
micrometerVersion = '1.14.0-SNAPSHOT'
micrometerTracingVersion = '1.4.0-SNAPSHOT'
2 changes: 1 addition & 1 deletion settings.gradle
@@ -6,7 +6,7 @@ pluginManagement {
}

plugins {
-id 'io.spring.develocity.conventions' version '0.0.21'
+id 'io.spring.develocity.conventions' version '0.0.22'
}

rootProject.name = 'spring-kafka-dist'
@@ -906,7 +906,7 @@ Also, a `StringOrBytesSerializer` is now available; it can serialize `byte[]`, `
See xref:kafka/serdes.adoc#messaging-message-conversion[Spring Messaging Message Conversion] for more information.

The `JsonSerializer`, `JsonDeserializer` and `JsonSerde` now have fluent APIs to make programmatic configuration simpler.
-See the javadocs, xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion], and xref:streams.adoc#serde[Streams JSON Serialization and Deserialization] for more informaion.
+See the javadocs, xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion], and xref:streams.adoc#serde[Streams JSON Serialization and Deserialization] for more information.

[[cb-2-2-and-2-3-replyingkafkatemplate]]
=== ReplyingKafkaTemplate
@@ -29,6 +29,6 @@ public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaLis
}
----

-As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calling the `enforceRebalnce` API on it.
+As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calling the `enforceRebalance` API on it.
When calling the `enforceRebalance` on the listener container, it delegates the call to the underlying Kafka consumer.
The Kafka consumer will trigger a rebalance as part of the next `poll()` operation.
The Kafka consumer will trigger a rebalance as part of the next `poll()` operation.
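The flow just described can be sketched as follows; the listener id `myListener` and the bean wiring are illustrative assumptions, not taken from this commit:

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Sketch: look up the container by its (assumed) listener id and ask it to
// enforce a rebalance; the container delegates to the underlying Kafka
// consumer, which triggers the rebalance on its next poll().
@Component
public class RebalanceTrigger {

    private final KafkaListenerEndpointRegistry registry;

    public RebalanceTrigger(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void trigger() {
        MessageListenerContainer container = this.registry.getListenerContainer("myListener");
        if (container != null) {
            container.enforceRebalance();
        }
    }
}
```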
@@ -256,7 +256,7 @@ public KafkaListenerContainerFactory<?> batchFactory() {
}
----

-NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation.
+NOTE: Starting with version 2.8, you can override the factory's `batchListener` property using the `batch` property on the `@KafkaListener` annotation.
This, together with the changes to xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers] allows the same factory to be used for both record and batch listeners.

NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties.
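As a sketch of the note above (listener ids and topics are assumed names, not from the commit), the `batch` attribute lets a single factory serve both listener styles:

```java
import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Sketch: both endpoints use the same container factory; the batch
// attribute overrides the factory's batchListener setting per endpoint.
@Component
public class MixedListeners {

    @KafkaListener(id = "singleRecord", topics = "topic-one", batch = "false")
    void listenOne(String in) {
        // invoked once per record
    }

    @KafkaListener(id = "wholeBatch", topics = "topic-two", batch = "true")
    void listenMany(List<String> in) {
        // invoked once per polled batch
    }
}
```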
@@ -404,7 +404,7 @@ public class Listener {
}
----

-If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token byusing the `beanRef` attribute.
+If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token by using the `beanRef` attribute.
The following example shows how to do so:

[source, java]
@@ -52,7 +52,7 @@ containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListe
----

IMPORTANT: Starting with version 2.4, a new method `onPartitionsLost()` has been added (similar to a method with the same name in `ConsumerRebalanceLister`).
-The default implementation on `ConsumerRebalanceLister` simply calls `onPartionsRevoked`.
+The default implementation on `ConsumerRebalanceLister` simply calls `onPartitionsRevoked`.
The default implementation on `ConsumerAwareRebalanceListener` does nothing.
When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call `onPartitionsRevoked` from `onPartitionsLost`.
If you implement `ConsumerRebalanceListener` you should override the default method.
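A sketch of a conforming implementation (method bodies are placeholders): `onPartitionsLost` is overridden to do its own cleanup without calling the revocation callbacks, per the note above.

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Sketch: onPartitionsLost deliberately does not call onPartitionsRevoked.
public class MyRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        // e.g. commit offsets or flush state for partitions being revoked
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        // e.g. seek to externally stored offsets
    }

    @Override
    public void onPartitionsLost(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        // partitions were lost (not revoked): clean up without committing
    }
}
```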
@@ -216,7 +216,7 @@ Example:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
-void listn(...) {
+void listen(...) {
...
}
}
@@ -349,7 +349,7 @@ public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> conf
----

Starting with version 2.8.3, you can configure the serializer to check if the map key is assignable from the target object, useful when a delegate serializer can serialize sub classes.
-In this case, if there are amiguous matches, an ordered `Map`, such as a `LinkedHashMap` should be provided.
+In this case, if there are ambiguous matches, an ordered `Map`, such as a `LinkedHashMap` should be provided.
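Why an ordered `Map` matters can be shown with a small stdlib-only model — this is not the library's implementation, just the "first assignable key wins" idea: with a `LinkedHashMap`, putting `Integer` before `Number` makes the more specific delegate win for `Integer` values while `Number` still catches other subclasses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal model of matching a value against delegate keys in iteration
// order: the first key assignable from the value's class wins.
public class AssignableLookup {

    public static String firstMatch(Map<Class<?>, String> delegates, Object value) {
        for (Map.Entry<Class<?>, String> entry : delegates.entrySet()) {
            if (entry.getKey().isAssignableFrom(value.getClass())) {
                return entry.getValue(); // first assignable key in insertion order
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<Class<?>, String> delegates = new LinkedHashMap<>();
        delegates.put(Integer.class, "intSerializer");   // more specific first
        delegates.put(Number.class, "numberSerializer"); // broader fallback
        System.out.println(firstMatch(delegates, 42));   // intSerializer
        System.out.println(firstMatch(delegates, 3.5));  // numberSerializer
    }
}
```

Had a `HashMap` been used instead, which of the two assignable keys matched for an `Integer` would depend on hash iteration order.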

[[by-topic]]
=== By Topic
@@ -336,7 +336,7 @@ Of course, the `recoverer()` bean can be your own implementation of `ConsumerRec
Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams.
Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application.
Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that.
-To learn more about interacive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article].
+To learn more about interactive queries, see this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html[article].
The support in Spring for Apache Kafka is centered around an API called `KafkaStreamsInteractiveQueryService` which is a facade around interactive queries APIs in Kafka Streams library.
An application can create an instance of this service as a bean and then later on use it to retrieve the state store by its name.

@@ -376,7 +376,7 @@ Here is the type signature from the API.
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
----

-When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example.
+When calling this method, the user can specifically ask for the proper state store type, as we have done in the above example.
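Usage might look like the following sketch; the store name `app-store`, the bean wiring, and the key/value types are assumptions for illustration:

```java
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.kafka.streams.KafkaStreamsInteractiveQueryService;

// Sketch: retrieve a key-value state store by name and query it.
public class StoreQuery {

    private final KafkaStreamsInteractiveQueryService queryService;

    public StoreQuery(KafkaStreamsInteractiveQueryService queryService) {
        this.queryService = queryService;
    }

    public Long currentCount(String key) {
        ReadOnlyKeyValueStore<String, Long> store =
                this.queryService.retrieveQueryableStore("app-store",
                        QueryableStoreTypes.keyValueStore());
        return store.get(key);
    }
}
```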

=== Retrying State Store Retrieval

@@ -166,7 +166,7 @@ public void sendToKafka(String in) {
[[tip-json]]
== Customizing the JsonSerializer and JsonDeserializer

-The serializer and deserializer support a number of cusomizations using properties, see xref:kafka/serdes.adoc#json-serde[JSON] for more information.
+The serializer and deserializer support a number of customizations using properties, see xref:kafka/serdes.adoc#json-serde[JSON] for more information.
The `kafka-clients` code, not Spring, instantiates these objects, unless you inject them directly into the consumer and producer factories.
If you wish to configure the (de)serializer using properties, but wish to use, say, a custom `ObjectMapper`, simply create a subclass and pass the custom mapper into the `super` constructor. For example:
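A minimal sketch of that pattern (the mapper settings shown are an assumed example, not the documentation's own elided one): subclass the serializer and hand the customized `ObjectMapper` to the `super` constructor.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Sketch: the subclass keeps the no-arg constructor kafka-clients needs,
// while still using a custom ObjectMapper under the covers.
public class CustomJsonSerializer extends JsonSerializer<Object> {

    public CustomJsonSerializer() {
        super(customizedMapper());
    }

    private static ObjectMapper customizedMapper() {
        ObjectMapper mapper = new ObjectMapper();
        // assumed customization for illustration only
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }
}
```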

@@ -46,11 +46,17 @@ For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section

When using `DeadLetterPublishingRecovererFactory`, the user applications can override the `maybeLogListenerException` method to customize the logging behavior.

[[x33-customize-admin-client-in-KafkaAdmin]]
=== Customize Admin client in KafkaAdmin

When extending `KafkaAdmin`, user applications may override the `createAdmin` method to customize Admin client creation.
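A sketch of such an override; `getAdminConfig()` supplies the admin configuration, and the customization point is marked (any extra configuration applied there is an assumption):

```java
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.kafka.core.KafkaAdmin;

// Sketch: subclass KafkaAdmin and override createAdmin to customize how
// the Admin client is created (or to wrap it).
public class MyKafkaAdmin extends KafkaAdmin {

    public MyKafkaAdmin(Map<String, Object> config) {
        super(config);
    }

    @Override
    protected Admin createAdmin() {
        Map<String, Object> configs = getAdminConfig();
        // e.g. adjust configs or decorate the returned client here
        return AdminClient.create(configs);
    }
}
```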

[[x33-customize-kafka-streams-implementation]]
=== Customizing The Implementation of Kafka Streams

When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method.


[[x33-kafka-headers-for-batch-listeners]]
=== KafkaHeaders.DELIVERY_ATTEMPT for batch listeners
When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTEMPT` header in its headers fields.
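A sketch of reading the header in a batch listener (the listener id and topic are assumed, and reading the value as a 4-byte int is an assumption about the header's encoding):

```java
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;

// Sketch: inspect each record's delivery-attempt header in a batch listener.
public class BatchAttemptListener {

    @KafkaListener(id = "batch-attempt", topics = "some-topic", batch = "true")
    void listen(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
            if (header != null) {
                int attempt = ByteBuffer.wrap(header.value()).getInt();
                // use the attempt count, e.g. for logging or give-up logic
            }
        }
    }
}
```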
@@ -169,42 +169,8 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,

this.configs = new ConcurrentHashMap<>(configs);
this.configureDeserializers = configureDeserializers;
-this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
-this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
-}
-
-private Supplier<Deserializer<K>> keyDeserializerSupplier(
-@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier) {
-
-if (!this.configureDeserializers) {
-return keyDeserializerSupplier;
-}
-return keyDeserializerSupplier == null
-? () -> null
-: () -> {
-Deserializer<K> deserializer = keyDeserializerSupplier.get();
-if (deserializer != null) {
-deserializer.configure(this.configs, true);
-}
-return deserializer;
-};
-}
-
-private Supplier<Deserializer<V>> valueDeserializerSupplier(
-@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {
-
-if (!this.configureDeserializers) {
-return valueDeserializerSupplier;
-}
-return valueDeserializerSupplier == null
-? () -> null
-: () -> {
-Deserializer<V> deserializer = valueDeserializerSupplier.get();
-if (deserializer != null) {
-deserializer.configure(this.configs, false);
-}
-return deserializer;
-};
-}
+this.keyDeserializerSupplier = keyDeserializerSupplier;
+this.valueDeserializerSupplier = valueDeserializerSupplier;
}

@Override
@@ -219,7 +185,7 @@ public void setBeanName(String name) {
* @param keyDeserializer the deserializer.
*/
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
-this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer);
+this.keyDeserializerSupplier = () -> keyDeserializer;
}

/**
Expand All @@ -229,7 +195,7 @@ public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
* @param valueDeserializer the value deserializer.
*/
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
-this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer);
+this.valueDeserializerSupplier = () -> valueDeserializer;
}

/**
Expand All @@ -240,7 +206,7 @@ public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
* @since 2.8
*/
public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializerSupplier) {
-this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
+this.keyDeserializerSupplier = keyDeserializerSupplier;
}

/**
Expand All @@ -251,7 +217,7 @@ public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializer
* @since 2.8
*/
public void setValueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
-this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
+this.valueDeserializerSupplier = valueDeserializerSupplier;
}

/**
@@ -499,14 +465,36 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
this.applicationContext = applicationContext;
}

+@Nullable
+private Deserializer<K> keyDeserializer(Map<String, Object> configs) {
+Deserializer<K> deserializer =
+this.keyDeserializerSupplier != null
+? this.keyDeserializerSupplier.get()
+: null;
+if (deserializer != null && this.configureDeserializers) {
+deserializer.configure(configs, true);
+}
+return deserializer;
+}
+
+@Nullable
+private Deserializer<V> valueDeserializer(Map<String, Object> configs) {
+Deserializer<V> deserializer =
+this.valueDeserializerSupplier != null
+? this.valueDeserializerSupplier.get()
+: null;
+if (deserializer != null && this.configureDeserializers) {
+deserializer.configure(configs, false);
+}
+return deserializer;
+}

protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {

private String idForListeners;

protected ExtendedKafkaConsumer(Map<String, Object> configProps) {
-super(configProps,
-DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(),
-DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get());
+super(configProps, keyDeserializer(configProps), valueDeserializer(configProps));

if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) {
Iterator<MetricName> metricIterator = metrics().keySet().iterator();
@@ -36,6 +36,7 @@
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -68,14 +69,15 @@
import org.springframework.util.Assert;

/**
-* An admin that delegates to an {@link AdminClient} to create topics defined
+* An admin that delegates to an {@link Admin} to create topics defined
* in the application context.
*
* @author Gary Russell
* @author Artem Bilan
* @author Adrian Gygax
* @author Sanghyeok An
* @author Valentina Armenise
+ * @author Anders Swanson
*
* @since 1.3
*/
@@ -114,9 +116,9 @@ public class KafkaAdmin extends KafkaResourceFactory
private String clusterId;

/**
-* Create an instance with an {@link AdminClient} based on the supplied
+* Create an instance with an {@link Admin} based on the supplied
* configuration.
-* @param config the configuration for the {@link AdminClient}.
+* @param config the configuration for the {@link Admin}.
*/
public KafkaAdmin(Map<String, Object> config) {
this.configs = new HashMap<>(config);
@@ -251,7 +253,7 @@ public void afterSingletonsInstantiated() {
public final boolean initialize() {
Collection<NewTopic> newTopics = newTopics();
if (!newTopics.isEmpty()) {
-AdminClient adminClient = null;
+Admin adminClient = null;
try {
adminClient = createAdmin();
}
@@ -347,7 +349,7 @@ protected Collection<NewTopic> newTopics() {
@Nullable
public String clusterId() {
if (this.clusterId == null) {
-try (AdminClient client = createAdmin()) {
+try (Admin client = createAdmin()) {
this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
if (this.clusterId == null) {
this.clusterId = "null";
@@ -365,14 +367,14 @@ public String clusterId() {

@Override
public void createOrModifyTopics(NewTopic... topics) {
-try (AdminClient client = createAdmin()) {
+try (Admin client = createAdmin()) {
addOrModifyTopicsIfNeeded(client, Arrays.asList(topics));
}
}

@Override
public Map<String, TopicDescription> describeTopics(String... topicNames) {
-try (AdminClient admin = createAdmin()) {
+try (Admin admin = createAdmin()) {
Map<String, TopicDescription> results = new HashMap<>();
DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames));
try {
@@ -389,7 +391,13 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
}
}

-AdminClient createAdmin() {
+/**
+ * Creates a new {@link Admin} client instance using the {@link AdminClient} class.
+ * @return the new {@link Admin} client instance.
+ * @since 3.3.0
+ * @see AdminClient#create(Map)
+ */
+protected Admin createAdmin() {
return AdminClient.create(getAdminConfig());
}

@@ -409,7 +417,7 @@ protected Map<String, Object> getAdminConfig() {
return configs2;
}

-private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
+private void addOrModifyTopicsIfNeeded(Admin adminClient, Collection<NewTopic> topics) {
if (!topics.isEmpty()) {
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
@@ -439,7 +447,7 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTo
}

private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
-AdminClient adminClient, Collection<NewTopic> topics) {
+Admin adminClient, Collection<NewTopic> topics) {

List<ConfigResource> configResources = topics.stream()
.map(topic -> new ConfigResource(Type.TOPIC, topic.name()))
@@ -484,7 +492,7 @@ private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
}
}

-private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic> topics,
+private void adjustConfigMismatches(Admin adminClient, Collection<NewTopic> topics,
Map<ConfigResource, List<ConfigEntry>> mismatchingConfigs) {
for (Map.Entry<ConfigResource, List<ConfigEntry>> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) {
ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey();
@@ -556,7 +564,7 @@ else if (topic.numPartitions() > topicDescription.partitions().size()) {
return topicsToModify;
}

-private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
+private void addTopics(Admin adminClient, List<NewTopic> topicsToAdd) {
CreateTopicsResult topicResults = adminClient.createTopics(topicsToAdd);
try {
topicResults.all().get(this.operationTimeout, TimeUnit.SECONDS);
@@ -579,7 +587,7 @@ private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
}
}

-private void createMissingPartitions(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
+private void createMissingPartitions(Admin adminClient, Map<String, NewPartitions> topicsToModify) {
CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify);
try {
partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);