Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry Solution #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Backend/retry/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,14 @@ services:
ports:
- 1025:1025
- 8025:8025

mongo-express:
container_name: mongoexpress
image: mongo-express
links:
- mongodb
restart: always
ports:
- 8081:8081
environment:
ME_CONFIG_MONGODB_URL: mongodb://mongodb:27017/db
35 changes: 35 additions & 0 deletions Backend/retry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.9.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,29 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@SpringBootApplication
@EnableAsync
@EnableSwagger2 //http://localhost:8080/swagger-ui.html
public class RetryApplication {

public static void main(String[] args) {
SpringApplication.run(RetryApplication.class, args);
}

@Bean
public Docket retryApi() {
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.any())
.paths(PathSelectors.any())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,68 @@
package com.staffinghub.coding.challenges.retry.configuration;

import com.staffinghub.coding.challenges.retry.core.entities.EmailNotificationMessage;
import com.staffinghub.coding.challenges.retry.core.inbound.NotificationHandler;
import com.staffinghub.coding.challenges.retry.core.logic.NotificationService;
import com.staffinghub.coding.challenges.retry.core.outbound.NotificationSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore;
import org.springframework.integration.mongodb.store.MongoDbChannelMessageStore;
import org.springframework.integration.store.MessageGroupQueue;
import org.springframework.messaging.PollableChannel;

import java.time.LocalDateTime;
import java.util.concurrent.Executors;

@Configuration
@Slf4j
public class GlobalBeanConfiguration {
private static final String GROUP_ID = "email-group";
private static final String COLLECTION_NAME = "message-store";
@Bean
public NotificationHandler notificationHandler(PollableChannel channel) {
return new NotificationService(channel);
}

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDbFactory MongoDbFactory) {
return new MongoDbChannelMessageStore(MongoDbFactory, COLLECTION_NAME);
}

@Bean
public ConfigurableMongoDbMessageStore configurableMongoDbMessageStore(MongoDbFactory MongoDbFactory) {
return new ConfigurableMongoDbMessageStore(MongoDbFactory, COLLECTION_NAME);
}

@Bean
public PollableChannel channel(MongoDbChannelMessageStore messageStore) {
MessageGroupQueue messageGroupQueue = new MessageGroupQueue(messageStore, GROUP_ID);
return new QueueChannel(messageGroupQueue);
}

@Bean
public NotificationHandler notificationHandler(NotificationSender notificationSender) {
return new NotificationService(notificationSender);
public IntegrationFlow integrationFlow(PollableChannel channel, ApplicationEventPublisher events) {
return IntegrationFlows
.from(channel)
.filter(
EmailNotificationMessage.class,
p -> p.getDueTimestamp().isBefore(LocalDateTime.now()),
e -> e.poller(Pollers
.fixedRate(7000, 5000)
.maxMessagesPerPoll(1)
.taskExecutor(Executors.newSingleThreadExecutor())))
.handle(message -> {
EmailNotificationMessage emailNotificationMessage = (EmailNotificationMessage)message.getPayload();
events.publishEvent(emailNotificationMessage);
log.info("Message pulled: {}", message.getPayload());
}
)
.get();
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package com.staffinghub.coding.challenges.retry.core.entities;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotBlank;
import java.io.Serializable;

@Data
public class EmailNotification {
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EmailNotification implements Serializable {

@NotBlank
private String recipient;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.staffinghub.coding.challenges.retry.core.entities;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
* Message used for queueing incoming email notifications.
*/
@Data
@Builder
public class EmailNotificationMessage implements Serializable {

@Builder.Default
private LocalDateTime timestamp = LocalDateTime.now();

@Builder.Default
private LocalDateTime dueTimestamp = LocalDateTime.now();

@Builder.Default
private Integer retryAttempts = 0;

private EmailNotification emailNotification;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,10 @@

public interface NotificationHandler {

/**
* Submits an emailNotification {@link EmailNotification} onto the email notification queue to handle the submission of the given notification
* @param emailNotification {@link EmailNotification}
* @return {@link EmailNotification} The email notification that was submitted for processing
*/
EmailNotification processEmailNotification(EmailNotification emailNotification);
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
package com.staffinghub.coding.challenges.retry.core.logic;

import com.staffinghub.coding.challenges.retry.core.entities.EmailNotification;
import com.staffinghub.coding.challenges.retry.core.entities.EmailNotificationMessage;
import com.staffinghub.coding.challenges.retry.core.inbound.NotificationHandler;
import com.staffinghub.coding.challenges.retry.core.outbound.NotificationSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
public class NotificationService implements NotificationHandler {

private NotificationSender notificationSender;
private PollableChannel channel;

public NotificationService(NotificationSender notificationSender) {
this.notificationSender = notificationSender;
private final MessagingTemplate messagingTemplate = new MessagingTemplate();

public NotificationService(PollableChannel channel) {
this.channel = channel;
}

/** {@inheritDoc} */
@Override
public EmailNotification processEmailNotification(EmailNotification emailNotification) {
notificationSender.sendEmail(emailNotification);
messagingTemplate.send(channel, MessageBuilder.withPayload(EmailNotificationMessage
.builder()
.emailNotification(emailNotification)
.build()).build());
log.info("Message queued: {}", emailNotification);
return emailNotification;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package com.staffinghub.coding.challenges.retry.core.outbound;

import com.staffinghub.coding.challenges.retry.core.entities.EmailNotification;
import com.staffinghub.coding.challenges.retry.core.entities.EmailNotificationMessage;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;

public interface NotificationSender {

/**
* Validates and submits a given emailNotification {@link EmailNotification} to send a new email
* @param emailNotification {@link EmailNotification}
*/
void sendEmail(@Valid @NotNull EmailNotification emailNotification);

/**
* Re-queues a given emailNotificationMessage to retry the notification for a max of five tries using exponential back-off
* @param emailNotificationMessage {@link EmailNotificationMessage}
*/
void notificationEvent(@Valid @NotNull EmailNotificationMessage emailNotificationMessage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public EmailController(NotificationHandler notificationHandler) {
this.notificationHandler = notificationHandler;
}

/**
* Creates a new email notification using the given emailNotification request {@link EmailNotification}
* @param emailNotification {@link EmailNotification} Email notification to request
* @return {@link ResponseEntity<EmailNotification>} The status of the notification request
*/
@PostMapping
public ResponseEntity<EmailNotification> createEmailNotification(@RequestBody EmailNotification emailNotification) {
EmailNotification emailNotificationResult = notificationHandler.processEmailNotification(emailNotification);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,67 @@
package com.staffinghub.coding.challenges.retry.outbound;

import com.staffinghub.coding.challenges.retry.core.entities.EmailNotification;
import com.staffinghub.coding.challenges.retry.core.entities.EmailNotificationMessage;
import com.staffinghub.coding.challenges.retry.core.outbound.NotificationSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;

import static java.time.temporal.ChronoUnit.SECONDS;

@Service
@Validated
@Slf4j
public class EmailNotificationSenderService implements NotificationSender {

private static final String SENDER_ADDRESS = "info@test.com";

private JavaMailSender mailSender;

public EmailNotificationSenderService(JavaMailSender mailSender) {
private PollableChannel channel;

private final MessagingTemplate messagingTemplate = new MessagingTemplate();

public EmailNotificationSenderService(JavaMailSender mailSender, PollableChannel channel) {
this.mailSender = mailSender;
this.channel = channel;
}

/** {@inheritDoc} */
@Async
@EventListener
@Override
public void notificationEvent(EmailNotificationMessage emailNotificationMessage) {
log.info("Received notification by event for email {} in date {}.",
emailNotificationMessage,
emailNotificationMessage.getTimestamp());
try {
this.sendEmail(emailNotificationMessage.getEmailNotification());
} catch(Exception ex) {
if (emailNotificationMessage.getRetryAttempts() > 5) {
return;
}

emailNotificationMessage.setRetryAttempts(emailNotificationMessage.getRetryAttempts() + 1);
emailNotificationMessage.setDueTimestamp(LocalDateTime.now().plus(5 * (10^emailNotificationMessage.getRetryAttempts()), SECONDS));
messagingTemplate.send(channel, MessageBuilder.withPayload(emailNotificationMessage).build());
log.info("Message re-queued: {}", emailNotificationMessage.getEmailNotification());
}

}

/** {@inheritDoc} */
@Async
@Override
public void sendEmail(@Valid @NotNull EmailNotification emailNotification) {
Expand All @@ -34,6 +73,7 @@ public void sendEmail(@Valid @NotNull EmailNotification emailNotification) {
mailMessage.setText(emailNotification.getText());

mailSender.send(mailMessage);
log.info("Successfully sent: {}", emailNotification);
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to send email to recipient: %s", emailNotification.getRecipient()));
}
Expand Down
3 changes: 3 additions & 0 deletions Backend/retry/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ spring:
starttls:
enable: false
required: false
data:
mongodb:
uri: mongodb://localhost:27017/db
Loading