diff --git a/src/main/java/net/dancier/dancer/messaging/SendMessagesJob.java b/src/main/java/net/dancier/dancer/messaging/SendMessagesJob.java new file mode 100644 index 0000000..4fdf97c --- /dev/null +++ b/src/main/java/net/dancier/dancer/messaging/SendMessagesJob.java @@ -0,0 +1,53 @@ +package net.dancier.dancer.messaging; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import jakarta.transaction.Transactional; +import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.net.URI; +import java.util.Collection; + +@RequiredArgsConstructor +@Component +public class SendMessagesJob { + + private final static Logger log = LoggerFactory.getLogger(SendMessagesJob.class); + + private final OutboxJpaRepository outboxJpaRepository; + + private final KafkaTemplate kafkaTemplate; + + private final ObjectMapper objectMapper; + + @Transactional + @Scheduled(fixedRate = 2000) + public void sendMessages() throws JsonProcessingException { + Collection itemsToSend = outboxJpaRepository.lockAndList(); + for (OutboxJpaEntity item: itemsToSend) { + log.info("Sending: {}", item); + send(item); + item.setStatus(OutboxJpaEntity.STATUS.DONE); + } + kafkaTemplate.flush(); + } + + private void send(OutboxJpaEntity item) throws JsonProcessingException { + CloudEvent cloudEvent = CloudEventBuilder.v1() + .withId(item.getId().toString()) + .withSource(URI.create(item.getSource())) + .withType(item.getType()) + .withData(objectMapper.writeValueAsBytes(item.getData())) + .build(); + + kafkaTemplate.send(item.getType(), item.getKey(), cloudEvent); + + } +} diff --git a/src/main/java/net/dancier/dancer/messaging/TopicConfiguration.java b/src/main/java/net/dancier/dancer/messaging/TopicConfiguration.java index 3b09f7a..6a5ab61 100644 --- a/src/main/java/net/dancier/dancer/messaging/TopicConfiguration.java +++ b/src/main/java/net/dancier/dancer/messaging/TopicConfiguration.java @@ -1,22 +1,16 @@ package net.dancier.dancer.messaging; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaAdmin; -import java.util.HashMap; -import java.util.Map; - @Configuration public class TopicConfiguration { @Bean - public NewTopic profileUpdated() { + public KafkaAdmin.NewTopics createTopics() { + return new NewTopic("profile-updated", 1, (short) 1); }