3 changes: 2 additions & 1 deletion build.gradle.kts
@@ -27,8 +27,9 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.springframework.boot:spring-boot-starter-data-redis")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.6.0")
implementation("org.springframework.retry:spring-retry")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.6.0")
implementation("org.redisson:redisson-spring-boot-starter:3.37.0")

compileOnly("org.projectlombok:lombok")
91 changes: 91 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,91 @@
version: '3.7'

services:
zookeeper-1:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

zookeeper-2:
image: confluentinc/cp-zookeeper:latest
ports:
- "2182:2182"
environment:
ZOOKEEPER_CLIENT_PORT: 2182
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

zookeeper-3:
image: confluentinc/cp-zookeeper:latest
ports:
- "2183:2183"
environment:
ZOOKEEPER_CLIENT_PORT: 2183
ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888

kafka-1:
image: confluentinc/cp-kafka:latest
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2183"
KAFKA_BROKER_ID: 1
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3

kafka-2:
image: confluentinc/cp-kafka:latest
ports:
- "9093:9093"
- "29093:29093"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2183"
KAFKA_BROKER_ID: 2
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3

kafka-3:
image: confluentinc/cp-kafka:latest
ports:
- "9094:9094"
- "29094:29094"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2183"
KAFKA_BROKER_ID: 3
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3

kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- 8081:8080
environment:
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_NAME=peters_kafka
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:19092
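With the cluster above started (docker compose up -d), a quick way to confirm that all three brokers are reachable on their EXTERNAL listeners is to describe the cluster with Kafka's AdminClient. A minimal sketch, assuming the localhost port mappings from this compose file; the class name is illustrative:

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // EXTERNAL listeners published by the compose file above.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        try (AdminClient admin = AdminClient.create(props)) {
            // A healthy cluster reports three nodes (broker ids 1, 2, 3).
            admin.describeCluster().nodes().get().forEach(node ->
                    System.out.println("Broker " + node.id() + " at " + node.host() + ":" + node.port()));
        }
    }
}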
16 changes: 16 additions & 0 deletions src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducer.java
@@ -0,0 +1,16 @@
package com.example.hhplus.concert.infra.kafka;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class KafkaProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
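KafkaTemplate.send is asynchronous, so sendMessage above returns before the broker acknowledges the record. Assuming Spring Kafka 3.x, where send returns a CompletableFuture<SendResult<K, V>>, a variant that logs the outcome could look like the sketch below; LoggingKafkaProducer is a hypothetical name, not part of this PR:

package com.example.hhplus.concert.infra.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class LoggingKafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        // send() returns immediately; whenComplete observes the broker's ack (or failure) without blocking.
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to send message to topic {}", topic, ex);
            } else {
                log.info("Sent message to {}-{} at offset {}", topic,
                        result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            }
        });
    }
}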
38 changes: 38 additions & 0 deletions src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducerConfig.java
@@ -0,0 +1,38 @@
package com.example.hhplus.concert.infra.kafka;

import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@EnableKafka
Review comment (Contributor Author): Since the Kafka config is split into separate producer and consumer classes, I wasn't sure where to put the @EnableKafka annotation and ended up adding it to both files. I'd be curious to hear what others think. (A sketch of an alternative follows this config class.)

@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {

private final KafkaProperties kafkaProperties;

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

}
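On the reviewer's question: @EnableKafka only needs to appear once per application context, and strictly speaking it is only required for @KafkaListener processing, so keeping it on KafkaConsumerConfig alone would also work (Spring Boot's Kafka auto-configuration applies it for you when none is declared, if I recall correctly). A third option is a single dedicated carrier class so neither config owns it; a minimal sketch with an illustrative class name:

package com.example.hhplus.concert.infra.kafka;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

// Single home for @EnableKafka; KafkaProducerConfig and KafkaConsumerConfig could then drop the annotation.
@EnableKafka
@Configuration
public class KafkaAnnotationConfig {
}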
23 changes: 23 additions & 0 deletions src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumer.java
@@ -0,0 +1,23 @@
package com.example.hhplus.concert.interfaces.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

private String message = null;

@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
log.info("Consumed message: {}", message);
this.message = message;
}

public String getMessage() {
return message;
}

}
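Holding the last message in a plain field works for a smoke test, but it leaves the integration test with nothing to wait on except a fixed sleep. A sketch of an alternative consumer (a hypothetical replacement for the class above, not part of this PR) that exposes a CountDownLatch the test can await:

package com.example.hhplus.concert.interfaces.consumer;

import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LatchKafkaConsumer {

    // Counted down on the first consumed record; the test awaits this instead of sleeping.
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String message;

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void consume(String message) {
        log.info("Consumed message: {}", message);
        this.message = message;
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getMessage() {
        return message;
    }
}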
41 changes: 41 additions & 0 deletions src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumerConfig.java
@@ -0,0 +1,41 @@
package com.example.hhplus.concert.interfaces.consumer;

import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

private final KafkaProperties kafkaProperties;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

return factory;
}
}
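The container factory above runs a single consumer thread per listener and relies on default error handling. If that becomes a limitation, concurrency and a retry/back-off error handler can be set on the same bean. A sketch of only the modified bean follows; the concurrency of 3 and the one-second/two-attempt back-off are arbitrary illustrations, and it additionally needs org.springframework.kafka.listener.DefaultErrorHandler and org.springframework.util.backoff.FixedBackOff imported:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    // Several listener threads; only useful once the topic has more than one partition.
    factory.setConcurrency(3);

    // Retry a failed record twice, one second apart, before the default recoverer logs and skips it.
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));

    return factory;
}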
13 changes: 13 additions & 0 deletions src/main/resources/application-test.yml
@@ -24,6 +24,19 @@ spring:
redis:
host: localhost
port: 6379
kafka:
bootstrap-servers:
- localhost:9092
- localhost:9093
- localhost:9094
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

queue:
waiting-key: concertWaitingQueue
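These spring.kafka.* entries are bound onto Spring Boot's KafkaProperties, which is the object KafkaProducerConfig and KafkaConsumerConfig inject for the bootstrap servers. A quick way to verify the binding under the test profile is sketched below; KafkaPropertiesBindingTest is an illustrative name, not part of this PR:

package com.example.hhplus.concert.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("test")
class KafkaPropertiesBindingTest {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Test
    void bindsBootstrapServersAndGroupId() {
        // The three brokers listed under spring.kafka.bootstrap-servers above.
        assertThat(kafkaProperties.getBootstrapServers())
                .containsExactly("localhost:9092", "localhost:9093", "localhost:9094");
        // The consumer group configured under spring.kafka.consumer.group-id.
        assertThat(kafkaProperties.getConsumer().getGroupId()).isEqualTo("test-group");
    }
}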
17 changes: 15 additions & 2 deletions src/main/resources/application.yml
@@ -23,10 +23,23 @@ spring:
port: ${REDIS_PORT}
cache:
type: redis
kafka:
bootstrap-servers:
- localhost:9092
- localhost:9093
- localhost:9094
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

queue:
waiting-key: ${QUEUE_WAITING_KEY}
active-key: ${QUEUE_ACTIVE_KEY}
waiting-key: ${QUEUE_WAITING_KEY:waiting}
active-key: ${QUEUE_ACTIVE_KEY:active}

# Swagger springdoc-ui Configuration
springdoc:
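The queue keys now fall back to waiting/active when the QUEUE_WAITING_KEY/QUEUE_ACTIVE_KEY environment variables are not set, so a local run no longer requires exporting them. A hedged illustration of how such a property is typically consumed; the class and package names are assumptions, not code from this PR:

package com.example.hhplus.concert.infra.redis;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class QueueKeyProperties {

    // Resolves queue.waiting-key from application.yml, i.e. QUEUE_WAITING_KEY if set, otherwise "waiting".
    @Value("${queue.waiting-key}")
    private String waitingKey;

    // Likewise QUEUE_ACTIVE_KEY, otherwise "active".
    @Value("${queue.active-key}")
    private String activeKey;

    public String getWaitingKey() {
        return waitingKey;
    }

    public String getActiveKey() {
        return activeKey;
    }
}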
38 changes: 38 additions & 0 deletions src/test/java/com/example/hhplus/concert/kafka/KafkaIntegrationTest.java
@@ -0,0 +1,38 @@
package com.example.hhplus.concert.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import com.example.hhplus.concert.infra.kafka.KafkaProducer;
import com.example.hhplus.concert.interfaces.consumer.KafkaConsumer;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("test")
@DisplayName("Kafka integration test")
class KafkaIntegrationTest {

@Autowired
private KafkaProducer producer;

@Autowired
private KafkaConsumer kafkaConsumer;

@Test
void test() throws Exception {
// given
String topic = "test-topic";
String message = "Hello Kafka";

// when
producer.sendMessage(topic, message);
Thread.sleep(3000);

// then
assertThat(kafkaConsumer.getMessage()).isEqualTo(message);
}

}
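The fixed Thread.sleep(3000) makes the test take three seconds even when the message arrives almost immediately, and it can still be flaky on a slow broker. Assuming the latch-based consumer sketched earlier (hypothetical, not part of this PR), the test can wait only as long as it actually needs to:

package com.example.hhplus.concert.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import com.example.hhplus.concert.infra.kafka.KafkaProducer;
import com.example.hhplus.concert.interfaces.consumer.LatchKafkaConsumer;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("test")
@DisplayName("Kafka integration test (latch-based wait)")
class KafkaLatchIntegrationTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private LatchKafkaConsumer consumer;

    @Test
    void waitsForTheListenerInsteadOfSleeping() throws Exception {
        // given
        String topic = "test-topic";
        String message = "Hello Kafka";

        // when
        producer.sendMessage(topic, message);

        // then: unblocks as soon as the listener fires, fails fast after 10 seconds
        boolean consumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertThat(consumed).isTrue();
        assertThat(consumer.getMessage()).isEqualTo(message);
    }
}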