diff --git a/build.gradle.kts b/build.gradle.kts index 07ba46d..983b0ac 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -27,8 +27,9 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-data-jpa") implementation("org.springframework.boot:spring-boot-starter-data-redis") implementation("org.springframework.boot:spring-boot-starter-web") - implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.6.0") implementation("org.springframework.retry:spring-retry") + implementation("org.springframework.kafka:spring-kafka") + implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.6.0") implementation("org.redisson:redisson-spring-boot-starter:3.37.0") compileOnly("org.projectlombok:lombok") diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..39fb2fb --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,91 @@ +version: '3.7' + +services: + zookeeper-1: + image: confluentinc/cp-zookeeper:latest + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888 + + zookeeper-2: + image: confluentinc/cp-zookeeper:latest + ports: + - "2182:2182" + environment: + ZOOKEEPER_CLIENT_PORT: 2182 + ZOOKEEPER_SERVER_ID: 2 + ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888 + + zookeeper-3: + image: confluentinc/cp-zookeeper:latest + ports: + - "2183:2183" + environment: + ZOOKEEPER_CLIENT_PORT: 2183 + ZOOKEEPER_SERVER_ID: 3 + ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zoozookeeper-2:2888:3888;zookeeper-3:2888:3888 + + kafka-1: + image: confluentinc/cp-kafka:latest + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2183" + KAFKA_BROKER_ID: 1 + depends_on: + - zookeeper-1 + - zookeeper-2 + - zookeeper-3 + + kafka-2: + image: confluentinc/cp-kafka:latest + ports: + - "9093:9093" + - "29093:29093" + environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2183" + KAFKA_BROKER_ID: 2 + depends_on: + - zookeeper-1 + - zookeeper-2 + - zookeeper-3 + + kafka-3: + image: confluentinc/cp-kafka:latest + ports: + - "9094:9094" + - "29094:29094" + environment: + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2183" + KAFKA_BROKER_ID: 3 + depends_on: + - zookeeper-1 + - zookeeper-2 + - zookeeper-3 + + kafka-ui: + image: provectuslabs/kafka-ui:latest + container_name: kafka-ui + depends_on: + - kafka-1 + - kafka-2 + - kafka-3 + ports: + - 8081:8080 + environment: + - DYNAMIC_CONFIG_ENABLED=true + - KAFKA_CLUSTERS_0_NAME=peters_kafka + - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:19092 \ No newline at end of file diff --git a/src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducer.java b/src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducer.java new file mode 100644 index 0000000..6ed9641 --- /dev/null +++ b/src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducer.java @@ -0,0 +1,16 @@ +package com.example.hhplus.concert.infra.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public void sendMessage(String topic, String message) { + kafkaTemplate.send(topic, message); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducerConfig.java b/src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..04faf05 --- /dev/null +++ b/src/main/java/com/example/hhplus/concert/infra/kafka/KafkaProducerConfig.java @@ -0,0 +1,38 @@ +package com.example.hhplus.concert.infra.kafka; + +import java.util.HashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@EnableKafka +@Configuration +@RequiredArgsConstructor +public class KafkaProducerConfig { + + private final KafkaProperties kafkaProperties; + + @Bean + public ProducerFactory producerFactory() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new DefaultKafkaProducerFactory<>(config); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + +} diff --git a/src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumer.java b/src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumer.java new file mode 100644 index 0000000..1ece567 --- /dev/null +++ b/src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumer.java @@ -0,0 +1,23 @@ +package com.example.hhplus.concert.interfaces.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class KafkaConsumer { + + private String message = null; + + @KafkaListener(topics = "test-topic", groupId = "test-group") + public void consume(String message) { + log.info("Consumed message: {}", message); + this.message = message; + } + + public String getMessage() { + return message; + } + +} \ No newline at end of file diff --git a/src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumerConfig.java b/src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumerConfig.java new file mode 100644 index 0000000..4aaf9e9 --- /dev/null +++ b/src/main/java/com/example/hhplus/concert/interfaces/consumer/KafkaConsumerConfig.java @@ -0,0 +1,41 @@ +package com.example.hhplus.concert.interfaces.consumer; + +import java.util.HashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +@EnableKafka +@Configuration +@RequiredArgsConstructor +public class KafkaConsumerConfig { + + private final KafkaProperties kafkaProperties; + + @Bean + public ConsumerFactory consumerFactory() { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + + return factory; + } +} \ No newline at end of file diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index c732a58..85a9a74 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -24,6 +24,19 @@ spring: redis: host: localhost port: 6379 + kafka: + bootstrap-servers: + - localhost:9092 + - localhost:9093 + - localhost:9094 + consumer: + group-id: test-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer queue: waiting-key: concertWaitingQueue diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 6f87c95..3e22e25 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -23,10 +23,23 @@ spring: port: ${REDIS_PORT} cache: type: redis + kafka: + bootstrap-servers: + - localhost:9092 + - localhost:9093 + - localhost:9094 + consumer: + group-id: test-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer queue: - waiting-key: ${QUEUE_WAITING_KEY} - active-key: ${QUEUE_ACTIVE_KEY} + waiting-key: ${QUEUE_WAITING_KEY:waiting} + active-key: ${QUEUE_ACTIVE_KEY:active} # Swagger springdoc-ui Configuration springdoc: diff --git a/src/test/java/com/example/hhplus/concert/kafka/KafkaIntegrationTest.java b/src/test/java/com/example/hhplus/concert/kafka/KafkaIntegrationTest.java new file mode 100644 index 0000000..2a96538 --- /dev/null +++ b/src/test/java/com/example/hhplus/concert/kafka/KafkaIntegrationTest.java @@ -0,0 +1,38 @@ +package com.example.hhplus.concert.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.example.hhplus.concert.infra.kafka.KafkaProducer; +import com.example.hhplus.concert.interfaces.consumer.KafkaConsumer; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; + +@SpringBootTest +@ActiveProfiles("test") +@DisplayName("카프카 통합 테스트") +class KafkaIntegrationTest { + + @Autowired + private KafkaProducer producer; + + @Autowired + private KafkaConsumer kafkaConsumer; + + @Test + void test() throws Exception { + // given + String topic = "test-topic"; + String message = "Hello Kafka"; + + // when + producer.sendMessage(topic, message); + Thread.sleep(3000); + + // then + assertThat(kafkaConsumer.getMessage()).isEqualTo(message); + } + +}