From ad9801b709d7378011dc922d3d14811740a3494d Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Tue, 27 Jun 2023 16:36:08 +0200 Subject: [PATCH 1/7] Block notifications when chain is down --- .../BlockchainConnectionHealthIndicator.java | 4 ++ .../core/pubsub/NotificationService.java | 15 +++++++- .../core/pubsub/NotificationServiceTests.java | 38 ++++++++++++++++--- 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/iexec/core/chain/BlockchainConnectionHealthIndicator.java b/src/main/java/com/iexec/core/chain/BlockchainConnectionHealthIndicator.java index 310f5d3c..465e2740 100644 --- a/src/main/java/com/iexec/core/chain/BlockchainConnectionHealthIndicator.java +++ b/src/main/java/com/iexec/core/chain/BlockchainConnectionHealthIndicator.java @@ -176,4 +176,8 @@ public Health health() { .withDetail("outOfServiceThreshold", outOfServiceThreshold) .build(); } + + public boolean isUp() { + return health().getStatus() == Status.UP; + } } diff --git a/src/main/java/com/iexec/core/pubsub/NotificationService.java b/src/main/java/com/iexec/core/pubsub/NotificationService.java index 67ac7576..6b9b4643 100644 --- a/src/main/java/com/iexec/core/pubsub/NotificationService.java +++ b/src/main/java/com/iexec/core/pubsub/NotificationService.java @@ -17,7 +17,9 @@ package com.iexec.core.pubsub; import com.iexec.commons.poco.notification.TaskNotification; +import com.iexec.core.chain.BlockchainConnectionHealthIndicator; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.actuate.health.Status; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; @@ -25,13 +27,22 @@ @Service public class NotificationService { - private SimpMessagingTemplate sender; + private final SimpMessagingTemplate sender; + private final BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator; - public NotificationService(SimpMessagingTemplate sender) { + public NotificationService(SimpMessagingTemplate sender, + BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator) { this.sender = sender; + this.blockchainConnectionHealthIndicator = blockchainConnectionHealthIndicator; } public void sendTaskNotification(TaskNotification taskNotification) { + if (!blockchainConnectionHealthIndicator.isUp()) { + log.debug("Blockchain is down. Task notification not sent [chainTaskId:{}, type:{}, workers:{}]", + taskNotification.getChainTaskId(), taskNotification.getTaskNotificationType(), taskNotification.getWorkersAddress()); + return; + } + sender.convertAndSend("/topic/task/" + taskNotification.getChainTaskId(), taskNotification); log.info("Sent TaskNotification [chainTaskId:{}, type:{}, workers:{}]", taskNotification.getChainTaskId(), taskNotification.getTaskNotificationType(), taskNotification.getWorkersAddress()); diff --git a/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java b/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java index 444eab6e..545e9160 100644 --- a/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java +++ b/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java @@ -16,6 +16,7 @@ package com.iexec.core.pubsub; +import com.iexec.core.chain.BlockchainConnectionHealthIndicator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.InjectMocks; @@ -23,15 +24,24 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; +import org.springframework.boot.actuate.health.Health; import org.springframework.messaging.simp.SimpMessagingTemplate; import com.iexec.commons.poco.notification.TaskNotification; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; + class NotificationServiceTests { @Mock private SimpMessagingTemplate sender; + @Mock + private BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator; + @Spy @InjectMocks private NotificationService notificationService; @@ -41,16 +51,34 @@ class NotificationServiceTests { @Test void shouldSendTaskNotification() { - String chainTaskId = "chainTaskId"; - TaskNotification taskNotification = TaskNotification.builder() + when(blockchainConnectionHealthIndicator.isUp()).thenReturn(true); + + final String chainTaskId = "chainTaskId"; + final TaskNotification taskNotification = TaskNotification.builder() .chainTaskId(chainTaskId) .build(); notificationService.sendTaskNotification(taskNotification); - String destination = "/topic/task/" + taskNotification.getChainTaskId(); + final String destination = "/topic/task/" + taskNotification.getChainTaskId(); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); + verify(sender, times(1)).convertAndSend(destination, taskNotification); + } + + @Test + void shouldNotSendTaskNotificationSinceChainConnexionDown() { + when(blockchainConnectionHealthIndicator.isUp()).thenReturn(false); + + final String chainTaskId = "chainTaskId"; + final TaskNotification taskNotification = TaskNotification.builder() + .chainTaskId(chainTaskId) + .build(); - Mockito.verify(sender, Mockito.times(1)) - .convertAndSend(destination, taskNotification); + notificationService.sendTaskNotification(taskNotification); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); + verify(sender, never()).convertAndSend(anyString(), any(TaskNotification.class)); } + } \ No newline at end of file From 0e6b9922e62a6bbaa04a7d531eff70e0695a68e4 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Tue, 27 Jun 2023 16:44:31 +0200 Subject: [PATCH 2/7] Block `getAvailableReplicateTaskSummary` when chain is down --- .../core/replicate/ReplicatesController.java | 13 ++++++- .../replicate/ReplicateControllerTests.java | 36 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesController.java b/src/main/java/com/iexec/core/replicate/ReplicatesController.java index 37c0dc81..bc8f3294 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesController.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesController.java @@ -19,11 +19,13 @@ import com.iexec.common.replicate.*; import com.iexec.commons.poco.notification.TaskNotification; import com.iexec.commons.poco.notification.TaskNotificationType; +import com.iexec.core.chain.BlockchainConnectionHealthIndicator; import com.iexec.core.security.JwtTokenProvider; import com.iexec.core.worker.WorkerService; import feign.FeignException; import io.vavr.control.Either; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.actuate.health.Status; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -41,15 +43,18 @@ public class ReplicatesController { private final ReplicateSupplyService replicateSupplyService; private final JwtTokenProvider jwtTokenProvider; private final WorkerService workerService; + private final BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator; public ReplicatesController(ReplicatesService replicatesService, ReplicateSupplyService replicateSupplyService, JwtTokenProvider jwtTokenProvider, - WorkerService workerService) { + WorkerService workerService, + BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator) { this.replicatesService = replicatesService; this.replicateSupplyService = replicateSupplyService; this.jwtTokenProvider = jwtTokenProvider; this.workerService = workerService; + this.blockchainConnectionHealthIndicator = blockchainConnectionHealthIndicator; } @GetMapping("/replicates/available") @@ -61,6 +66,12 @@ public ResponseEntity getAvailableReplicateTaskSummary( return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build(); } + if (!blockchainConnectionHealthIndicator.isUp()) { + log.debug("Blockchain is down. Can't get available replicate task summary" + + " [workerAddress: {}]", workerWalletAddress); + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + if (!workerService.isWorkerAllowedToAskReplicate(workerWalletAddress)) { return ResponseEntity.status(HttpStatus.NO_CONTENT).build(); } diff --git a/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java b/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java index 9121fe58..f91fe162 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java @@ -20,6 +20,7 @@ import com.iexec.commons.poco.chain.WorkerpoolAuthorization; import com.iexec.commons.poco.notification.TaskNotification; import com.iexec.commons.poco.notification.TaskNotificationType; +import com.iexec.core.chain.BlockchainConnectionHealthIndicator; import com.iexec.core.security.JwtTokenProvider; import com.iexec.core.worker.WorkerService; import io.vavr.control.Either; @@ -29,6 +30,7 @@ import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -37,6 +39,8 @@ import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class ReplicateControllerTests { @@ -69,6 +73,8 @@ class ReplicateControllerTests { private JwtTokenProvider jwtTokenProvider; @Mock private WorkerService workerService; + @Mock + private BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator; @InjectMocks private ReplicatesController replicatesController; @@ -84,6 +90,8 @@ void setup() { void shouldGetAvailableReplicate() { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(workerService.isWorkerAllowedToAskReplicate(WALLET_ADDRESS)) .thenReturn(true); when(replicateSupplyService @@ -96,6 +104,8 @@ void shouldGetAvailableReplicate() { assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.OK); ReplicateTaskSummary replicateTaskSummary = replicateTaskSummaryResponse.getBody(); assertThat(replicateTaskSummary.getWorkerpoolAuthorization().getChainTaskId()).isEqualTo(CHAIN_TASK_ID); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } @Test @@ -107,12 +117,32 @@ void shouldNotGetAvailableReplicateSinceNotAuthorizedToken() { replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN); assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED); + + verify(blockchainConnectionHealthIndicator, never()).isUp(); + } + + @Test + void shouldNotGetAvailableReplicateSinceChainIsDown() { + when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) + .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(false); + + ResponseEntity replicateTaskSummaryResponse = + replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN); + + assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } + @Test void shouldNotGetAvailableReplicateSinceNotAllowed() { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(workerService.isWorkerAllowedToAskReplicate(WALLET_ADDRESS)) .thenReturn(false); @@ -120,12 +150,16 @@ void shouldNotGetAvailableReplicateSinceNotAllowed() { replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN); assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } @Test void shouldNotGetAvailableReplicateSinceNoReplicateAvailable() { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(workerService.isWorkerAllowedToAskReplicate(WALLET_ADDRESS)) .thenReturn(true); when(replicateSupplyService @@ -136,6 +170,8 @@ void shouldNotGetAvailableReplicateSinceNoReplicateAvailable() { replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN); assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } //endregion From f238b68f4a5126f525ec499828c0b525e633085b Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Tue, 27 Jun 2023 16:48:20 +0200 Subject: [PATCH 3/7] Block `updateReplicateStatus` when chain is down --- .../core/replicate/ReplicatesController.java | 6 +++ .../replicate/ReplicateControllerTests.java | 38 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesController.java b/src/main/java/com/iexec/core/replicate/ReplicatesController.java index bc8f3294..e64891e7 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesController.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesController.java @@ -124,6 +124,12 @@ public ResponseEntity updateReplicateStatus( return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build(); } + if (!blockchainConnectionHealthIndicator.isUp()) { + log.debug("Blockchain is down. Won't update replicate status" + + " [workerAddress: {}]", walletAddress); + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + statusUpdate.setModifier(ReplicateStatusModifier.WORKER); statusUpdate.setDate(new Date()); diff --git a/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java b/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java index f91fe162..21797a43 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java @@ -228,6 +228,8 @@ void shouldGetEmptyMissedNotifications() { void shouldUpdateReplicate() { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE)) .thenReturn(UPDATE_ARGS); when(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS)) @@ -242,6 +244,8 @@ void shouldUpdateReplicate() { assertThat(response.getBody()) .isEqualTo(TaskNotificationType.PLEASE_DOWNLOAD_APP); assertThat(UPDATE.getModifier()).isEqualTo(ReplicateStatusModifier.WORKER); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } @Test @@ -253,6 +257,8 @@ void shouldUpdateReplicateAndSetWalletAddress() { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, updateWithLogs)) .thenReturn(UPDATE_ARGS); when(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, updateWithLogs, UPDATE_ARGS)) @@ -268,6 +274,8 @@ void shouldUpdateReplicateAndSetWalletAddress() { .isEqualTo(TaskNotificationType.PLEASE_DOWNLOAD_APP); assertThat(updateWithLogs.getModifier()).isEqualTo(ReplicateStatusModifier.WORKER); assertThat(updateWithLogs.getDetails().getComputeLogs().getWalletAddress()).isEqualTo(WALLET_ADDRESS); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } @Test @@ -280,8 +288,26 @@ void shouldNotUpdateReplicateSinceUnauthorized() { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED); assertThat(response.getBody()).isNull(); + + verify(blockchainConnectionHealthIndicator, never()).isUp(); + } + + @Test + void shouldReturnServiceUnavailableSinceChainIsDown() { + when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) + .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(false); + + ResponseEntity response = + replicatesController.updateReplicateStatus(TOKEN, CHAIN_TASK_ID, UPDATE); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } + @ParameterizedTest @EnumSource(value = ReplicateStatusUpdateError.class, names = { "UNKNOWN_REPLICATE", @@ -292,6 +318,8 @@ void shouldNotUpdateReplicateSinceUnauthorized() { void shouldReturnPleaseAbortSinceCantUpdate(ReplicateStatusUpdateError error) { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE)) .thenReturn(UPDATE_ARGS); when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS)) @@ -302,12 +330,16 @@ void shouldReturnPleaseAbortSinceCantUpdate(ReplicateStatusUpdateError error) { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(response.getBody()).isEqualTo(TaskNotificationType.PLEASE_ABORT); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } @Test void shouldReply208AlreadyReported() { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE)) .thenReturn(UPDATE_ARGS); when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS)) @@ -319,12 +351,16 @@ void shouldReply208AlreadyReported() { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ALREADY_REPORTED); assertThat(response.getBody()) .isEqualTo(TaskNotificationType.PLEASE_WAIT); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } @Test void shouldReply500WhenErrorNotExpected() { when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN)) .thenReturn(WALLET_ADDRESS); + when(blockchainConnectionHealthIndicator.isUp()) + .thenReturn(true); when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE)) .thenReturn(UPDATE_ARGS); when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS)) @@ -334,6 +370,8 @@ void shouldReply500WhenErrorNotExpected() { replicatesController.updateReplicateStatus(TOKEN, CHAIN_TASK_ID, UPDATE); assertThat(response.getStatusCode()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR); + + verify(blockchainConnectionHealthIndicator, times(1)).isUp(); } //endregion } From d73266620cac09b7655549005f2026a749b2ad6e Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Tue, 27 Jun 2023 16:48:57 +0200 Subject: [PATCH 4/7] Remove unused imports --- .../java/com/iexec/core/replicate/ReplicatesController.java | 1 - .../com/iexec/core/replicate/ReplicateControllerTests.java | 3 --- 2 files changed, 4 deletions(-) diff --git a/src/main/java/com/iexec/core/replicate/ReplicatesController.java b/src/main/java/com/iexec/core/replicate/ReplicatesController.java index e64891e7..f40e9e95 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicatesController.java +++ b/src/main/java/com/iexec/core/replicate/ReplicatesController.java @@ -25,7 +25,6 @@ import feign.FeignException; import io.vavr.control.Either; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.actuate.health.Status; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; diff --git a/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java b/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java index 21797a43..34f9a4d9 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateControllerTests.java @@ -30,7 +30,6 @@ import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -40,8 +39,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.*; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; class ReplicateControllerTests { From 0713572096a0e9eab01cd4cd54dca590b21d0cb5 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Tue, 27 Jun 2023 16:50:40 +0200 Subject: [PATCH 5/7] Add missing new line and remove unused imports --- .../com/iexec/core/pubsub/NotificationServiceTests.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java b/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java index 545e9160..834bc5fc 100644 --- a/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java +++ b/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java @@ -16,23 +16,19 @@ package com.iexec.core.pubsub; +import com.iexec.commons.poco.notification.TaskNotification; import com.iexec.core.chain.BlockchainConnectionHealthIndicator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; -import org.springframework.boot.actuate.health.Health; import org.springframework.messaging.simp.SimpMessagingTemplate; -import com.iexec.commons.poco.notification.TaskNotification; - import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; -import static org.mockito.Mockito.when; class NotificationServiceTests { @@ -81,4 +77,4 @@ void shouldNotSendTaskNotificationSinceChainConnexionDown() { verify(sender, never()).convertAndSend(anyString(), any(TaskNotification.class)); } -} \ No newline at end of file +} From 4e4ea2adafbc48e5ddc5f57ff2121dd71d683a35 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Fri, 30 Jun 2023 09:24:09 +0200 Subject: [PATCH 6/7] Update `CHANGELOG.md` --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6dda8900..93c214f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file. ### New Features - Add blockchain connection health indicator. (#601) +- Block some connections and messages when blockchain connection is down. (#604) ### Bug Fixes - Clean call to `iexecHubService#getTaskDescriptionFromChain` in test. (#597) - Reject deal if TEE tag but trust not in {0,1}. (#598) From 82120e3a8e1eab09d3ab9ea3314590e463161e08 Mon Sep 17 00:00:00 2001 From: Maxence Cornaton Date: Mon, 3 Jul 2023 16:52:40 +0200 Subject: [PATCH 7/7] Revert "Block notifications when chain is down" --- .../core/pubsub/NotificationService.java | 15 +------ .../core/pubsub/NotificationServiceTests.java | 40 ++++--------------- 2 files changed, 10 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/iexec/core/pubsub/NotificationService.java b/src/main/java/com/iexec/core/pubsub/NotificationService.java index 6b9b4643..67ac7576 100644 --- a/src/main/java/com/iexec/core/pubsub/NotificationService.java +++ b/src/main/java/com/iexec/core/pubsub/NotificationService.java @@ -17,9 +17,7 @@ package com.iexec.core.pubsub; import com.iexec.commons.poco.notification.TaskNotification; -import com.iexec.core.chain.BlockchainConnectionHealthIndicator; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.actuate.health.Status; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; @@ -27,22 +25,13 @@ @Service public class NotificationService { - private final SimpMessagingTemplate sender; - private final BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator; + private SimpMessagingTemplate sender; - public NotificationService(SimpMessagingTemplate sender, - BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator) { + public NotificationService(SimpMessagingTemplate sender) { this.sender = sender; - this.blockchainConnectionHealthIndicator = blockchainConnectionHealthIndicator; } public void sendTaskNotification(TaskNotification taskNotification) { - if (!blockchainConnectionHealthIndicator.isUp()) { - log.debug("Blockchain is down. Task notification not sent [chainTaskId:{}, type:{}, workers:{}]", - taskNotification.getChainTaskId(), taskNotification.getTaskNotificationType(), taskNotification.getWorkersAddress()); - return; - } - sender.convertAndSend("/topic/task/" + taskNotification.getChainTaskId(), taskNotification); log.info("Sent TaskNotification [chainTaskId:{}, type:{}, workers:{}]", taskNotification.getChainTaskId(), taskNotification.getTaskNotificationType(), taskNotification.getWorkersAddress()); diff --git a/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java b/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java index 834bc5fc..444eab6e 100644 --- a/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java +++ b/src/test/java/com/iexec/core/pubsub/NotificationServiceTests.java @@ -16,28 +16,22 @@ package com.iexec.core.pubsub; -import com.iexec.commons.poco.notification.TaskNotification; -import com.iexec.core.chain.BlockchainConnectionHealthIndicator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; import org.springframework.messaging.simp.SimpMessagingTemplate; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import com.iexec.commons.poco.notification.TaskNotification; class NotificationServiceTests { @Mock private SimpMessagingTemplate sender; - @Mock - private BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator; - @Spy @InjectMocks private NotificationService notificationService; @@ -47,34 +41,16 @@ class NotificationServiceTests { @Test void shouldSendTaskNotification() { - when(blockchainConnectionHealthIndicator.isUp()).thenReturn(true); - - final String chainTaskId = "chainTaskId"; - final TaskNotification taskNotification = TaskNotification.builder() + String chainTaskId = "chainTaskId"; + TaskNotification taskNotification = TaskNotification.builder() .chainTaskId(chainTaskId) .build(); notificationService.sendTaskNotification(taskNotification); - final String destination = "/topic/task/" + taskNotification.getChainTaskId(); + String destination = "/topic/task/" + taskNotification.getChainTaskId(); - verify(blockchainConnectionHealthIndicator, times(1)).isUp(); - verify(sender, times(1)).convertAndSend(destination, taskNotification); + Mockito.verify(sender, Mockito.times(1)) + .convertAndSend(destination, taskNotification); } - - @Test - void shouldNotSendTaskNotificationSinceChainConnexionDown() { - when(blockchainConnectionHealthIndicator.isUp()).thenReturn(false); - - final String chainTaskId = "chainTaskId"; - final TaskNotification taskNotification = TaskNotification.builder() - .chainTaskId(chainTaskId) - .build(); - - notificationService.sendTaskNotification(taskNotification); - - verify(blockchainConnectionHealthIndicator, times(1)).isUp(); - verify(sender, never()).convertAndSend(anyString(), any(TaskNotification.class)); - } - -} +} \ No newline at end of file