Commit 97003ed

garyrussell authored and artembilan committed
Doc: Add Tx Synchronization Example
1 parent db8a86f commit 97003ed

File tree: 4 files changed, +127 -6 lines changed
src/reference/asciidoc/index.adoc

Lines changed: 2 additions & 1 deletion
@@ -43,7 +43,8 @@ include::streams.adoc[]
 
 include::testing.adoc[]
 
-== Tips and Tricks
+[[tips-n-tricks]]
+== Tips, Tricks and Examples
 
 include::tips.adoc[]
 

src/reference/asciidoc/kafka.adoc

Lines changed: 8 additions & 4 deletions
@@ -385,8 +385,10 @@ KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,
 
 NOTE: The offset to be committed is one greater than the offset of the records processed by the listener.
 
-IMPORTANT: You should call this should only when you use transaction synchronization.
-When a listener container is configured to use a `KafkaTransactionManager`, it takes care of sending the offsets to the transaction.
+IMPORTANT: You should call this only when you use transaction synchronization.
+When a listener container is configured to use a `KafkaTransactionManager` or `ChainedKafkaTransactionManager`, it takes care of sending the offsets to the transaction.
+
+See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
 
 [[chained-transaction-manager]]
 ====== Using `ChainedKafkaTransactionManager`

@@ -397,6 +399,8 @@ Since it is a `KafkaAwareTransactionManager`, the container can send the offsets
 This provides another mechanism for synchronizing transactions without having to send the offsets to the transaction in the listener code.
 You should chain your transaction managers in the desired order and provide the `ChainedTransactionManager` in the `ContainerProperties`.
 
+See <<ex-jdbc-sync>> for an example application that synchronizes JDBC and Kafka transactions.
+
 ====== `KafkaTemplate` Local Transactions
 
 You can use the `KafkaTemplate` to execute a series of operations within a local transaction.

@@ -1024,7 +1028,7 @@ public void listen(ConsumerRecord<?, ?> record) {
 
 You can specify each partition in the `partitions` or `partitionOffsets` attribute but not both.
 
-As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see <<assign-all-parts>>.
+As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see <<tip-assign-all-parts>>.
 
 When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`.
 The following example also shows how to use a different container factory.

@@ -2892,7 +2896,7 @@ static class MultiListenerBean {
 
 Note that the argument is `null`, not `KafkaNull`.
 
-TIP: See <<assign-all-parts>>.
+TIP: See <<tip-assign-all-parts>>.
 
 [[annotation-error-handling]]
 ==== Handling Exceptions
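The NOTE in the first hunk above (the committed offset is one greater than the offset of the records processed by the listener) is easy to get wrong when sending offsets to a transaction by hand. A minimal plain-Java sketch of the arithmetic, independent of Spring Kafka; the `CommitOffsets` class and `commitOffsetFor` helper are hypothetical, for illustration only:

```java
import java.util.List;

// Toy model: given the offsets of the records a listener just processed
// for one partition, compute the offset to commit. Kafka stores the
// offset of the NEXT record to consume, hence last processed offset + 1.
public class CommitOffsets {

    // Hypothetical helper, not part of Spring Kafka's API.
    public static long commitOffsetFor(List<Long> processedOffsets) {
        long last = processedOffsets.get(processedOffsets.size() - 1);
        return last + 1; // one greater than the last processed offset
    }

    public static void main(String[] args) {
        // After processing records at offsets 40, 41 and 42,
        // the offset to commit for the partition is 43.
        System.out.println(commitOffsetFor(List.of(40L, 41L, 42L))); // 43
    }
}
```

This is why committing `last` instead of `last + 1` causes the final record to be redelivered after a restart.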

src/reference/asciidoc/tips.adoc

Lines changed: 112 additions & 1 deletion
@@ -1,4 +1,4 @@
-[[assign-all-parts]]
+[[tip-assign-all-parts]]
 === Manually Assigning All Partitions
 
 Let's say you want to always read all records from all partitions (such as when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions and not use Kafka's group management.

@@ -43,3 +43,114 @@ public static class PartitionFinder {
 
 Using this in conjunction with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest` will load all records each time the application is started.
 You should also set the container's `AckMode` to `MANUAL` to prevent the container from committing offsets for a `null` consumer group.
+
+[[ex-jdbc-sync]]
+=== Example of Transaction Synchronization
+
+The following Spring Boot application is an example of synchronizing database and Kafka transactions.
+
+====
+[source, java]
+----
+@SpringBootApplication
+public class Application {
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+
+    @Bean
+    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
+        return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
+    }
+
+    @Bean
+    public ChainedKafkaTransactionManager<Object, Object> chainedTm(
+            KafkaTransactionManager<String, String> ktm,
+            DataSourceTransactionManager dstm) {
+
+        return new ChainedKafkaTransactionManager<>(ktm, dstm);
+    }
+
+    @Bean
+    public DataSourceTransactionManager dstm(DataSource dataSource) {
+        return new DataSourceTransactionManager(dataSource);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
+            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
+            ConsumerFactory<Object, Object> kafkaConsumerFactory,
+            ChainedKafkaTransactionManager<Object, Object> chainedTM) {
+
+        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        configurer.configure(factory, kafkaConsumerFactory);
+        factory.getContainerProperties().setTransactionManager(chainedTM);
+        return factory;
+    }
+
+    @Component
+    public static class Listener {
+
+        private final JdbcTemplate jdbcTemplate;
+
+        private final KafkaTemplate<String, String> kafkaTemplate;
+
+        public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
+            this.jdbcTemplate = jdbcTemplate;
+            this.kafkaTemplate = kafkaTemplate;
+        }
+
+        @KafkaListener(id = "group1", topics = "topic1")
+        public void listen1(String in) {
+            this.kafkaTemplate.send("topic2", in.toUpperCase());
+            this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')");
+        }
+
+        @KafkaListener(id = "group2", topics = "topic2")
+        public void listen2(String in) {
+            System.out.println(in);
+        }
+
+    }
+
+    @Bean
+    public NewTopic topic1() {
+        return TopicBuilder.name("topic1").build();
+    }
+
+    @Bean
+    public NewTopic topic2() {
+        return TopicBuilder.name("topic2").build();
+    }
+
+}
+----
+====
+
+====
+[source, properties]
+----
+spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
+spring.datasource.username=root
+spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
+
+spring.kafka.consumer.auto-offset-reset=earliest
+spring.kafka.consumer.enable-auto-commit=false
+spring.kafka.consumer.properties.isolation.level=read_committed
+
+spring.kafka.producer.transaction-id-prefix=tx-
+
+#logging.level.org.springframework.transaction=trace
+#logging.level.org.springframework.kafka.transaction=debug
+#logging.level.org.springframework.jdbc=debug
+----
+====
+
+====
+[source, sql]
+----
+create table mytable (data varchar(20));
+----
+====
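The example added above chains the transaction managers as `(ktm, dstm)` but does not show what that ordering means. Spring's `ChainedTransactionManager` (which `ChainedKafkaTransactionManager` extends) begins transactions in the order the managers are given and commits or rolls back in reverse order. A toy plain-Java model of that ordering, assuming nothing from Spring; the `ChainOrderDemo` class and its `run` method are hypothetical, for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of chained transaction-manager ordering (NOT the Spring
// implementation): begin in chain order, commit in reverse order.
public class ChainOrderDemo {

    static List<String> run(List<String> chain) {
        List<String> events = new ArrayList<>();
        for (String tm : chain) {                     // begin: first to last
            events.add("begin:" + tm);
        }
        for (int i = chain.size() - 1; i >= 0; i--) { // commit: last to first
            events.add("commit:" + chain.get(i));
        }
        return events;
    }

    public static void main(String[] args) {
        // Chained as (kafkaTm, dataSourceTm), as in the example above:
        // the database commits first and the Kafka transaction commits last.
        System.out.println(run(List.of("kafkaTm", "dataSourceTm")));
        // [begin:kafkaTm, begin:dataSourceTm, commit:dataSourceTm, commit:kafkaTm]
    }
}
```

With this ordering, a database commit failure still rolls back the Kafka transaction, while a Kafka commit failure after a successful database commit can leave the two out of sync; consumers using `isolation.level=read_committed` (as in the properties above) never see records from an uncommitted Kafka transaction.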

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
@@ -4,6 +4,11 @@ This section covers the changes made from version 2.2 to version 2.3.
 
 Also see <<new-in-sik>>.
 
+==== Tips, Tricks and Examples
+
+A new chapter <<tips-n-tricks>> has been added.
+Please submit GitHub issues and/or pull requests for additional entries in that chapter.
+
 [[kafka-client-2.2]]
 ==== Kafka Client Version
 
