Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add running2Finalized2Completed in TaskUpdateManager #577

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All notable changes to this project will be documented in this file.
### New Features
- Add ContributeAndFinalize to `ReplicateWorkflow`. (#574)
- Add check for ContributeAndFinalize in `ReplicatesService`. (#576)
- Add `running2Finalized2Completed` in `TaskUpdateManager`. (#577)
### Bug Fixes
- Prevent race condition on replicate update. (#568)
### Quality
Expand Down
52 changes: 46 additions & 6 deletions src/main/java/com/iexec/core/task/update/TaskUpdateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

@Service
@Slf4j
class TaskUpdateManager {
class TaskUpdateManager {
private final TaskService taskService;
private final IexecHubService iexecHubService;
private final ReplicatesService replicatesService;
Expand Down Expand Up @@ -82,7 +82,7 @@ void updateTask(String chainTaskId) {

boolean isFinalDeadlinePossible =
!TaskStatus.getStatusesWhereFinalDeadlineIsImpossible().contains(currentStatus);
if (isFinalDeadlinePossible && new Date().after(task.getFinalDeadline())){
if (isFinalDeadlinePossible && new Date().after(task.getFinalDeadline())) {
updateTaskStatusAndSave(task, FINAL_DEADLINE_REACHED);
// Eventually should fire a "final deadline reached" notification to worker,
// but here let's just trigger an toFailed(task) leading to a failed status
Expand Down Expand Up @@ -175,7 +175,7 @@ Task updateTaskStatusAndSave(Task task, TaskStatus newStatus, ChainReceipt chain
return savedTask.get();
} else {
log.warn("UpdateTaskStatus failed. Chain Task is probably unknown." +
" [chainTaskId:{}, currentStatus:{}, wishedStatus:{}]",
" [chainTaskId:{}, currentStatus:{}, wishedStatus:{}]",
task.getChainTaskId(), currentStatus, newStatus);
return null;
}
Expand Down Expand Up @@ -204,7 +204,7 @@ void received2Initializing(Task task) {

if (task.isTeeTask()) {
Optional<String> smsUrl = smsService.getVerifiedSmsUrl(task.getChainTaskId(), task.getTag());
if(smsUrl.isEmpty()){
if (smsUrl.isEmpty()) {
log.error("Couldn't get verified SMS url [chainTaskId: {}]", task.getChainTaskId());
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
updateTaskStatusAndSave(task, FAILED);
Expand Down Expand Up @@ -241,7 +241,7 @@ void received2Initializing(Task task) {
}

void initializing2Initialized(Task task) {
if (!INITIALIZING.equals(task.getCurrentStatus())){
if (!INITIALIZING.equals(task.getCurrentStatus())) {
return;
}
// TODO: the block where initialization happened can be found
Expand All @@ -262,7 +262,7 @@ void initializing2Initialized(Task task) {
updateTaskStatusAndSave(task, INITIALIZE_FAILED);
updateTaskStatusAndSave(task, FAILED);
}, () -> log.error("Unable to check initialization on blockchain " +
"(likely too long), should use a detector [chainTaskId:{}]",
"(likely too long), should use a detector [chainTaskId:{}]",
task.getChainTaskId()));
}

Expand Down Expand Up @@ -326,6 +326,46 @@ void running2ConsensusReached(Task task) {
}
}

void running2Finalized2Completed(Task task) {
boolean isTaskInRunningStatus = task.getCurrentStatus().equals(RUNNING);
final String chainTaskId = task.getChainTaskId();
final Optional<ReplicatesList> oReplicatesList = replicatesService.getReplicatesList(chainTaskId);

if (!isTaskInRunningStatus) {
log.error("Can't transition task to `Finalized` or `Completed` when task is not `Running` " +
" [chainTaskId:{}]", chainTaskId);
return;
}

if (oReplicatesList.isEmpty()) {
log.error("Can't transition task to `Finalized` or `Completed` when no replicatesList exists" +
" [chainTaskId:{}]", chainTaskId);
return;
}

if (!task.isTeeTask()) {
log.debug("Task not running in a TEE, flow running2Finalized2Completed is not possible"
+ " [chainTaskId:{}]", chainTaskId);
return;
}
final ReplicatesList replicates = oReplicatesList.get();
final int nbReplicatesWithContributeAndFinalizeStatus = replicates.getNbReplicatesWithCurrentStatus(ReplicateStatus.CONTRIBUTE_AND_FINALIZE_DONE);

if (nbReplicatesWithContributeAndFinalizeStatus == 0) {
log.debug("No replicate in ContributeAndFinalize status"
+ " [chainTaskId:{}]", chainTaskId);
return;
} else if (nbReplicatesWithContributeAndFinalizeStatus > 1) {
log.error("Too many replicates in ContributeAndFinalize status"
+ " [chainTaskId:{}, nbReplicates:{}]", chainTaskId, nbReplicatesWithContributeAndFinalizeStatus);
toFailed(task);
return;
}

updateTaskStatusAndSave(task, FINALIZED);
finalizedToCompleted(task);
}

void initializedOrRunning2ContributionTimeout(Task task) {
boolean isInitializedOrRunningTask = task.getCurrentStatus().equals(INITIALIZED) ||
task.getCurrentStatus().equals(RUNNING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,88 @@ void shouldUpdateFromInitializedOrRunning2ContributionTimeout() {
assertThat(task.getLastButOneStatus()).isEqualTo(CONTRIBUTION_TIMEOUT);
}

// Tests on running2Finalized2Completed transition

@Test
void shouldUpdateRunning2Finalized2Completed() {
Task task = getStubTask(maxExecutionTime);
task.changeStatus(RUNNING);
task.setTag(TeeUtils.TEE_SCONE_ONLY_TAG);

final ReplicatesList replicatesList = Mockito.spy(new ReplicatesList(CHAIN_TASK_ID));

when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(replicatesList.getNbReplicatesWithCurrentStatus(ReplicateStatus.CONTRIBUTE_AND_FINALIZE_DONE)).thenReturn(1);
when(taskService.updateTask(task)).thenReturn(Optional.of(task));
doNothing().when(applicationEventPublisher).publishEvent(any());

taskUpdateManager.running2Finalized2Completed(task);
assertThat(task.getLastButOneStatus()).isEqualTo(FINALIZED);
assertThat(task.getCurrentStatus()).isEqualTo(COMPLETED);
}

@Test
void shouldNotUpdateRunning2Finalized2CompletedWhenTaskNotRunning() {
Task task = getStubTask(maxExecutionTime);
task.changeStatus(INITIALIZED);

taskUpdateManager.running2Finalized2Completed(task);
assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED);
}

@Test
void shouldNotUpdateRunning2Finalized2CompletedWhenNoReplicates() {
Task task = getStubTask(maxExecutionTime);
task.changeStatus(RUNNING);

when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.empty());

taskUpdateManager.running2Finalized2Completed(task);
assertThat(task.getCurrentStatus()).isEqualTo(RUNNING);
}

@Test
void shouldNotUpdateRunning2Finalized2CompletedWhenTaskIsNotTee() {
Task task = getStubTask(maxExecutionTime);
task.changeStatus(RUNNING);
task.setTag(NO_TEE_TAG);
final ReplicatesList replicatesList = Mockito.spy(new ReplicatesList(CHAIN_TASK_ID));

when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
taskUpdateManager.running2Finalized2Completed(task);
assertThat(task.getCurrentStatus()).isEqualTo(RUNNING);
}

@Test
void shouldNotUpdateRunning2Finalized2CompletedWhenNoReplicatesOnContributeAndFinalizeStatus() {
Task task = getStubTask(maxExecutionTime);
task.changeStatus(RUNNING);
task.setTag(TeeUtils.TEE_SCONE_ONLY_TAG);

final ReplicatesList replicatesList = Mockito.spy(new ReplicatesList(CHAIN_TASK_ID));

when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(replicatesList.getNbReplicatesWithCurrentStatus(ReplicateStatus.CONTRIBUTE_AND_FINALIZE_DONE)).thenReturn(0);

taskUpdateManager.running2Finalized2Completed(task);
assertThat(task.getCurrentStatus()).isEqualTo(RUNNING);
}

@Test
void shouldNotUpdateRunning2Finalized2CompletedWhenMoreThanOneReplicatesOnContributeAndFinalizeStatus() {
Task task = getStubTask(maxExecutionTime);
task.changeStatus(RUNNING);
task.setTag(TeeUtils.TEE_SCONE_ONLY_TAG);

final ReplicatesList replicatesList = Mockito.spy(new ReplicatesList(CHAIN_TASK_ID));

when(replicatesService.getReplicatesList(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
when(replicatesList.getNbReplicatesWithCurrentStatus(ReplicateStatus.CONTRIBUTE_AND_FINALIZE_DONE)).thenReturn(2);
doNothing().when(applicationEventPublisher).publishEvent(any());

taskUpdateManager.running2Finalized2Completed(task);
assertThat(task.getCurrentStatus()).isEqualTo(FAILED);
}

// Tests on running2ConsensusReached transition

Expand Down