Skip to content

Commit

Permalink
Merge pull request #302 from iExecBlockchainComputing/reopen
Browse files Browse the repository at this point in the history
Reopening detector
  • Loading branch information
Ugo Plouviez authored Jun 14, 2019
2 parents 46dc2e7 + b51dbd3 commit dd263b6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.iexec.core.detector.task;

import com.iexec.common.chain.ChainTask;
import com.iexec.common.chain.ChainTaskStatus;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.detector.Detector;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskExecutorEngine;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Optional;

@Slf4j
@Service
public class ReopenedTaskDetector implements Detector {

private TaskService taskService;
private TaskExecutorEngine taskExecutorEngine;
private IexecHubService iexecHubService;

public ReopenedTaskDetector(TaskService taskService,
TaskExecutorEngine taskExecutorEngine,
IexecHubService iexecHubService) {
this.taskService = taskService;
this.taskExecutorEngine = taskExecutorEngine;
this.iexecHubService = iexecHubService;
}

/**
* Detector to detect tasks that are reopening but are not reopened yet.
*/
@Scheduled(fixedRateString = "${detector.task.finalized.unnotified.period}")
@Override
public void detect() {
log.info("Trying to detect reopened tasks");
for (Task task : taskService.findByCurrentStatus(TaskStatus.REOPENING)) {
Optional<ChainTask> oChainTask = iexecHubService.getChainTask(task.getChainTaskId());
if (!oChainTask.isPresent()){
continue;
}

ChainTask chainTask = oChainTask.get();
if (chainTask.getStatus().equals(ChainTaskStatus.ACTIVE)) {
taskExecutorEngine.updateTask(task.getChainTaskId());
}
}
}
}

35 changes: 27 additions & 8 deletions src/main/java/com/iexec/core/task/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ boolean tryUpgradeTaskStatus(String chainTaskId) {
break;
case CONSENSUS_REACHED:
consensusReached2AtLeastOneReveal2UploadRequested(task);
consensusReached2Reopened(task);
consensusReached2Reopening(task);
break;
case REOPENING:
reopening2Reopened(task);
break;
case RESULT_UPLOAD_REQUESTED:
uploadRequested2UploadingResult(task);
Expand Down Expand Up @@ -210,6 +213,7 @@ private void received2Initialized(Task task) {
}

private void initializing2Initialized(Task task) {
// TODO: the block where initialization happened can be found
initializing2Initialized(task, null);
}

Expand Down Expand Up @@ -296,7 +300,7 @@ private void consensusReached2AtLeastOneReveal2UploadRequested(Task task) {
}
}

public void consensusReached2Reopened(Task task) {
public void consensusReached2Reopening(Task task) {
Date now = new Date();

boolean isConsensusReachedStatus = task.getCurrentStatus().equals(CONSENSUS_REACHED);
Expand All @@ -318,17 +322,32 @@ public void consensusReached2Reopened(Task task) {
Optional<ChainReceipt> optionalChainReceipt = iexecHubService.reOpen(task.getChainTaskId());

if (!optionalChainReceipt.isPresent()) {
log.error("Reopen failed [chainTaskId:{}, canReopen:{}, hasEnoughGas:{}]",
task.getChainTaskId(), canReopen, hasEnoughGas);
log.error("Reopen failed [chainTaskId:{}]", task.getChainTaskId());
updateTaskStatusAndSave(task, REOPEN_FAILED);
updateTaskStatusAndSave(task, FAILED);
return;
}
reopening2Reopened(task, optionalChainReceipt.get());
}

public void reopening2Reopened(Task task) {
reopening2Reopened(task, null);
}

task.setConsensus(null);
task.setRevealDeadline(new Date(0));
updateTaskStatusAndSave(task, REOPENED, optionalChainReceipt.get());
updateTaskStatusAndSave(task, INITIALIZED, optionalChainReceipt.get());
public void reopening2Reopened(Task task, ChainReceipt chainReceipt) {
Optional<ChainTask> oChainTask = iexecHubService.getChainTask(task.getChainTaskId());
if (!oChainTask.isPresent()) {
return;
}
ChainTask chainTask = oChainTask.get();

// re-initialize the task if it has been reopened
if (chainTask.getStatus().equals(ChainTaskStatus.ACTIVE)) {
task.setConsensus(null);
task.setRevealDeadline(new Date(0));
updateTaskStatusAndSave(task, REOPENED, chainReceipt);
updateTaskStatusAndSave(task, INITIALIZED, chainReceipt);
}
}

private void uploadRequested2UploadingResult(Task task) {
Expand Down
19 changes: 11 additions & 8 deletions src/test/java/com/iexec/core/task/TaskServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void shouldNotFindByCurrentStatusList() {
assertThat(foundTasks).isEmpty();
}

// Tests on consensusReached2Reopened transition
// Tests on consensusReached2Reopening transition

@Test
public void shouldNotUpgrade2ReopenedSinceCurrentStatusWrong() {
Expand All @@ -183,7 +183,7 @@ public void shouldNotUpgrade2ReopenedSinceCurrentStatusWrong() {
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.reOpen(task.getChainTaskId())).thenReturn(Optional.of(new ChainReceipt()));

taskService.consensusReached2Reopened(task);
taskService.consensusReached2Reopening(task);

assertThat(task.getCurrentStatus()).isEqualTo(RECEIVED);
}
Expand All @@ -200,7 +200,7 @@ public void shouldNotUpgrade2ReopenedSinceNotAfterRevealDeadline() {
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.reOpen(task.getChainTaskId())).thenReturn(Optional.of(new ChainReceipt()));

taskService.consensusReached2Reopened(task);
taskService.consensusReached2Reopening(task);

assertThat(task.getCurrentStatus()).isEqualTo(CONSENSUS_REACHED);
}
Expand All @@ -217,7 +217,7 @@ public void shouldNotUpgrade2ReopenedSinceNotWeHaveSomeRevealed() {
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.reOpen(task.getChainTaskId())).thenReturn(Optional.of(new ChainReceipt()));

taskService.consensusReached2Reopened(task);
taskService.consensusReached2Reopening(task);

assertThat(task.getCurrentStatus()).isEqualTo(CONSENSUS_REACHED);
}
Expand All @@ -234,7 +234,7 @@ public void shouldNotUpgrade2ReopenedSinceCantReopenOnChain() {
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.reOpen(task.getChainTaskId())).thenReturn(Optional.of(new ChainReceipt()));

taskService.consensusReached2Reopened(task);
taskService.consensusReached2Reopening(task);

assertThat(task.getCurrentStatus()).isEqualTo(CONSENSUS_REACHED);
}
Expand All @@ -251,7 +251,7 @@ public void shouldNotUpgrade2ReopenedSinceNotEnoughtGas() {
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.reOpen(task.getChainTaskId())).thenReturn(Optional.of(new ChainReceipt()));

taskService.consensusReached2Reopened(task);
taskService.consensusReached2Reopening(task);

assertThat(task.getCurrentStatus()).isEqualTo(CONSENSUS_REACHED);
}
Expand All @@ -269,7 +269,7 @@ public void shouldNotUpgrade2ReopenedBut2ReopendedFailedSinceTxFailed() {
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.reOpen(task.getChainTaskId())).thenReturn(Optional.empty());

taskService.consensusReached2Reopened(task);
taskService.consensusReached2Reopening(task);

assertThat(task.getLastButOneStatus()).isEqualTo(REOPEN_FAILED);
assertThat(task.getCurrentStatus()).isEqualTo(FAILED);
Expand All @@ -286,8 +286,11 @@ public void shouldUpgrade2Reopened() {
when(iexecHubService.hasEnoughGas()).thenReturn(true);
when(taskRepository.save(task)).thenReturn(task);
when(iexecHubService.reOpen(task.getChainTaskId())).thenReturn(Optional.of(new ChainReceipt()));
when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(ChainTask.builder()
.status(ChainTaskStatus.ACTIVE)
.build()));

taskService.consensusReached2Reopened(task);
taskService.consensusReached2Reopening(task);

assertThat(task.getDateStatusList().get(0).getStatus()).isEqualTo(RECEIVED);
assertThat(task.getDateStatusList().get(1).getStatus()).isEqualTo(CONSENSUS_REACHED);
Expand Down

0 comments on commit dd263b6

Please sign in to comment.