Skip to content

Commit

Permalink
Merge pull request #444 from iExecBlockchainComputing/hotfix/initiali…
Browse files Browse the repository at this point in the history
…zation-checks

Hotfix/initialization checks
  • Loading branch information
jeremyjams authored Oct 18, 2021
2 parents d32a577 + 6a7ea64 commit 1562e09
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 75 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=6.1.4
version=6.1.5
iexecCommonVersion=5.5.1
nexusUser=fake
nexusPassword=fake
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void detect() {
Optional<ChainTask> chainTask = iexecHubService.getChainTask(task.getChainTaskId());
if (chainTask.isPresent() && chainTask.get().getStatus().equals(ChainTaskStatus.COMPLETED)) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
TaskStatus.FINALIZING, TaskStatus.FINALIZED, task.getChainTaskId());
task.getCurrentStatus(), TaskStatus.FINALIZED, task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void detect() {
ChainTask chainTask = oChainTask.get();
if (chainTask.getStatus().equals(ChainTaskStatus.ACTIVE)) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
TaskStatus.REOPENING, TaskStatus.REOPENED, task.getChainTaskId());
task.getCurrentStatus(), TaskStatus.REOPENED, task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ public void detect() {
List<Task> notYetFinalizingTasks = taskService.findByCurrentStatus(TaskStatus.RESULT_UPLOADED);
for (Task task : notYetFinalizingTasks) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, chainTaskId:{}]",
TaskStatus.RESULT_UPLOADED, TaskStatus.FINALIZING, task.getChainTaskId());
task.getCurrentStatus(), TaskStatus.FINALIZING, task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
}

//start initialize when needed
List<Task> notYetInitializedTasks = taskService.getInitializableTasks();
for (Task task : notYetInitializedTasks) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, chainTaskId:{}]",
TaskStatus.RECEIVED, TaskStatus.INITIALIZING, task.getChainTaskId());
task.getCurrentStatus(), TaskStatus.INITIALIZING, task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,10 @@ Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlo
}
taskService.lockTaskAccessForNewReplicate(chainTaskId);

boolean isFewBlocksAfterInitialization = isFewBlocksAfterInitialization(task);
boolean hasWorkerAlreadyParticipated = replicatesService.hasWorkerAlreadyParticipated(
chainTaskId, walletAddress);

if (isFewBlocksAfterInitialization && !hasWorkerAlreadyParticipated
if (!hasWorkerAlreadyParticipated
&& consensusService.doesTaskNeedMoreContributionsForConsensus(chainTaskId, task.getTrust(), task.getMaxExecutionTime())) {

String enclaveChallenge = smsService.getEnclaveChallenge(chainTaskId, isTeeTask);
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/iexec/core/task/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,19 @@ private void initializing2Initialized(Task task) {

private void initializing2Initialized(Task task, ChainReceipt chainReceipt) {
String chainTaskId = task.getChainTaskId();
if (!INITIALIZING.equals(task.getCurrentStatus())){
return;
}

ChainTaskStatus onChainStatus = iexecHubService.getChainTask(chainTaskId)
.map(ChainTask::getStatus)
.orElse(null);
if (!ChainTaskStatus.ACTIVE.equals(onChainStatus)){
log.warn("Cannot upgrade to initialized status [chainTaskId:{}, " +
"onChainStatus{}]", chainTaskId, onChainStatus);
return;
}

long initializationBlock = chainReceipt != null? chainReceipt.getBlockNumber() : 0;
if (initializationBlock == 0){
log.warn("Initialization block is empty, using deal block [chainTaskId:{}" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ public class ReplicateSupplyServiceTests {
private final static String TEE_TAG = "0x0000000000000000000000000000000000000000000000000000000000000001";
private final static String ENCLAVE_CHALLENGE = "dummyEnclave";
private final static long maxExecutionTime = 60000;
long initBlock = 10;
long coreLastBlock = initBlock + 2;
long workerLastBlock = coreLastBlock;
long workerLastBlock = 12;

@Mock private ReplicatesService replicatesService;
@Mock private SignatureService signatureService;
Expand Down Expand Up @@ -109,13 +107,11 @@ public void shouldNotGetReplicateSinceWorkerLastBlockNotAvailable() {
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(NO_TEE_TAG);

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker));
when(taskService.getInitializedOrRunningTasks())
.thenReturn(Collections.singletonList(runningTask));
Expand Down Expand Up @@ -203,49 +199,6 @@ public void shouldNotGetAnyReplicateSinceWorkerDoesNotHaveEnoughGas() {
assertTaskAccessForNewReplicateLockNeverUsed();
}

@Test
public void shouldNotGetReplicateSinceIsNotFewBlocksAfterInitialization() {
Worker existingWorker = Worker.builder()
.id("1")
.walletAddress(WALLET_WORKER_1)
.cpuNb(2)
.teeEnabled(false)
.lastAliveDate(new Date())
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(NO_TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(initBlock + 1);//should be 2
when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker));
when(taskService.getInitializedOrRunningTasks())
.thenReturn(Collections.singletonList(runningTask));
when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(true);
when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true);
when(replicatesService.hasWorkerAlreadyParticipated(CHAIN_TASK_ID, WALLET_WORKER_1))
.thenReturn(false);
when(consensusService.doesTaskNeedMoreContributionsForConsensus(CHAIN_TASK_ID, runningTask.getTrust(),
runningTask.getMaxExecutionTime())).thenReturn(true);
when(smsService.getEnclaveChallenge(CHAIN_TASK_ID, false)).thenReturn(BytesUtils.EMPTY_ADDRESS);
when(signatureService.createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS))
.thenReturn(new WorkerpoolAuthorization());

Optional<WorkerpoolAuthorization> oAuthorization = replicateSupplyService.getAuthOfAvailableReplicate(workerLastBlock, WALLET_WORKER_1);

assertThat(oAuthorization).isEmpty();

Mockito.verify(replicatesService, Mockito.times(0))
.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
Mockito.verify(workerService, Mockito.times(0))
.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1);
assertTaskAccessForNewReplicateNotDeadLocking();
}

@Test
public void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() {
Worker existingWorker = Worker.builder()
Expand All @@ -256,14 +209,12 @@ public void shouldNotGetAnyReplicateSinceWorkerAlreadyParticipated() {
.build();

Task runningTask1 = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask1.setInitializationBlockNumber(initBlock);
runningTask1.setMaxExecutionTime(maxExecutionTime);
runningTask1.changeStatus(RUNNING);
runningTask1.setTag(NO_TEE_TAG);
runningTask1.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(true);
when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true);
when(taskService.getInitializedOrRunningTasks())
Expand Down Expand Up @@ -291,13 +242,11 @@ public void shouldNotGetReplicateSinceNeedsMoreContributionsForConsensus() {

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.changeStatus(RUNNING);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.setTag(NO_TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(true);
when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true);
when(taskService.getInitializedOrRunningTasks())
Expand All @@ -324,14 +273,12 @@ public void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() {
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker));
when(taskService.getInitializedOrRunningTasks())
.thenReturn(Collections.singletonList(runningTask));
Expand Down Expand Up @@ -367,14 +314,12 @@ public void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() {
.build();

Task task1 = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
task1.setInitializationBlockNumber(initBlock);
task1.setMaxExecutionTime(maxExecutionTime);
task1.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));
task1.changeStatus(RUNNING);
task1.setTag(NO_TEE_TAG);

Task taskDeadlineReached = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
taskDeadlineReached.setInitializationBlockNumber(initBlock);
taskDeadlineReached.setMaxExecutionTime(maxExecutionTime);
taskDeadlineReached.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), -60));
taskDeadlineReached.changeStatus(RUNNING);
Expand All @@ -387,7 +332,6 @@ public void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() {
tasks.add(task1);
tasks.add(taskDeadlineReached);
when(taskService.getInitializedOrRunningTasks()).thenReturn(tasks);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
doNothing().when(contributionTimeoutTaskDetector).detect();

replicateSupplyService.getAuthOfAvailableReplicate(workerLastBlock, WALLET_WORKER_1);
Expand All @@ -408,14 +352,12 @@ public void shouldNotGetReplicateWhenTaskAlreadyAccessed() {
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(NO_TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(true);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker));
when(taskService.getInitializedOrRunningTasks())
.thenReturn(Collections.singletonList(runningTask));
Expand Down Expand Up @@ -450,14 +392,12 @@ public void shouldGetReplicateWithNoTee() {
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(NO_TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker));
when(taskService.getInitializedOrRunningTasks())
.thenReturn(Collections.singletonList(runningTask));
Expand Down Expand Up @@ -493,14 +433,12 @@ public void shouldGetReplicateWithTee() {
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.getWorker(WALLET_WORKER_1)).thenReturn(Optional.of(existingWorker));
when(taskService.getInitializedOrRunningTasks())
.thenReturn(Collections.singletonList(runningTask));
Expand Down Expand Up @@ -536,14 +474,12 @@ public void shouldTeeNeededTaskNotBeGivenToTeeDisabledWorker() {
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(true);
when(taskService.getInitializedOrRunningTasks())
.thenReturn(Collections.singletonList(runningTask));
Expand Down Expand Up @@ -571,14 +507,12 @@ public void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() {
.build();

Task runningTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
runningTask.setInitializationBlockNumber(initBlock);
runningTask.setMaxExecutionTime(maxExecutionTime);
runningTask.changeStatus(RUNNING);
runningTask.setTag(TEE_TAG);
runningTask.setContributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60));

when(taskService.isTaskBeingAccessedForNewReplicate(CHAIN_TASK_ID)).thenReturn(false);
when(web3jService.getLatestBlockNumber()).thenReturn(coreLastBlock);
when(workerService.canAcceptMoreWorks(WALLET_WORKER_1)).thenReturn(true);
when(web3jService.hasEnoughGas(WALLET_WORKER_1)).thenReturn(true);
when(taskService.getInitializedOrRunningTasks())
Expand Down Expand Up @@ -632,7 +566,6 @@ public void shouldNotGetInterruptedReplicateSinceEnclaveChallengeNeededButNotGen

List<String> ids = Arrays.asList(CHAIN_TASK_ID);
Task teeTask = new Task(DAPP_NAME, COMMAND_LINE, 5, CHAIN_TASK_ID);
teeTask.setInitializationBlockNumber(initBlock);
Optional<Replicate> noTeeReplicate = getStubReplicate(ReplicateStatus.COMPUTING);
teeTask.setTag(TEE_TAG);

Expand Down
30 changes: 30 additions & 0 deletions src/test/java/com/iexec/core/task/TaskServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,35 @@ public void shouldUpdateInitializing2InitailizeFailedSinceChainTaskIdIsEmpty() {
assertThat(task.getCurrentStatus()).isEqualTo(FAILED);
}

@Test
public void shouldPartiallyUpdateForReceived2Initializing2InitializedSinceNotActiveTaskOnChain() {
Task task = getStubTask();
task.changeStatus(RECEIVED);
task.setChainTaskId(CHAIN_TASK_ID);
Pair<String, ChainReceipt> pair = Pair.of(CHAIN_TASK_ID, null);

when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task));
when(iexecHubService.hasEnoughGas()).thenReturn(true);
when(iexecHubService.isTaskInUnsetStatusOnChain(CHAIN_DEAL_ID, 0)).thenReturn(true);
when(iexecHubService.isBeforeContributionDeadline(task.getChainDealId()))
.thenReturn(true);

when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.initialize(CHAIN_DEAL_ID, 0)).thenReturn(Optional.of(pair));
when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(ChainTask.builder()
.build()));

taskService.updateTaskRunnable(CHAIN_TASK_ID);
assertThat(task.getChainDealId()).isEqualTo(CHAIN_DEAL_ID);
assertThat(task.getDateStatusList().get(task.getDateStatusList().size() - 2).getStatus()).isEqualTo(RECEIVED);
assertThat(task.getDateStatusList().get(task.getDateStatusList().size() - 1).getStatus()).isEqualTo(INITIALIZING);
assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZING);

// test that double call doesn't change anything
taskService.updateTaskRunnable(CHAIN_TASK_ID);
assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZING);
}

@Test
public void shouldUpdateReceived2Initializing2Initialized() {
Task task = getStubTask();
Expand All @@ -502,6 +531,7 @@ public void shouldUpdateReceived2Initializing2Initialized() {
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.initialize(CHAIN_DEAL_ID, 0)).thenReturn(Optional.of(pair));
when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(ChainTask.builder()
.status(ChainTaskStatus.ACTIVE)
.contributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60).getTime())
.build()));

Expand Down

0 comments on commit 1562e09

Please sign in to comment.