Skip to content

Commit

Permalink
Merge pull request #604 from iExecBlockchainComputing/feature/block-c…
Browse files Browse the repository at this point in the history
…ommunications-while-chain-down

Feature/block communications while chain down
  • Loading branch information
mcornaton authored Jul 6, 2023
2 parents a9e50e8 + 82120e3 commit 095086a
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.

### New Features
- Add blockchain connection health indicator. (#601)
- Block some connections and messages when blockchain connection is down. (#604)
### Bug Fixes
- Clean call to `iexecHubService#getTaskDescriptionFromChain` in test. (#597)
- Reject deal if TEE tag but trust not in {0,1}. (#598)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,8 @@ public Health health() {
.withDetail("outOfServiceThreshold", outOfServiceThreshold)
.build();
}

public boolean isUp() {
return health().getStatus() == Status.UP;
}
}
18 changes: 17 additions & 1 deletion src/main/java/com/iexec/core/replicate/ReplicatesController.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.iexec.common.replicate.*;
import com.iexec.commons.poco.notification.TaskNotification;
import com.iexec.commons.poco.notification.TaskNotificationType;
import com.iexec.core.chain.BlockchainConnectionHealthIndicator;
import com.iexec.core.security.JwtTokenProvider;
import com.iexec.core.worker.WorkerService;
import feign.FeignException;
Expand All @@ -41,15 +42,18 @@ public class ReplicatesController {
private final ReplicateSupplyService replicateSupplyService;
private final JwtTokenProvider jwtTokenProvider;
private final WorkerService workerService;
private final BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator;

public ReplicatesController(ReplicatesService replicatesService,
ReplicateSupplyService replicateSupplyService,
JwtTokenProvider jwtTokenProvider,
WorkerService workerService) {
WorkerService workerService,
BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator) {
this.replicatesService = replicatesService;
this.replicateSupplyService = replicateSupplyService;
this.jwtTokenProvider = jwtTokenProvider;
this.workerService = workerService;
this.blockchainConnectionHealthIndicator = blockchainConnectionHealthIndicator;
}

@GetMapping("/replicates/available")
Expand All @@ -61,6 +65,12 @@ public ResponseEntity<ReplicateTaskSummary> getAvailableReplicateTaskSummary(
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
}

if (!blockchainConnectionHealthIndicator.isUp()) {
log.debug("Blockchain is down. Can't get available replicate task summary" +
" [workerAddress: {}]", workerWalletAddress);
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}

if (!workerService.isWorkerAllowedToAskReplicate(workerWalletAddress)) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).build();
}
Expand Down Expand Up @@ -113,6 +123,12 @@ public ResponseEntity<TaskNotificationType> updateReplicateStatus(
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
}

if (!blockchainConnectionHealthIndicator.isUp()) {
log.debug("Blockchain is down. Won't update replicate status" +
" [workerAddress: {}]", walletAddress);
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}

statusUpdate.setModifier(ReplicateStatusModifier.WORKER);
statusUpdate.setDate(new Date());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.iexec.commons.poco.chain.WorkerpoolAuthorization;
import com.iexec.commons.poco.notification.TaskNotification;
import com.iexec.commons.poco.notification.TaskNotificationType;
import com.iexec.core.chain.BlockchainConnectionHealthIndicator;
import com.iexec.core.security.JwtTokenProvider;
import com.iexec.core.worker.WorkerService;
import io.vavr.control.Either;
Expand All @@ -37,7 +38,7 @@
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

class ReplicateControllerTests {

Expand Down Expand Up @@ -69,6 +70,8 @@ class ReplicateControllerTests {
private JwtTokenProvider jwtTokenProvider;
@Mock
private WorkerService workerService;
@Mock
private BlockchainConnectionHealthIndicator blockchainConnectionHealthIndicator;

@InjectMocks
private ReplicatesController replicatesController;
Expand All @@ -84,6 +87,8 @@ void setup() {
void shouldGetAvailableReplicate() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(workerService.isWorkerAllowedToAskReplicate(WALLET_ADDRESS))
.thenReturn(true);
when(replicateSupplyService
Expand All @@ -96,6 +101,8 @@ void shouldGetAvailableReplicate() {
assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.OK);
ReplicateTaskSummary replicateTaskSummary = replicateTaskSummaryResponse.getBody();
assertThat(replicateTaskSummary.getWorkerpoolAuthorization().getChainTaskId()).isEqualTo(CHAIN_TASK_ID);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}

@Test
Expand All @@ -107,25 +114,49 @@ void shouldNotGetAvailableReplicateSinceNotAuthorizedToken() {
replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN);

assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED);

verify(blockchainConnectionHealthIndicator, never()).isUp();
}

@Test
void shouldNotGetAvailableReplicateSinceChainIsDown() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(false);

ResponseEntity<ReplicateTaskSummary> replicateTaskSummaryResponse =
replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN);

assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}


@Test
void shouldNotGetAvailableReplicateSinceNotAllowed() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(workerService.isWorkerAllowedToAskReplicate(WALLET_ADDRESS))
.thenReturn(false);

ResponseEntity<ReplicateTaskSummary> replicateTaskSummaryResponse =
replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN);

assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}

@Test
void shouldNotGetAvailableReplicateSinceNoReplicateAvailable() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(workerService.isWorkerAllowedToAskReplicate(WALLET_ADDRESS))
.thenReturn(true);
when(replicateSupplyService
Expand All @@ -136,6 +167,8 @@ void shouldNotGetAvailableReplicateSinceNoReplicateAvailable() {
replicatesController.getAvailableReplicateTaskSummary(BLOCK_NUMBER, TOKEN);

assertThat(replicateTaskSummaryResponse.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}
//endregion

Expand Down Expand Up @@ -192,6 +225,8 @@ void shouldGetEmptyMissedNotifications() {
void shouldUpdateReplicate() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE))
.thenReturn(UPDATE_ARGS);
when(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS))
Expand All @@ -206,6 +241,8 @@ void shouldUpdateReplicate() {
assertThat(response.getBody())
.isEqualTo(TaskNotificationType.PLEASE_DOWNLOAD_APP);
assertThat(UPDATE.getModifier()).isEqualTo(ReplicateStatusModifier.WORKER);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}

@Test
Expand All @@ -217,6 +254,8 @@ void shouldUpdateReplicateAndSetWalletAddress() {

when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, updateWithLogs))
.thenReturn(UPDATE_ARGS);
when(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, updateWithLogs, UPDATE_ARGS))
Expand All @@ -232,6 +271,8 @@ void shouldUpdateReplicateAndSetWalletAddress() {
.isEqualTo(TaskNotificationType.PLEASE_DOWNLOAD_APP);
assertThat(updateWithLogs.getModifier()).isEqualTo(ReplicateStatusModifier.WORKER);
assertThat(updateWithLogs.getDetails().getComputeLogs().getWalletAddress()).isEqualTo(WALLET_ADDRESS);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}

@Test
Expand All @@ -244,8 +285,26 @@ void shouldNotUpdateReplicateSinceUnauthorized() {

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED);
assertThat(response.getBody()).isNull();

verify(blockchainConnectionHealthIndicator, never()).isUp();
}

@Test
void shouldReturnServiceUnavailableSinceChainIsDown() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(false);

ResponseEntity<TaskNotificationType> response =
replicatesController.updateReplicateStatus(TOKEN, CHAIN_TASK_ID, UPDATE);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}


@ParameterizedTest
@EnumSource(value = ReplicateStatusUpdateError.class, names = {
"UNKNOWN_REPLICATE",
Expand All @@ -256,6 +315,8 @@ void shouldNotUpdateReplicateSinceUnauthorized() {
void shouldReturnPleaseAbortSinceCantUpdate(ReplicateStatusUpdateError error) {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE))
.thenReturn(UPDATE_ARGS);
when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS))
Expand All @@ -266,12 +327,16 @@ void shouldReturnPleaseAbortSinceCantUpdate(ReplicateStatusUpdateError error) {

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(response.getBody()).isEqualTo(TaskNotificationType.PLEASE_ABORT);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}

@Test
void shouldReply208AlreadyReported() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE))
.thenReturn(UPDATE_ARGS);
when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS))
Expand All @@ -283,12 +348,16 @@ void shouldReply208AlreadyReported() {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ALREADY_REPORTED);
assertThat(response.getBody())
.isEqualTo(TaskNotificationType.PLEASE_WAIT);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}

@Test
void shouldReply500WhenErrorNotExpected() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn(WALLET_ADDRESS);
when(blockchainConnectionHealthIndicator.isUp())
.thenReturn(true);
when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE))
.thenReturn(UPDATE_ARGS);
when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS))
Expand All @@ -298,6 +367,8 @@ void shouldReply500WhenErrorNotExpected() {
replicatesController.updateReplicateStatus(TOKEN, CHAIN_TASK_ID, UPDATE);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);

verify(blockchainConnectionHealthIndicator, times(1)).isUp();
}
//endregion
}

0 comments on commit 095086a

Please sign in to comment.