diff --git a/CHANGELOG.md b/CHANGELOG.md index 4baf17492..08d80aa3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ All notable changes to this project will be documented in this file. - Always provide a `WorkerpoolAuthorization` to a worker during its recovery. (#674) - Move task metrics from `TaskUpdateManager` to `TaskService`. (#676) - Fail fast when tasks are detected past their contribution or final deadline. (#677) +- Mitigate potential race conditions by enforcing `currentStatus` value when updating a task. (#681) ### Quality diff --git a/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java b/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java index 4b3e3fd4d..1ed94aabe 100644 --- a/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java +++ b/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java @@ -55,7 +55,7 @@ public void detect() { .push("dateStatusList").each( TaskStatusChange.builder().status(TaskStatus.CONTRIBUTION_TIMEOUT).build(), TaskStatusChange.builder().status(TaskStatus.FAILED).build()); - taskService.failMultipleTasksByQuery(update, query) + taskService.updateMultipleTasksByQuery(query, update) .forEach(id -> applicationEventPublisher.publishEvent(new ContributionTimeoutEvent(id))); } } diff --git a/src/main/java/com/iexec/core/detector/task/FinalDeadlineTaskDetector.java b/src/main/java/com/iexec/core/detector/task/FinalDeadlineTaskDetector.java index d7f5dd53a..5ec24ee7e 100644 --- a/src/main/java/com/iexec/core/detector/task/FinalDeadlineTaskDetector.java +++ b/src/main/java/com/iexec/core/detector/task/FinalDeadlineTaskDetector.java @@ -53,7 +53,7 @@ public void detect() { .push("dateStatusList").each( TaskStatusChange.builder().status(TaskStatus.FINAL_DEADLINE_REACHED).build(), TaskStatusChange.builder().status(TaskStatus.FAILED).build()); - taskService.failMultipleTasksByQuery(update, query) + taskService.updateMultipleTasksByQuery(query, update) .forEach(id -> applicationEventPublisher.publishEvent(new TaskFailedEvent(id))); } } diff --git a/src/main/java/com/iexec/core/task/Task.java b/src/main/java/com/iexec/core/task/Task.java index b3cc72205..f8046818d 100644 --- a/src/main/java/com/iexec/core/task/Task.java +++ b/src/main/java/com/iexec/core/task/Task.java @@ -54,8 +54,10 @@ unique = true) public class Task { + public static final String CHAIN_TASK_ID_FIELD_NAME = "chainTaskId"; public static final String CURRENT_STATUS_FIELD_NAME = "currentStatus"; public static final String CONTRIBUTION_DEADLINE_FIELD_NAME = "contributionDeadline"; + public static final String DATE_STATUS_LIST_FIELD_NAME = "dateStatusList"; @Id private String id; diff --git a/src/main/java/com/iexec/core/task/TaskRepository.java b/src/main/java/com/iexec/core/task/TaskRepository.java index c4fb4395f..ea8af14c6 100644 --- a/src/main/java/com/iexec/core/task/TaskRepository.java +++ b/src/main/java/com/iexec/core/task/TaskRepository.java @@ -36,9 +36,6 @@ public interface TaskRepository extends MongoRepository { @Query("{ 'currentStatus': {$in: ?0} }") List findByCurrentStatus(List statuses); - @Query("{ 'currentStatus': {$in: ?0} }") - List findByCurrentStatus(List statuses, Sort sort); - /** * Retrieves the prioritized task matching with given criteria: *
    diff --git a/src/main/java/com/iexec/core/task/TaskService.java b/src/main/java/com/iexec/core/task/TaskService.java index 425f958a1..c8b862977 100644 --- a/src/main/java/com/iexec/core/task/TaskService.java +++ b/src/main/java/com/iexec/core/task/TaskService.java @@ -27,6 +27,7 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; import lombok.extern.slf4j.Slf4j; +import org.bson.Document; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.dao.DuplicateKeyException; @@ -45,13 +46,13 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static com.iexec.core.task.Task.*; import static com.iexec.core.task.TaskStatus.*; @Slf4j @Service public class TaskService { - private static final String CHAIN_TASK_ID_FIELD = "chainTaskId"; public static final String METRIC_TASKS_STATUSES_COUNT = "iexec.core.tasks.count"; private final MongoTemplate mongoTemplate; private final TaskRepository taskRepository; @@ -106,21 +107,19 @@ private void initializeCurrentTaskStatusesCount() { } /** - * Save task in database if it does not - * already exist. + * Save task in database if it does not already exist. * - * @param chainDealId - * @param taskIndex - * @param dealBlockNumber - * @param imageName - * @param commandLine - * @param trust - * @param maxExecutionTime - * @param tag - * @param contributionDeadline - * @param finalDeadline - * @return optional containing the saved - * task, {@link Optional#empty()} otherwise. + * @param chainDealId on-chain deal id + * @param taskIndex task index in deal + * @param dealBlockNumber block number when orders were matched to produce the current deal + * @param imageName OCI image to use for replicates computation + * @param commandLine command that will be executed during replicates computation + * @param trust trust level, impacts replication + * @param maxExecutionTime execution time + * @param tag deal tag describing additional features like TEE framework + * @param contributionDeadline date after which a worker cannot contribute + * @param finalDeadline date after which a task cannot be updated + * @return {@code Optional} containing the saved task, {@link Optional#empty()} otherwise. */ public Optional addTask( String chainDealId, @@ -134,40 +133,73 @@ public Optional addTask( Date contributionDeadline, Date finalDeadline ) { - Task newTask = new Task(chainDealId, taskIndex, imageName, - commandLine, trust, maxExecutionTime, tag); + Task newTask = new Task(chainDealId, taskIndex, imageName, commandLine, trust, maxExecutionTime, tag); newTask.setDealBlockNumber(dealBlockNumber); newTask.setFinalDeadline(finalDeadline); newTask.setContributionDeadline(contributionDeadline); + final String taskLogDetails = String.format("chainDealId:%s, taskIndex:%s, imageName:%s, commandLine:%s, trust:%s, contributionDeadline:%s, finalDeadline:%s", + chainDealId, taskIndex, imageName, commandLine, trust, contributionDeadline, finalDeadline); try { newTask = taskRepository.save(newTask); - log.info("Added new task [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}, chainTaskId:{}]", - chainDealId, taskIndex, imageName, commandLine, trust, newTask.getChainTaskId()); + log.info("Added new task [{}}, chainTaskId:{}]", taskLogDetails, newTask.getChainTaskId()); return Optional.of(newTask); } catch (DuplicateKeyException e) { - log.info("Task already added [chainDealId:{}, taskIndex:{}, imageName:{}, commandLine:{}, trust:{}]", - chainDealId, taskIndex, imageName, commandLine, trust); + log.info("Task already added [{}]", taskLogDetails); return Optional.empty(); } } - public long updateTaskStatus(Task task, TaskStatus currentStatus, List statusChanges) { - Update update = Update.update("currentStatus", task.getCurrentStatus()); - update.push("dateStatusList").each(statusChanges); - UpdateResult result = mongoTemplate.updateFirst( - Query.query(Criteria.where(CHAIN_TASK_ID_FIELD).is(task.getChainTaskId())), - update, - Task.class); - log.debug("Updated chainTaskId [chainTaskId:{}, result:{}]", task.getChainTaskId(), result); - updateMetricsAfterStatusUpdate(currentStatus, task.getCurrentStatus()); + /** + * Updates the status of a single task in the collection + * + * @param chainTaskId On-chain ID of the task to update + * @param currentStatus Expected {@code currentStatus} of the task when executing the update + * @param targetStatus Wished {@code currentStatus} the task should be updated to + * @param statusChanges List of {@code TaskStatusChange} to append to the {@code dateStatusList} field + * @return The number of updated documents in the task collection, should be {@literal 0} or {@literal 1} due to task ID uniqueness + */ + public long updateTaskStatus(String chainTaskId, TaskStatus currentStatus, TaskStatus targetStatus, List statusChanges) { + final Update update = Update.update(CURRENT_STATUS_FIELD_NAME, targetStatus) + .push(DATE_STATUS_LIST_FIELD_NAME).each(statusChanges); + final UpdateResult result = updateTask(chainTaskId, currentStatus, update); return result.getModifiedCount(); } - public void updateTask(String chainTaskId, Update update) { - UpdateResult result = mongoTemplate.updateFirst( - Query.query(Criteria.where(CHAIN_TASK_ID_FIELD).is(chainTaskId)), - update, Task.class); - log.debug("Updated chainTaskId [chainTaskId:{}, result{}]", chainTaskId, result); + /** + * Update a single task in the collection + * + * @param chainTaskId On-chain ID of the task to update + * @param currentStatus Expected {@code currentStatus} of the task when executing the update + * @param update Update to execute on the task if criteria are respected + * @return The result of the update execution on the task collection + */ + public UpdateResult updateTask(String chainTaskId, TaskStatus currentStatus, Update update) { + final Criteria criteria = Criteria.where(CHAIN_TASK_ID_FIELD_NAME).is(chainTaskId) + .and(CURRENT_STATUS_FIELD_NAME).is(currentStatus); + // chainTaskId and currentStatus are part of the criteria, no need to add them explicitly + log.debug("Update request [criteria:{}, update:{}]", + criteria.getCriteriaObject(), update.getUpdateObject()); + final UpdateResult result = mongoTemplate.updateFirst(Query.query(criteria), update, Task.class); + log.debug("Update execution result [chainTaskId:{}, result:{}]", chainTaskId, result); + if (result.getModifiedCount() == 0L) { + log.warn("The task was not updated [chainTaskId:{}]", chainTaskId); + } else if (isTaskCurrentStatusUpdated(update)) { + // A single document has been updated (chainTaskId uniqueness) and the currentStatus has been modified + updateMetricsAfterStatusUpdate(currentStatus, update.getUpdateObject().get("$set", Document.class) + .get(CURRENT_STATUS_FIELD_NAME, TaskStatus.class)); + } + return result; + } + + /** + * Checks if provided MongoDB update has modified the {@code currentStatus} field of the task + * + * @param update The MongoDB request to check + * @return {@literal true} if the {@code currentStatus} was updated, {@literal false} otherwise + */ + private boolean isTaskCurrentStatusUpdated(Update update) { + return update.getUpdateObject().containsKey("$set") + && update.getUpdateObject().get("$set", Document.class).containsKey(CURRENT_STATUS_FIELD_NAME); } public Optional getTaskByChainTaskId(String chainTaskId) { @@ -214,8 +246,8 @@ public Optional getPrioritizedInitializedOrRunningTask( Arrays.asList(INITIALIZED, RUNNING), excludedTags, excludedChainTaskIds, - Sort.by(Sort.Order.desc(Task.CURRENT_STATUS_FIELD_NAME), - Sort.Order.asc(Task.CONTRIBUTION_DEADLINE_FIELD_NAME))); + Sort.by(Sort.Order.desc(CURRENT_STATUS_FIELD_NAME), + Sort.Order.asc(CONTRIBUTION_DEADLINE_FIELD_NAME))); } /** @@ -249,23 +281,29 @@ private Optional findPrioritizedTask(List statuses, ); } - public List failMultipleTasksByQuery(Update update, Query query) { + /** + * Updates task on a given MongoDB query. + * + * @param query The query to perform to lookup for tasks in the collection + * @param update The update to execute on the tasks returned by the query + * @return The list of modified chain task ids + */ + public List updateMultipleTasksByQuery(Query query, Update update) { return mongoTemplate.find(query, Task.class).stream() - .map(task -> failSingleTask(update, task)) + .map(task -> updateSingleTask(task, update)) .collect(Collectors.toList()); } - private String failSingleTask(Update update, Task task) { - final TaskStatus beforeUpdate = task.getCurrentStatus(); - final UpdateResult updateResult = mongoTemplate.updateFirst( - Query.query(Criteria.where(CHAIN_TASK_ID_FIELD).is(task.getChainTaskId())), update, Task.class); - if (updateResult.getModifiedCount() == 0) { - log.warn("The task was not updated [chainTaskId:{}]", task.getChainTaskId()); - return ""; - } else { - updateMetricsAfterStatusUpdate(beforeUpdate, FAILED); - return task.getChainDealId(); - } + /** + * Updates a single task in the task collection + * + * @param task The task to update + * @param update The update to perform + * @return The chain task id of the task + */ + private String updateSingleTask(Task task, Update update) { + final UpdateResult updateResult = updateTask(task.getChainTaskId(), task.getCurrentStatus(), update); + return updateResult.getModifiedCount() == 0L ? "" : task.getChainTaskId(); } public List getChainTaskIdsOfTasksExpiredBefore(Date expirationDate) { @@ -325,6 +363,7 @@ public long countByCurrentStatus(TaskStatus status) { } void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) { + log.debug("updateMetricsAfterStatusUpdate [prev:{}, next:{}]", previousStatus, newStatus); currentTaskStatusesCount.get(previousStatus).decrementAndGet(); currentTaskStatusesCount.get(newStatus).incrementAndGet(); publishTaskStatusesCountUpdate(); diff --git a/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java b/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java index ee568752b..32c3e4c5a 100644 --- a/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java +++ b/src/main/java/com/iexec/core/task/update/TaskUpdateManager.java @@ -162,17 +162,18 @@ void updateTask(String chainTaskId) { * @param statuses List of statuses to append to the task {@code dateStatusList} */ void updateTaskStatusesAndSave(Task task, TaskStatus... statuses) { - TaskStatus currentStatus = task.getCurrentStatus(); - List statusChanges = new ArrayList<>(); + final TaskStatus currentStatus = task.getCurrentStatus(); + final List statusChanges = new ArrayList<>(); for (TaskStatus newStatus : statuses) { log.info("Create TaskStatusChange succeeded [chainTaskId:{}, currentStatus:{}, newStatus:{}]", task.getChainTaskId(), task.getCurrentStatus(), newStatus); final TaskStatusChange statusChange = TaskStatusChange.builder().status(newStatus).build(); + // task update required by tests task.setCurrentStatus(newStatus); task.getDateStatusList().add(statusChange); statusChanges.add(statusChange); } - saveTask(task, currentStatus, statusChanges); + saveTask(task.getChainTaskId(), currentStatus, statuses[statuses.length - 1], statusChanges); } void updateTaskStatusAndSave(Task task, TaskStatus newStatus) { @@ -180,29 +181,31 @@ void updateTaskStatusAndSave(Task task, TaskStatus newStatus) { } void updateTaskStatusAndSave(Task task, TaskStatus newStatus, ChainReceipt chainReceipt) { - TaskStatus currentStatus = task.getCurrentStatus(); - TaskStatusChange statusChange = TaskStatusChange.builder().status(newStatus).chainReceipt(chainReceipt).build(); + final TaskStatus currentStatus = task.getCurrentStatus(); + final TaskStatusChange statusChange = TaskStatusChange.builder().status(newStatus).chainReceipt(chainReceipt).build(); + // task update required by tests task.setCurrentStatus(newStatus); task.getDateStatusList().add(statusChange); - saveTask(task, currentStatus, List.of(statusChange)); + saveTask(task.getChainTaskId(), currentStatus, newStatus, List.of(statusChange)); } /** * Saves the task to the database. * - * @param task The task + * @param chainTaskId ID of the task * @param currentStatus The current status in database + * @param wishedStatus The status the task should have after the update * @param statusChanges List of changes */ - void saveTask(Task task, TaskStatus currentStatus, List statusChanges) { - long updatedTaskCount = taskService.updateTaskStatus(task, currentStatus, statusChanges); + void saveTask(String chainTaskId, TaskStatus currentStatus, TaskStatus wishedStatus, List statusChanges) { + long updatedTaskCount = taskService.updateTaskStatus(chainTaskId, currentStatus, wishedStatus, statusChanges); // `savedTask.isPresent()` should always be true if the task exists in the repository. if (updatedTaskCount != 0L) { log.info("UpdateTaskStatus succeeded [chainTaskId:{}, currentStatus:{}, newStatus:{}]", - task.getChainTaskId(), currentStatus, task.getCurrentStatus()); + chainTaskId, currentStatus, wishedStatus); } else { log.warn("UpdateTaskStatus failed. Chain Task is probably unknown [chainTaskId:{}, currentStatus:{}, wishedStatus:{}]", - task.getChainTaskId(), currentStatus, task.getCurrentStatus()); + chainTaskId, currentStatus, wishedStatus); } } // endregion @@ -244,7 +247,7 @@ void received2Initializing(Task task) { } task.setEnclaveChallenge(enclaveChallenge.get()); update.set("enclaveChallenge", enclaveChallenge.get()); - taskService.updateTask(task.getChainTaskId(), update); + taskService.updateTask(task.getChainTaskId(), task.getCurrentStatus(), update); blockchainAdapterService .requestInitialize(task.getChainDealId(), task.getTaskIndex()) @@ -392,7 +395,7 @@ private void running2ConsensusReached(ChainTask chainTask, Task task, Replicates task.setConsensus(chainTask.getConsensusValue()); long consensusBlockNumber = iexecHubService.getConsensusBlock(chainTaskId, task.getInitializationBlockNumber()).getBlockNumber(); task.setConsensusReachedBlockNumber(consensusBlockNumber); - taskService.updateTask(task.getChainTaskId(), + taskService.updateTask(task.getChainTaskId(), task.getCurrentStatus(), Update.update("revealDeadline", task.getRevealDeadline()) .set("consensus", task.getConsensus()) .set("consensusReachedBlockNumber", task.getConsensusReachedBlockNumber())); @@ -570,7 +573,7 @@ void resultUploading2Uploaded(ChainTask chainTask, Task task) { if (uploadedReplicate != null) { task.setResultLink(uploadedReplicate.getResultLink()); task.setChainCallbackData(uploadedReplicate.getChainCallbackData()); - taskService.updateTask(task.getChainTaskId(), + taskService.updateTask(task.getChainTaskId(), task.getCurrentStatus(), Update.update("resultLink", uploadedReplicate.getResultLink()) .set("chainCallbackData", uploadedReplicate.getChainCallbackData())); updateTaskStatusAndSave(task, RESULT_UPLOADED); @@ -616,7 +619,8 @@ void requestUpload(Task task) { replicatesService.getRandomReplicateWithRevealStatus(task.getChainTaskId()).ifPresent(replicate -> { // save in the task the workerWallet that is in charge of uploading the result task.setUploadingWorkerWalletAddress(replicate.getWalletAddress()); - taskService.updateTask(task.getChainTaskId(), Update.update("uploadingWorkerWalletAddress", replicate.getWalletAddress())); + taskService.updateTask(task.getChainTaskId(), task.getCurrentStatus(), + Update.update("uploadingWorkerWalletAddress", replicate.getWalletAddress())); updateTaskStatusAndSave(task, RESULT_UPLOADING); replicatesService.updateReplicateStatus( task.getChainTaskId(), replicate.getWalletAddress(), diff --git a/src/test/java/com/iexec/core/task/TaskRepositoryTest.java b/src/test/java/com/iexec/core/task/TaskRepositoryTest.java index 53ad58c0c..8534b38bb 100644 --- a/src/test/java/com/iexec/core/task/TaskRepositoryTest.java +++ b/src/test/java/com/iexec/core/task/TaskRepositoryTest.java @@ -6,7 +6,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; import org.springframework.dao.DuplicateKeyException; -import org.springframework.data.domain.Sort; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.MongoDBContainer; @@ -14,21 +13,15 @@ import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.*; +import java.util.Arrays; +import java.util.List; -import static com.iexec.core.task.TaskStatus.INITIALIZED; -import static com.iexec.core.task.TaskStatus.RUNNING; import static com.iexec.core.task.TaskTestsUtils.getStubTask; @DataMongoTest @Testcontainers class TaskRepositoryTest { - private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray(); - private static final Random generator = new Random(); - @Container private static final MongoDBContainer mongoDBContainer = new MongoDBContainer(DockerImageName.parse(System.getProperty("mongo.image"))); @@ -46,22 +39,6 @@ void init() { taskRepository.deleteAll(); } - private String generateHexId() { - int length = 64; - StringBuilder sb = new StringBuilder("0x"); - for (int j = 0; j < length; j++) { - sb.append(HEX_ARRAY[generator.nextInt(HEX_ARRAY.length)]); - } - return sb.toString(); - } - - private List queryTasksOrderedByStatusThenContributionDeadline() { - return taskRepository.findByCurrentStatus(Arrays.asList(INITIALIZED, RUNNING), - Sort.by(Sort.Order.desc(Task.CURRENT_STATUS_FIELD_NAME), - Sort.Order.asc(Task.CONTRIBUTION_DEADLINE_FIELD_NAME)) - ); - } - @Test void shouldFailWithDuplicateUniqueDealIdx() { final Task task1 = getStubTask(); @@ -86,63 +63,4 @@ void shouldFailWithDuplicateChainTaskId() { .hasMessageContainingAll("E11000", "duplicate key error collection", "chainTaskId dup key"); } - @Test - void shouldFindTasksOrderedByCurrentStatusAndContributionDeadline() { - final Task task1 = getStubTask(); - task1.setChainTaskId(generateHexId()); - task1.setChainDealId(generateHexId()); - task1.setCurrentStatus(RUNNING); - task1.setContributionDeadline(Date.from(Instant.now().plus(20, ChronoUnit.MINUTES))); - - final Task task2 = getStubTask(); - task2.setChainDealId(generateHexId()); - task2.setChainTaskId(generateHexId()); - task2.setCurrentStatus(INITIALIZED); - task2.setContributionDeadline(Date.from(Instant.now().plus(20, ChronoUnit.MINUTES))); - - final Task task3 = getStubTask(); - task3.setChainDealId(generateHexId()); - task3.setChainTaskId(generateHexId()); - task3.setCurrentStatus(INITIALIZED); - task3.setContributionDeadline(Date.from(Instant.now().plus(10, ChronoUnit.MINUTES))); - - final Task task4 = getStubTask(); - task4.setChainDealId(generateHexId()); - task4.setChainTaskId(generateHexId()); - task4.setCurrentStatus(RUNNING); - task4.setContributionDeadline(Date.from(Instant.now().plus(10, ChronoUnit.MINUTES))); - - taskRepository.saveAll(Arrays.asList(task1, task2, task3, task4)); - - List foundTasks = queryTasksOrderedByStatusThenContributionDeadline(); - Assertions.assertThat(foundTasks).hasSize(4); - Assertions.assertThat(foundTasks.remove(0)).usingRecursiveComparison().isEqualTo(task4); - Assertions.assertThat(foundTasks.remove(0)).usingRecursiveComparison().isEqualTo(task1); - Assertions.assertThat(foundTasks.remove(0)).usingRecursiveComparison().isEqualTo(task3); - Assertions.assertThat(foundTasks.remove(0)).usingRecursiveComparison().isEqualTo(task2); - } - - @Test - void shouldFindTasksOrderedByCurrentStatusAndContributionDeadlineWithFuzzyData() { - List tasks = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - final Task task = getStubTask(); - task.setChainDealId(generateHexId()); - task.setChainTaskId(generateHexId()); - task.setCurrentStatus(generator.nextInt(50) % 2 == 0 ? RUNNING : INITIALIZED); - int amountToAdd = generator.nextInt(10); - task.setContributionDeadline(Date.from(Instant.now().plus(amountToAdd, ChronoUnit.MINUTES))); - tasks.add(task); - } - taskRepository.saveAll(tasks); - tasks.sort(Comparator.comparing(Task::getCurrentStatus, Comparator.reverseOrder()) - .thenComparing(Task::getContributionDeadline)); - - List foundTasks = queryTasksOrderedByStatusThenContributionDeadline(); - Assertions.assertThat(foundTasks).hasSize((int) taskRepository.count()); - for (Task task : tasks) { - Assertions.assertThat(task).usingRecursiveComparison().isEqualTo(foundTasks.remove(0)); - } - } - } diff --git a/src/test/java/com/iexec/core/task/TaskServiceTests.java b/src/test/java/com/iexec/core/task/TaskServiceTests.java index 91d2bd182..5cba63ca8 100644 --- a/src/test/java/com/iexec/core/task/TaskServiceTests.java +++ b/src/test/java/com/iexec/core/task/TaskServiceTests.java @@ -29,11 +29,14 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; import org.springframework.context.ApplicationEventPublisher; import org.springframework.dao.DuplicateKeyException; import org.springframework.data.mongodb.core.MongoTemplate; @@ -64,6 +67,7 @@ @DataMongoTest @Testcontainers +@ExtendWith(OutputCaptureExtension.class) class TaskServiceTests { private final long maxExecutionTime = 60000; private final Date contributionDeadline = new Date(); @@ -195,6 +199,27 @@ void shouldNotAddTask() { } // endregion + // region updateTask & updateTaskStatus + @Test + void shouldNotUpdateTaskStatusAndEmitWarning(CapturedOutput output) { + final Task task = getStubTask(); + taskRepository.save(task); + final long modifiedCount = taskService.updateTaskStatus(CHAIN_TASK_ID, INITIALIZING, INITIALIZING, + List.of(TaskStatusChange.builder().status(INITIALIZING).build())); + assertThat(modifiedCount).isZero(); + assertThat(output.getOut()).contains("The task was not updated [chainTaskId:" + CHAIN_TASK_ID + "]"); + } + + @Test + void shouldUpdateTaskStatus() { + final Task task = getStubTask(INITIALIZING); + taskRepository.save(task); + final long modifiedCount = taskService.updateTaskStatus(CHAIN_TASK_ID, INITIALIZING, INITIALIZED, + List.of(TaskStatusChange.builder().status(INITIALIZED).build())); + assertThat(modifiedCount).isOne(); + } + // endregion + // region findByCurrentStatus @Test void shouldFindByCurrentStatus() { diff --git a/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java b/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java index 1da8cb27d..742017ce6 100644 --- a/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java +++ b/src/test/java/com/iexec/core/task/update/TaskUpdateManagerTest.java @@ -27,9 +27,6 @@ import com.iexec.commons.poco.tee.TeeUtils; import com.iexec.commons.poco.utils.BytesUtils; import com.iexec.core.chain.IexecHubService; -import com.iexec.core.chain.Web3jService; -import com.iexec.core.configuration.ResultRepositoryConfiguration; -import com.iexec.core.detector.replicate.RevealTimeoutDetector; import com.iexec.core.replicate.Replicate; import com.iexec.core.replicate.ReplicatesList; import com.iexec.core.replicate.ReplicatesService; @@ -85,24 +82,12 @@ class TaskUpdateManagerTest { @Mock private ApplicationEventPublisher applicationEventPublisher; - @Mock - private ResultRepositoryConfiguration resulRepositoryConfig; - - @Mock - private Web3jService web3jService; - - @Mock - private RevealTimeoutDetector revealTimeoutDetector; - @Mock private BlockchainAdapterService blockchainAdapterService; @Mock private TaskService taskService; - @Mock - private TaskUpdateRequestManager taskUpdateRequestManager; - @Mock private SmsService smsService; @@ -394,7 +379,7 @@ void shouldUpdateReceived2Initializing2InitializedOnStandard() { assertThat(task.getEnclaveChallenge()).isEqualTo(BytesUtils.EMPTY_ADDRESS); assertThat(task.getSmsUrl()).isNull(); verify(smsService, times(0)).getVerifiedSmsUrl(anyString(), anyString()); - verify(taskService, times(2)).updateTaskStatus(any(), any(), any()); // INITIALIZING & INITIALIZED + verify(taskService, times(2)).updateTaskStatus(any(), any(), any(), any()); // INITIALIZING & INITIALIZED } @Test @@ -428,7 +413,7 @@ void shouldUpdateReceived2Initializing2InitializedOnTee() { assertThat(task.getEnclaveChallenge()).isEqualTo(BytesUtils.EMPTY_ADDRESS); assertThat(task.getSmsUrl()).isEqualTo(smsUrl); verify(smsService, times(1)).getVerifiedSmsUrl(CHAIN_TASK_ID, tag); - verify(taskService, times(2)).updateTaskStatus(any(), any(), any()); // INITIALIZING & INITIALIZED + verify(taskService, times(2)).updateTaskStatus(any(), any(), any(), any()); // INITIALIZING & INITIALIZED } @Test @@ -462,7 +447,7 @@ void shouldNotUpdateReceived2Initializing2InitializedOnTeeSinceCannotRetrieveSms assertThat(task.getSmsUrl()).isNull(); verify(smsService, times(1)).getVerifiedSmsUrl(CHAIN_TASK_ID, tag); verify(smsService, times(0)).getEnclaveChallenge(anyString(), anyString()); - verify(taskService, times(1)).updateTaskStatus(any(), any(), any()); // INITIALIZE_FAILED & FAILED + verify(taskService, times(1)).updateTaskStatus(any(), any(), any(), any()); // INITIALIZE_FAILED & FAILED } // endregion @@ -731,7 +716,6 @@ void shouldUpdateRunning2ConsensusReached() { .winnerCounter(2) .build())); when(taskService.getTaskByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); - when(web3jService.getLatestBlockNumber()).thenReturn(2L); when(taskService.isConsensusReached(any())).thenReturn(true); when(iexecHubService.getConsensusBlock(anyString(), anyLong())).thenReturn(ChainReceipt.builder().blockNumber(1L).build()); when(replicatesList.getNbValidContributedWinners(any())).thenReturn(2); @@ -1168,7 +1152,6 @@ void shouldUpdateResultUploading2Uploaded2Finalizing2Finalized() { //one worker when(iexecHubService.hasEnoughGas()).thenReturn(true); when(blockchainAdapterService.requestFinalize(any(), any(), any())).thenReturn(Optional.of(CHAIN_TASK_ID)); when(blockchainAdapterService.isFinalized(any())).thenReturn(Optional.of(true)); - when(resulRepositoryConfig.getResultRepositoryURL()).thenReturn("http://foo:bar"); when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(ChainTask.builder() .status(ChainTaskStatus.COMPLETED) .revealCounter(1) @@ -1829,16 +1812,6 @@ void shouldNotRequestUploadSinceUploadInProgress() { // endregion - // region publishRequest - - @Test - void shouldTriggerUpdateTaskAsynchronously() { - taskUpdateRequestManager.publishRequest(CHAIN_TASK_ID); - verify(taskUpdateRequestManager).publishRequest(CHAIN_TASK_ID); - } - - // endregion - // region utils private void mockTaskDescriptionFromTask(Task task) { final TaskDescription taskDescription = TaskDescription.builder()