Skip to content

Commit 4c68195

Browse files
committed
✨ spring-boot-demo-mq-kafka 完成
1 parent 5394cfd commit 4c68195

File tree

1 file changed

+265
-0
lines changed

1 file changed

+265
-0
lines changed

spring-boot-demo-mq-kafka/README.md

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
# spring-boot-demo-mq-kafka
2+
3+
> 本 demo 主要演示了 Spring Boot 如何集成 kafka,实现消息的发送和接收。
4+
5+
## 环境准备
6+
7+
> 注意:本 demo 基于 Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0
8+
9+
创建一个名为 `test` 的Topic
10+
11+
```bash
12+
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
13+
```
14+
15+
## pom.xml
16+
17+
```xml
18+
<?xml version="1.0" encoding="UTF-8"?>
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<modelVersion>4.0.0</modelVersion>
22+
23+
<artifactId>spring-boot-demo-mq-kafka</artifactId>
24+
<version>1.0.0-SNAPSHOT</version>
25+
<packaging>jar</packaging>
26+
27+
<name>spring-boot-demo-mq-kafka</name>
28+
<description>Demo project for Spring Boot</description>
29+
30+
<parent>
31+
<groupId>com.xkcoding</groupId>
32+
<artifactId>spring-boot-demo</artifactId>
33+
<version>1.0.0-SNAPSHOT</version>
34+
</parent>
35+
36+
<properties>
37+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
38+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
39+
<java.version>1.8</java.version>
40+
</properties>
41+
42+
<dependencies>
43+
<dependency>
44+
<groupId>org.springframework.boot</groupId>
45+
<artifactId>spring-boot-starter</artifactId>
46+
</dependency>
47+
48+
<dependency>
49+
<groupId>org.springframework.kafka</groupId>
50+
<artifactId>spring-kafka</artifactId>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.springframework.boot</groupId>
55+
<artifactId>spring-boot-starter-test</artifactId>
56+
<scope>test</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.projectlombok</groupId>
61+
<artifactId>lombok</artifactId>
62+
<optional>true</optional>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>cn.hutool</groupId>
67+
<artifactId>hutool-all</artifactId>
68+
</dependency>
69+
70+
<dependency>
71+
<groupId>com.google.guava</groupId>
72+
<artifactId>guava</artifactId>
73+
</dependency>
74+
</dependencies>
75+
76+
<build>
77+
<finalName>spring-boot-demo-mq-kafka</finalName>
78+
<plugins>
79+
<plugin>
80+
<groupId>org.springframework.boot</groupId>
81+
<artifactId>spring-boot-maven-plugin</artifactId>
82+
</plugin>
83+
</plugins>
84+
</build>
85+
86+
</project>
87+
```
88+
89+
## application.yml
90+
91+
```yaml
92+
server:
93+
port: 8080
94+
servlet:
95+
context-path: /demo
96+
spring:
97+
kafka:
98+
bootstrap-servers: localhost:9092
99+
producer:
100+
retries: 0
101+
batch-size: 16384
102+
buffer-memory: 33554432
103+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
104+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
105+
consumer:
106+
group-id: spring-boot-demo
107+
# 手动提交
108+
enable-auto-commit: false
109+
auto-offset-reset: latest
110+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
111+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
112+
properties:
113+
session.timeout.ms: 60000
114+
listener:
115+
log-container-config: false
116+
concurrency: 5
117+
# 手动提交
118+
ack-mode: manual_immediate
119+
```
120+
121+
## KafkaConfig.java
122+
123+
```java
124+
/**
125+
* <p>
126+
* kafka配置类
127+
* </p>
128+
*
129+
* @package: com.xkcoding.mq.kafka.config
130+
* @description: kafka配置类
131+
* @author: yangkai.shen
132+
* @date: Created in 2019-01-07 14:49
133+
* @copyright: Copyright (c) 2019
134+
* @version: V1.0
135+
* @modified: yangkai.shen
136+
*/
137+
@Configuration
138+
@EnableConfigurationProperties({KafkaProperties.class})
139+
@EnableKafka
140+
@AllArgsConstructor
141+
public class KafkaConfig {
142+
private final KafkaProperties kafkaProperties;
143+
144+
@Bean
145+
public KafkaTemplate<String, String> kafkaTemplate() {
146+
return new KafkaTemplate<>(producerFactory());
147+
}
148+
149+
@Bean
150+
public ProducerFactory<String, String> producerFactory() {
151+
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
152+
}
153+
154+
@Bean
155+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
156+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
157+
factory.setConsumerFactory(consumerFactory());
158+
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
159+
factory.setBatchListener(true);
160+
factory.getContainerProperties().setPollTimeout(3000);
161+
return factory;
162+
}
163+
164+
@Bean
165+
public ConsumerFactory<String, String> consumerFactory() {
166+
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
167+
}
168+
169+
@Bean("ackContainerFactory")
170+
public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
171+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
172+
factory.setConsumerFactory(consumerFactory());
173+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
174+
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
175+
return factory;
176+
}
177+
178+
}
179+
```
180+
181+
## MessageHandler.java
182+
183+
```java
184+
/**
185+
* <p>
186+
* 消息处理器
187+
* </p>
188+
*
189+
* @package: com.xkcoding.mq.kafka.handler
190+
* @description: 消息处理器
191+
* @author: yangkai.shen
192+
* @date: Created in 2019-01-07 14:58
193+
* @copyright: Copyright (c) 2019
194+
* @version: V1.0
195+
* @modified: yangkai.shen
196+
*/
197+
@Component
198+
@Slf4j
199+
public class MessageHandler {
200+
201+
@KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
202+
public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
203+
try {
204+
String message = (String) record.value();
205+
log.info("收到消息: {}", message);
206+
} catch (Exception e) {
207+
log.error(e.getMessage(), e);
208+
} finally {
209+
// 手动提交 offset
210+
acknowledgment.acknowledge();
211+
}
212+
}
213+
}
214+
```
215+
216+
## SpringBootDemoMqKafkaApplicationTests.java
217+
218+
```java
219+
@RunWith(SpringRunner.class)
220+
@SpringBootTest
221+
public class SpringBootDemoMqKafkaApplicationTests {
222+
@Autowired
223+
private KafkaTemplate<String, String> kafkaTemplate;
224+
225+
/**
226+
* 测试发送消息
227+
*/
228+
@Test
229+
public void testSend() {
230+
kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");
231+
}
232+
233+
}
234+
```
235+
236+
## 参考
237+
238+
1. Spring Boot 版本和 Spring-Kafka 的版本对应关系:https://spring.io/projects/spring-kafka
239+
240+
| Spring for Apache Kafka Version | Spring Integration for Apache Kafka Version | kafka-clients |
241+
| ------------------------------- | ------------------------------------------- | ------------------- |
242+
| 2.2.x | 3.1.x | 2.0.0, 2.1.0 |
243+
| 2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
244+
| 2.0.x | 3.0.x | 0.11.0.x, 1.0.x |
245+
| 1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
246+
| 1.2.x | 2.2.x | 0.10.2.x |
247+
| 1.1.x | 2.1.x | 0.10.0.x, 0.10.1.x |
248+
| 1.0.x | 2.0.x | 0.9.x.x |
249+
| N/A* | 1.3.x | 0.8.2.2 |
250+
251+
> **IMPORTANT:** This matrix is client compatibility; in most cases (since 0.10.2.0) newer clients can communicate with older brokers. All users with brokers >= 0.10.x.x **(and all spring boot 1.5.x users)** are recommended to use spring-kafka version 1.3.x or higher due to its simpler threading model thanks to [KIP-62](https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread). For a complete discussion about client/broker compatibility, see the Kafka [Compatibility Matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix)
252+
>
253+
> - Spring Integration Kafka versions prior to 2.0 pre-dated the Spring for Apache Kafka project and therefore were not based on it.
254+
>
255+
> These versions will be referenced transitively when using maven or gradle for version management. For the 1.1.x version, the 0.10.1.x is the default.
256+
>
257+
> 2.1.x uses the 1.1.x kafka-clients by default. When overriding the kafka-clients for 2.1.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.1.x/reference/html/deps-for-11x.html).
258+
>
259+
> 2.2.x uses the 2.0.x kafka-clients by default. When overriding the kafka-clients for 2.2.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.2.1.BUILD-SNAPSHOT/reference/html/deps-for-21x.html).
260+
>
261+
> - Spring Boot 1.5 users should use 1.3.x (Boot dependency management will use 1.1.x by default so this should be overridden).
262+
> - Spring Boot 2.0 users should use 2.0.x (Boot dependency management will use the correct version).
263+
> - Spring Boot 2.1 users should use 2.2.x (Boot dependency management will use the correct version).
264+
265+
2. Spring-Kafka 官方文档:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/

0 commit comments

Comments
 (0)