Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add check for ContributeAndFinalize in ReplicatesService #576

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.

### New Features
- Add ContributeAndFinalize to `ReplicateWorkflow`. (#574)
- Add check for ContributeAndFinalize in `ReplicatesService`. (#576)
### Bug Fixes
- Prevent race condition on replicate update. (#568)
### Quality
Expand Down
23 changes: 15 additions & 8 deletions src/main/java/com/iexec/core/chain/IexecHubService.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public IexecHubService(CredentialsService credentialsService,
/**
* Check if the task is defined onchain and
* has the status {@link ChainTaskStatus#UNSET}.
*
*
* @param chainDealId
* @param taskIndex
* @return true if the task is found with the status UNSET, false otherwise.
Expand All @@ -74,10 +74,17 @@ public boolean isTaskInUnsetStatusOnChain(String chainDealId, int taskIndex) {
|| ChainTaskStatus.UNSET.equals(chainTask.get().getStatus());
}


public boolean isTaskInCompletedStatusOnChain(String chainTaskId) {
return getChainTask(chainTaskId)
.filter(chainTask -> ChainTaskStatus.COMPLETED == chainTask.getStatus())
.isPresent();
}

/**
* Check if a deal's contribution deadline
* is still not reached.
*
*
* @param chainDealId
* @return true if deadline is not reached, false otherwise.
*/
Expand All @@ -90,7 +97,7 @@ public boolean isBeforeContributionDeadline(String chainDealId) {
/**
* Check if a deal's contribution deadline
* is still not reached.
*
*
* @param chainDeal
* @return true if deadline is not reached, false otherwise.
*/
Expand All @@ -103,13 +110,13 @@ public boolean isBeforeContributionDeadline(ChainDeal chainDeal) {
* <p> Get deal's contribution deadline date. The deadline
* is calculated as follow:
* start + maxCategoryTime * maxNbOfPeriods.
*
*
* <ul>
* <li> start: the start time of the deal.
* <li> maxCategoryTime: duration of the deal's category.
* <li> nbOfCategoryUnits: number of category units dedicated
* for the contribution phase.
*
*
* @param chainDeal
* @return
*/
Expand All @@ -125,13 +132,13 @@ public Date getChainDealContributionDeadline(ChainDeal chainDeal) {
* <p> Get deal's final deadline date. The deadline
* is calculated as follow:
* start + maxCategoryTime * 10.
*
*
* <ul>
* <li> start: the start time of the deal.
* <li> maxCategoryTime: duration of the deal's category.
* <li> 10: number of category units dedicated
* for the hole execution.
*
*
* @param chainDeal
* @return
*/
Expand Down Expand Up @@ -235,7 +242,7 @@ public boolean hasEnoughGas() {
private ChainReceipt buildChainReceipt(TransactionReceipt receipt) {
return ChainReceipt.builder()
.txHash(receipt.getTransactionHash())
.blockNumber(receipt.getBlockNumber() != null?
.blockNumber(receipt.getBlockNumber() != null ?
receipt.getBlockNumber().longValue() : 0)
.build();
}
Expand Down
21 changes: 13 additions & 8 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public ReplicateStatusUpdateError canUpdateReplicateStatus(String chainTaskId,
case REVEAL_FAILED:
canUpdate = false;
break;
case CONTRIBUTE_AND_FINALIZE_DONE:
case RESULT_UPLOAD_FAILED:
canUpdate = verifyStatus(chainTaskId, walletAddress, newStatus, updateReplicateStatusArgs);
break;
Expand Down Expand Up @@ -349,15 +350,16 @@ public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateS
* 3) if worker did succeed onChain when CONTRIBUTED/REVEALED.
* 4) if worker did upload when RESULT_UPLOADING.
*/

/**
* This method updates a replicate while caring about thread safety.
* A single replicate can then NOT be updated twice at the same time.
* This method should be preferred to
* {@link ReplicatesService#updateReplicateStatusWithoutThreadSafety(String, String, ReplicateStatusUpdate, UpdateReplicateStatusArgs)}!
*
* @param chainTaskId Chain task id of the task whose replicate should be updated.
* @param walletAddress Wallet address of the worker whose replicate should be updated.
* @param statusUpdate Info about the status update - new status, date of update, ...
* @param chainTaskId Chain task id of the task whose replicate should be updated.
* @param walletAddress Wallet address of the worker whose replicate should be updated.
* @param statusUpdate Info about the status update - new status, date of update, ...
* @param updateReplicateStatusArgs Optional args used to update the status.
* @return Either a {@link ReplicateStatusUpdateError} if the status can't be updated,
* or a next action for the worker.
Expand Down Expand Up @@ -395,9 +397,9 @@ public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateS
* This method has to be used with a synchronization mechanism, e.g.
* {@link ReplicatesService#updateReplicateStatus(String, String, ReplicateStatus, ReplicateStatusDetails)}
*
* @param chainTaskId Chain task id of the task whose replicate should be updated.
* @param walletAddress Wallet address of the worker whose replicate should be updated.
* @param statusUpdate Info about the status update - new status, date of update, ...
* @param chainTaskId Chain task id of the task whose replicate should be updated.
* @param walletAddress Wallet address of the worker whose replicate should be updated.
* @param statusUpdate Info about the status update - new status, date of update, ...
* @param updateReplicateStatusArgs Optional args used to update the status.
* @return Either a {@link ReplicateStatusUpdateError} if the status can't be updated,
* or a next action for the worker.
Expand Down Expand Up @@ -537,6 +539,9 @@ private boolean verifyStatus(String chainTaskId,
return isResultUploaded(updateReplicateStatusArgs.getTaskDescription());
case RESULT_UPLOAD_FAILED:
return !isResultUploaded(updateReplicateStatusArgs.getTaskDescription());
case CONTRIBUTE_AND_FINALIZE_DONE:
return iexecHubService.repeatIsRevealedTrue(chainTaskId, walletAddress)
&& iexecHubService.isTaskInCompletedStatusOnChain(chainTaskId);
default:
return true;
}
Expand Down Expand Up @@ -583,7 +588,7 @@ private String getStatusUpdateLogs(String chainTaskId, Replicate replicate, Repl
public boolean isResultUploaded(String chainTaskId) {
Optional<TaskDescription> task = iexecHubService.getTaskDescriptionFromChain(chainTaskId);

if (task.isEmpty()){
if (task.isEmpty()) {
return false;
}

Expand All @@ -592,7 +597,7 @@ public boolean isResultUploaded(String chainTaskId) {

public boolean isResultUploaded(TaskDescription task) {
// Offchain computing - basic & tee
if (task.containsCallback()){
if (task.containsCallback()) {
return true;
}

Expand Down
75 changes: 75 additions & 0 deletions src/test/java/com/iexec/core/chain/IexecHubServiceTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.iexec.core.chain;

import com.iexec.commons.poco.chain.ChainTask;
import com.iexec.commons.poco.chain.ChainTaskStatus;
import com.iexec.commons.poco.contract.generated.IexecHubContract;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.web3j.crypto.Credentials;
import org.web3j.crypto.Keys;
import org.web3j.protocol.core.RemoteFunctionCall;
import org.web3j.tx.TransactionManager;

import java.math.BigInteger;
import java.util.Optional;

import static com.iexec.commons.poco.utils.TestUtils.CHAIN_TASK_ID;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

class IexecHubServiceTests {

@Mock
private CredentialsService credentialsService;

@Mock
private Web3jService web3jService;

@Mock
private ChainConfig chainConfig;

private IexecHubService iexecHubService;

@BeforeEach
void init() throws Exception {
MockitoAnnotations.openMocks(this);

final Credentials credentials = Credentials.create(Keys.createEcKeyPair());

when(credentialsService.getCredentials()).thenReturn(credentials);
when(web3jService.hasEnoughGas(any())).thenReturn(true);
when(chainConfig.getHubAddress()).thenReturn("0x748e091bf16048cb5103E0E10F9D5a8b7fBDd860");

try (MockedStatic<IexecHubContract> iexecHubContract = Mockito.mockStatic(IexecHubContract.class)) {
final IexecHubContract mockIexecContract = mock(IexecHubContract.class);
final RemoteFunctionCall<BigInteger> mockRemoteFunctionCall = mock(RemoteFunctionCall.class);
iexecHubContract.when(() -> IexecHubContract.load(any(), any(), (TransactionManager) any(), any()))
.thenReturn(mockIexecContract);
when(mockIexecContract.contribution_deadline_ratio()).thenReturn(mockRemoteFunctionCall);
when(mockRemoteFunctionCall.send()).thenReturn(BigInteger.ONE);
iexecHubService = spy(new IexecHubService(credentialsService, web3jService, chainConfig));
}
}


@Test
void shouldTaskBeInCompletedStatusOnChain() {
final ChainTask task = ChainTask.builder().status(ChainTaskStatus.COMPLETED).build();
doReturn(Optional.of(task)).when(iexecHubService).getChainTask(CHAIN_TASK_ID);

assertThat(iexecHubService.isTaskInCompletedStatusOnChain(CHAIN_TASK_ID)).isTrue();
}

@Test
void shouldTaskNotBeInCompletedStatusOnChain() {
final ChainTask task = ChainTask.builder().status(ChainTaskStatus.REVEALING).build();
doReturn(Optional.of(task)).when(iexecHubService).getChainTask(CHAIN_TASK_ID);

assertThat(iexecHubService.isTaskInCompletedStatusOnChain(CHAIN_TASK_ID)).isFalse();
}
}
77 changes: 77 additions & 0 deletions src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,83 @@ void shouldNotAuthorizeUpdateOnResultUploadedSinceResultNotUploaded() {
.isEqualTo(ReplicateStatusUpdateError.GENERIC_CANT_UPDATE);
}

@Test
void shouldAuthorizeUpdateOnContributeAndFinalizeOnGoing() {
final Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID);
replicate.updateStatus(COMPUTED, ReplicateStatusModifier.WORKER);

final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, Collections.singletonList(replicate));
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder()
.modifier(WORKER)
.status(CONTRIBUTE_AND_FINALIZE_ONGOING)
.build();

when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));

assertThat(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, statusUpdate, null))
.isEqualTo(ReplicateStatusUpdateError.NO_ERROR);
}

@Test
void shouldAuthorizeUpdateOnContributeAndFinalizeDone() {
final Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID);
replicate.updateStatus(CONTRIBUTE_AND_FINALIZE_ONGOING, ReplicateStatusModifier.WORKER);

final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, Collections.singletonList(replicate));
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder()
.modifier(WORKER)
.status(CONTRIBUTE_AND_FINALIZE_DONE)
.build();

when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(iexecHubService.repeatIsRevealedTrue(CHAIN_TASK_ID, WALLET_WORKER_1))
.thenReturn(true);
when(iexecHubService.isTaskInCompletedStatusOnChain(CHAIN_TASK_ID)).thenReturn(true);

assertThat(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, statusUpdate, null))
.isEqualTo(ReplicateStatusUpdateError.NO_ERROR);
}

@Test
void shouldNotAuthorizeUpdateOnContributeAndFinalizeDoneWhenNotRevealed() {
final Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID);
replicate.updateStatus(CONTRIBUTE_AND_FINALIZE_ONGOING, ReplicateStatusModifier.WORKER);

final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, Collections.singletonList(replicate));
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder()
.modifier(WORKER)
.status(CONTRIBUTE_AND_FINALIZE_DONE)
.build();

when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(iexecHubService.repeatIsRevealedTrue(CHAIN_TASK_ID, WALLET_WORKER_1))
.thenReturn(false);
when(iexecHubService.isTaskInCompletedStatusOnChain(CHAIN_TASK_ID)).thenReturn(true);

assertThat(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, statusUpdate, null))
.isEqualTo(ReplicateStatusUpdateError.GENERIC_CANT_UPDATE);
}

@Test
void shouldNotAuthorizeUpdateOnContributeAndFinalizeDoneWhenTaskNotCompleted() {
final Replicate replicate = new Replicate(WALLET_WORKER_1, CHAIN_TASK_ID);
replicate.updateStatus(CONTRIBUTE_AND_FINALIZE_ONGOING, ReplicateStatusModifier.WORKER);

final ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, Collections.singletonList(replicate));
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.builder()
.modifier(WORKER)
.status(CONTRIBUTE_AND_FINALIZE_DONE)
.build();

when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(iexecHubService.repeatIsRevealedTrue(CHAIN_TASK_ID, WALLET_WORKER_1))
.thenReturn(true);
when(iexecHubService.isTaskInCompletedStatusOnChain(CHAIN_TASK_ID)).thenReturn(false);

assertThat(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, statusUpdate, null))
.isEqualTo(ReplicateStatusUpdateError.GENERIC_CANT_UPDATE);
}

// computeUpdateReplicateStatusArgs

@Test
Expand Down