Skip to content

Commit

Permalink
Merge pull request #593 from iExecBlockchainComputing/feature/contrib…
Browse files Browse the repository at this point in the history
…ute-and-finalize-done-tasks-detector

Add ContributeAndFinalize task detector
  • Loading branch information
mcornaton authored Jun 2, 2023
2 parents b6a0426 + 0f1b713 commit 0c82563
Show file tree
Hide file tree
Showing 3 changed files with 352 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ All notable changes to this project will be documented in this file.
- Add `running2Finalized2Completed` in `TaskUpdateManager`. (#577 #578)
- Disable `contributeAndFinalize` with CallBack. (#579 #581)
- Add purge cached task descriptions ability. (#587)
- Add `ContributionAndFinalizationUnnotifiedDetector`. (#590)
- Add detectors for `ContributeAndFinalize` flow. (#590 #593)
### Bug Fixes
- Prevent race condition on replicate update. (#568)
- Use builders in test classes. (#589)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package com.iexec.core.detector.task;

import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.commons.poco.chain.ChainTask;
import com.iexec.commons.poco.chain.ChainTaskStatus;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.detector.Detector;
import com.iexec.core.replicate.Replicate;
import com.iexec.core.replicate.ReplicatesService;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
Expand All @@ -28,6 +31,7 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

@Slf4j
Expand All @@ -37,30 +41,68 @@ public class FinalizedTaskDetector implements Detector {
private final TaskService taskService;
private final TaskUpdateRequestManager taskUpdateRequestManager;
private final IexecHubService iexecHubService;
private final ReplicatesService replicatesService;

public FinalizedTaskDetector(TaskService taskService,
TaskUpdateRequestManager taskUpdateRequestManager,
IexecHubService iexecHubService) {
IexecHubService iexecHubService,
ReplicatesService replicatesService) {
this.taskService = taskService;
this.taskUpdateRequestManager = taskUpdateRequestManager;
this.iexecHubService = iexecHubService;
this.replicatesService = replicatesService;
}

/**
* Detector to detect tasks that are finalizing but are not finalized yet.
*/
@Scheduled(fixedRateString = "#{@cronConfiguration.getFinalize()}")
@Override
public void detect() {
detectFinalizedTasks();
detectContributeAndFinalizeDoneTasks();
}

/**
* Detect tasks that are finalizing but are not finalized yet.
*/
void detectFinalizedTasks() {
log.debug("Trying to detect finalized tasks");
for (Task task : taskService.findByCurrentStatus(TaskStatus.FINALIZING)) {
Optional<ChainTask> chainTask = iexecHubService.getChainTask(task.getChainTaskId());
if (chainTask.isPresent() && chainTask.get().getStatus().equals(ChainTaskStatus.COMPLETED)) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
task.getCurrentStatus(), TaskStatus.FINALIZED, task.getChainTaskId());
taskUpdateRequestManager.publishRequest(task.getChainTaskId());
}
taskService.findByCurrentStatus(TaskStatus.FINALIZING)
.stream()
.filter(this::isChainTaskCompleted)
.forEach(this::publishTaskUpdateRequest);
}

/**
* Detect tasks that are contributed and finalized by worker
* but are not off-chain finalized yet.
*/
void detectContributeAndFinalizeDoneTasks() {
log.debug("Trying to detect contributed and finalized tasks");
taskService.findByCurrentStatus(TaskStatus.RUNNING)
.stream()
.filter(this::isTaskContributeAndFinalizeDone)
.forEach(this::publishTaskUpdateRequest);
}

boolean isChainTaskCompleted(Task task) {
final Optional<ChainTask> chainTask = iexecHubService.getChainTask(task.getChainTaskId());
return chainTask.isPresent() && chainTask.get().getStatus() == ChainTaskStatus.COMPLETED;
}

boolean isTaskContributeAndFinalizeDone(Task task) {
// Only TEE tasks can follow ContributeAndFinalize workflow
if (!task.isTeeTask()) {
return false;
}

final List<Replicate> replicates = replicatesService.getReplicates(task.getChainTaskId());
return replicates.size() == 1
&& replicates.get(0).containsStatus(ReplicateStatus.CONTRIBUTE_AND_FINALIZE_DONE)
&& isChainTaskCompleted(task);
}
}

void publishTaskUpdateRequest(Task task) {
log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]",
task.getCurrentStatus(), TaskStatus.FINALIZED, task.getChainTaskId());
taskUpdateRequestManager.publishRequest(task.getChainTaskId());
}
}
Loading

0 comments on commit 0c82563

Please sign in to comment.