diff --git a/Dockerfile b/Dockerfile index 06bb14d69..c6687229f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,4 +2,4 @@ FROM openjdk:11.0.7-jre-slim COPY build/libs/iexec-core-@projectversion@.jar iexec-core.jar -ENTRYPOINT ["java","-jar","/iexec-core.jar"] \ No newline at end of file +ENTRYPOINT ["java", "-jar", "/iexec-core.jar"] \ No newline at end of file diff --git a/src/main/java/com/iexec/core/AppConfig.java b/src/main/java/com/iexec/core/AppConfig.java index 22c7afc4f..9a72522a9 100644 --- a/src/main/java/com/iexec/core/AppConfig.java +++ b/src/main/java/com/iexec/core/AppConfig.java @@ -16,47 +16,29 @@ package com.iexec.core; -import java.util.concurrent.Executor; +import com.iexec.core.config.AsyncConfig; +import com.iexec.core.config.MongoConfig; +import com.iexec.core.config.RetryConfig; +import com.iexec.core.config.SchedulingConfig; +import com.iexec.core.config.SwaggerConfig; +import com.iexec.core.config.WebMvcConfig; +import com.iexec.core.config.WebSocketConfig; import org.springframework.cloud.openfeign.EnableFeignClients; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.task.SimpleAsyncTaskExecutor; -import org.springframework.retry.annotation.EnableRetry; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.web.servlet.config.annotation.CorsRegistry; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; +import org.springframework.context.annotation.Import; @Configuration -@EnableRetry -@EnableScheduling +@Import({ + AsyncConfig.class, + MongoConfig.class, + RetryConfig.class, + SchedulingConfig.class, + SwaggerConfig.class, + WebMvcConfig.class, + WebSocketConfig.class, +}) @EnableFeignClients -@EnableAsync -public class AppConfig implements WebMvcConfigurer { - - @Override - public void addCorsMappings(CorsRegistry registry) { - registry.addMapping("/**"); - } - - /** - * We need this bean with the name "taskExecutor" - * to help spring choose an executor for - * {@link org.springframework.scheduling.annotation.Async} - * tasks. - *

- * Resolves the message: - * "More than one TaskExecutor bean found - * within the context, and none is named 'taskExecutor'. - * Mark one of them as primary or name it 'taskExecutor' - * (possibly as an alias) in order to use it for async - * processing." - * - * @return - */ - @Bean - public Executor taskExecutor() { - return new SimpleAsyncTaskExecutor(); - } +public class AppConfig { + } \ No newline at end of file diff --git a/src/main/java/com/iexec/core/config/AsyncConfig.java b/src/main/java/com/iexec/core/config/AsyncConfig.java new file mode 100644 index 000000000..b3e0bc0f1 --- /dev/null +++ b/src/main/java/com/iexec/core/config/AsyncConfig.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.config; + +import java.lang.reflect.Method; +import java.util.concurrent.Executor; + +import com.iexec.core.utils.TaskExecutorUtils; + +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; + +import lombok.extern.slf4j.Slf4j; + +@Configuration +@EnableAsync +@Slf4j +public class AsyncConfig implements AsyncConfigurer { + + /** + * This executor is used for Async tasks. + * + * @return + */ + @Override + public Executor getAsyncExecutor() { + return TaskExecutorUtils.newThreadPoolTaskExecutor("Async-"); + } + + /** + * Handle uncaught exceptions raised by Async tasks. + */ + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return new Handler(); + } + + private class Handler implements AsyncUncaughtExceptionHandler { + + @Override + public void handleUncaughtException( + Throwable ex, + Method method, + Object... params + ) { + log.error("Exception in async task [method:{}, params:{}]", + method, params, ex); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/iexec/core/SpringMongoConfig.java b/src/main/java/com/iexec/core/config/MongoConfig.java similarity index 94% rename from src/main/java/com/iexec/core/SpringMongoConfig.java rename to src/main/java/com/iexec/core/config/MongoConfig.java index 956094874..3a8d302e5 100644 --- a/src/main/java/com/iexec/core/SpringMongoConfig.java +++ b/src/main/java/com/iexec/core/config/MongoConfig.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.iexec.core; +package com.iexec.core.config; import com.mongodb.MongoClient; import org.springframework.beans.factory.annotation.Value; @@ -24,7 +24,7 @@ import org.springframework.data.mongodb.gridfs.GridFsTemplate; @Configuration -public class SpringMongoConfig extends AbstractMongoConfiguration { +public class MongoConfig extends AbstractMongoConfiguration { @Value("${spring.data.mongodb.database}") private String databaseName; diff --git a/src/main/java/com/iexec/core/config/RetryConfig.java b/src/main/java/com/iexec/core/config/RetryConfig.java new file mode 100644 index 000000000..4df015bfc --- /dev/null +++ b/src/main/java/com/iexec/core/config/RetryConfig.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.annotation.EnableRetry; + +@Configuration +@EnableRetry +public class RetryConfig { + +} diff --git a/src/main/java/com/iexec/core/config/SchedulingConfig.java b/src/main/java/com/iexec/core/config/SchedulingConfig.java new file mode 100644 index 000000000..7cedaf019 --- /dev/null +++ b/src/main/java/com/iexec/core/config/SchedulingConfig.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.config; + +import com.iexec.core.utils.TaskSchedulerUtils; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; + +@Configuration +@EnableScheduling +public class SchedulingConfig implements SchedulingConfigurer { + + @Override + public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { + taskRegistrar.setTaskScheduler( + TaskSchedulerUtils.newThreadPoolTaskScheduler("Scheduled-") + ); + } +} diff --git a/src/main/java/com/iexec/core/SwaggerConfig.java b/src/main/java/com/iexec/core/config/SwaggerConfig.java similarity index 97% rename from src/main/java/com/iexec/core/SwaggerConfig.java rename to src/main/java/com/iexec/core/config/SwaggerConfig.java index 0d24e8ff3..aa597a798 100644 --- a/src/main/java/com/iexec/core/SwaggerConfig.java +++ b/src/main/java/com/iexec/core/config/SwaggerConfig.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.iexec.core; +package com.iexec.core.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/src/main/java/com/iexec/core/config/WebMvcConfig.java b/src/main/java/com/iexec/core/config/WebMvcConfig.java new file mode 100644 index 000000000..0684af5c2 --- /dev/null +++ b/src/main/java/com/iexec/core/config/WebMvcConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.CorsRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@Configuration +public class WebMvcConfig implements WebMvcConfigurer { + + @Override + public void addCorsMappings(CorsRegistry registry) { + registry.addMapping("/**"); + } +} diff --git a/src/main/java/com/iexec/core/pubsub/WebSocketConfig.java b/src/main/java/com/iexec/core/config/WebSocketConfig.java similarity index 88% rename from src/main/java/com/iexec/core/pubsub/WebSocketConfig.java rename to src/main/java/com/iexec/core/config/WebSocketConfig.java index c7437b952..5568c400b 100644 --- a/src/main/java/com/iexec/core/pubsub/WebSocketConfig.java +++ b/src/main/java/com/iexec/core/config/WebSocketConfig.java @@ -14,7 +14,9 @@ * limitations under the License. */ -package com.iexec.core.pubsub; +package com.iexec.core.config; + +import com.iexec.core.utils.TaskSchedulerUtils; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; @@ -36,6 +38,7 @@ public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/connect").withSockJS() .setWebSocketEnabled(false) .setHeartbeatTime(5000) - ; + .setTaskScheduler(TaskSchedulerUtils + .newThreadPoolTaskScheduler("STOMP-")); } } \ No newline at end of file diff --git a/src/main/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetector.java b/src/main/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetector.java index 123d6c53a..3bf68473e 100644 --- a/src/main/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetector.java +++ b/src/main/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetector.java @@ -16,12 +16,10 @@ package com.iexec.core.detector.replicate; -import com.iexec.common.replicate.ReplicateStatusModifier; import com.iexec.core.detector.Detector; import com.iexec.core.replicate.Replicate; import com.iexec.core.replicate.ReplicatesService; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import lombok.extern.slf4j.Slf4j; @@ -42,14 +40,13 @@ public class ReplicateResultUploadTimeoutDetector implements Detector { private TaskService taskService; private ReplicatesService replicatesService; - private TaskExecutorEngine taskExecutorEngine; - public ReplicateResultUploadTimeoutDetector(TaskService taskService, - ReplicatesService replicatesService, - TaskExecutorEngine taskExecutorEngine) { + public ReplicateResultUploadTimeoutDetector( + TaskService taskService, + ReplicatesService replicatesService + ) { this.taskService = taskService; this.replicatesService = replicatesService; - this.taskExecutorEngine = taskExecutorEngine; } @Scheduled(fixedRateString = "${cron.detector.resultuploadtimeout.period}") @@ -82,7 +79,7 @@ public void detect() { } if (hasReplicateAlreadyFailedToUpload) { - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); return; } @@ -93,7 +90,7 @@ public void detect() { replicatesService.updateReplicateStatus(chainTaskId, uploadingReplicate.getWalletAddress(), RESULT_UPLOAD_REQUEST_FAILED); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); return; } @@ -101,7 +98,7 @@ public void detect() { replicatesService.updateReplicateStatus(chainTaskId, uploadingReplicate.getWalletAddress(), RESULT_UPLOAD_FAILED); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); return; } } diff --git a/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java b/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java index cf56fd8ac..cbaea53a5 100644 --- a/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java +++ b/src/main/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetector.java @@ -18,7 +18,6 @@ import com.iexec.core.detector.Detector; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import lombok.extern.slf4j.Slf4j; @@ -33,12 +32,9 @@ public class ContributionTimeoutTaskDetector implements Detector { private TaskService taskService; - private TaskExecutorEngine taskExecutorEngine; - public ContributionTimeoutTaskDetector(TaskService taskService, - TaskExecutorEngine taskExecutorEngine) { + public ContributionTimeoutTaskDetector(TaskService taskService) { this.taskService = taskService; - this.taskExecutorEngine = taskExecutorEngine; } @Scheduled(fixedRateString = "${cron.detector.contribution.timeout.period}") @@ -49,7 +45,7 @@ public void detect() { Date now = new Date(); if (now.after(task.getContributionDeadline())) { log.info("Task with contribution timeout found [chainTaskId:{}]", task.getChainTaskId()); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); } } } diff --git a/src/main/java/com/iexec/core/detector/task/FinalizedTaskDetector.java b/src/main/java/com/iexec/core/detector/task/FinalizedTaskDetector.java index e861fb001..d4398bc3e 100644 --- a/src/main/java/com/iexec/core/detector/task/FinalizedTaskDetector.java +++ b/src/main/java/com/iexec/core/detector/task/FinalizedTaskDetector.java @@ -21,7 +21,6 @@ import com.iexec.core.chain.IexecHubService; import com.iexec.core.detector.Detector; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import lombok.extern.slf4j.Slf4j; @@ -35,14 +34,11 @@ public class FinalizedTaskDetector implements Detector { private TaskService taskService; - private TaskExecutorEngine taskExecutorEngine; private IexecHubService iexecHubService; public FinalizedTaskDetector(TaskService taskService, - TaskExecutorEngine taskExecutorEngine, IexecHubService iexecHubService) { this.taskService = taskService; - this.taskExecutorEngine = taskExecutorEngine; this.iexecHubService = iexecHubService; } @@ -58,7 +54,7 @@ public void detect() { if (chainTask.isPresent() && chainTask.get().getStatus().equals(ChainTaskStatus.COMPLETED)) { log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]", TaskStatus.FINALIZING, TaskStatus.FINALIZED, task.getChainTaskId()); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); } } } diff --git a/src/main/java/com/iexec/core/detector/task/InitializedTaskDetector.java b/src/main/java/com/iexec/core/detector/task/InitializedTaskDetector.java index c051f6b1d..69ef66ba2 100644 --- a/src/main/java/com/iexec/core/detector/task/InitializedTaskDetector.java +++ b/src/main/java/com/iexec/core/detector/task/InitializedTaskDetector.java @@ -21,7 +21,6 @@ import com.iexec.core.chain.IexecHubService; import com.iexec.core.detector.Detector; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import lombok.extern.slf4j.Slf4j; @@ -35,14 +34,11 @@ public class InitializedTaskDetector implements Detector { private TaskService taskService; - private TaskExecutorEngine taskExecutorEngine; private IexecHubService iexecHubService; public InitializedTaskDetector(TaskService taskService, - TaskExecutorEngine taskExecutorEngine, IexecHubService iexecHubService) { this.taskService = taskService; - this.taskExecutorEngine = taskExecutorEngine; this.iexecHubService = iexecHubService; } @@ -60,7 +56,7 @@ public void detect() { } log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]", task.getCurrentStatus(), TaskStatus.INITIALIZED, task.getChainTaskId()); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); } } } diff --git a/src/main/java/com/iexec/core/detector/task/ReopenedTaskDetector.java b/src/main/java/com/iexec/core/detector/task/ReopenedTaskDetector.java index 61bf78191..a6f4fc779 100644 --- a/src/main/java/com/iexec/core/detector/task/ReopenedTaskDetector.java +++ b/src/main/java/com/iexec/core/detector/task/ReopenedTaskDetector.java @@ -21,7 +21,6 @@ import com.iexec.core.chain.IexecHubService; import com.iexec.core.detector.Detector; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import lombok.extern.slf4j.Slf4j; @@ -35,14 +34,11 @@ public class ReopenedTaskDetector implements Detector { private TaskService taskService; - private TaskExecutorEngine taskExecutorEngine; private IexecHubService iexecHubService; public ReopenedTaskDetector(TaskService taskService, - TaskExecutorEngine taskExecutorEngine, IexecHubService iexecHubService) { this.taskService = taskService; - this.taskExecutorEngine = taskExecutorEngine; this.iexecHubService = iexecHubService; } @@ -63,7 +59,7 @@ public void detect() { if (chainTask.getStatus().equals(ChainTaskStatus.ACTIVE)) { log.info("Detected confirmed missing update (task) [is:{}, should:{}, taskId:{}]", TaskStatus.REOPENING, TaskStatus.REOPENED, task.getChainTaskId()); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); } } } diff --git a/src/main/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetector.java b/src/main/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetector.java index 2a38ba2f1..165be814a 100644 --- a/src/main/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetector.java +++ b/src/main/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetector.java @@ -18,7 +18,6 @@ import com.iexec.core.detector.Detector; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import lombok.extern.slf4j.Slf4j; @@ -35,12 +34,9 @@ public class TaskResultUploadTimeoutDetector implements Detector { private TaskService taskService; - private TaskExecutorEngine taskExecutorEngine; - public TaskResultUploadTimeoutDetector(TaskService taskService, - TaskExecutorEngine taskExecutorEngine) { + public TaskResultUploadTimeoutDetector(TaskService taskService) { this.taskService = taskService; - this.taskExecutorEngine = taskExecutorEngine; } @Scheduled(fixedRateString = "${cron.detector.resultuploadtimeout.period}") @@ -60,7 +56,7 @@ public void detect() { if (isNowAfterFinalDeadline) { log.info("found task in status {} after final deadline [chainTaskId:{}]", task.getCurrentStatus(), chainTaskId); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); } } } diff --git a/src/main/java/com/iexec/core/detector/task/UnstartedTxDetector.java b/src/main/java/com/iexec/core/detector/task/UnstartedTxDetector.java index ef2c5841b..b89bc520d 100644 --- a/src/main/java/com/iexec/core/detector/task/UnstartedTxDetector.java +++ b/src/main/java/com/iexec/core/detector/task/UnstartedTxDetector.java @@ -18,7 +18,6 @@ import com.iexec.core.detector.Detector; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import lombok.extern.slf4j.Slf4j; @@ -32,12 +31,9 @@ public class UnstartedTxDetector implements Detector { private TaskService taskService; - private TaskExecutorEngine taskExecutorEngine; - public UnstartedTxDetector(TaskService taskService, - TaskExecutorEngine taskExecutorEngine) { + public UnstartedTxDetector(TaskService taskService) { this.taskService = taskService; - this.taskExecutorEngine = taskExecutorEngine; } @Scheduled(fixedRateString = "${cron.detector.unstartedtx.period}") @@ -48,7 +44,7 @@ public void detect() { for (Task task : notYetFinalizingTasks) { log.info("Detected confirmed missing update (task) [is:{}, should:{}, chainTaskId:{}]", TaskStatus.RESULT_UPLOADED, TaskStatus.FINALIZING, task.getChainTaskId()); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); } //start initialize when needed @@ -56,7 +52,7 @@ public void detect() { for (Task task : notYetInitializedTasks) { log.info("Detected confirmed missing update (task) [is:{}, should:{}, chainTaskId:{}]", TaskStatus.RECEIVED, TaskStatus.INITIALIZING, task.getChainTaskId()); - taskExecutorEngine.updateTask(task.getChainTaskId()); + taskService.updateTask(task.getChainTaskId()); } } } diff --git a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java index a384bbba1..c6ccbc3ed 100644 --- a/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java +++ b/src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java @@ -29,7 +29,6 @@ import com.iexec.core.contribution.ConsensusService; import com.iexec.core.sms.SmsService; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.core.worker.Worker; @@ -50,7 +49,6 @@ public class ReplicateSupplyService { private ReplicatesService replicatesService; private SignatureService signatureService; - private TaskExecutorEngine taskExecutorEngine; private TaskService taskService; private WorkerService workerService; private SmsService smsService; @@ -60,7 +58,6 @@ public class ReplicateSupplyService { public ReplicateSupplyService(ReplicatesService replicatesService, SignatureService signatureService, - TaskExecutorEngine taskExecutorEngine, TaskService taskService, WorkerService workerService, SmsService smsService, @@ -69,7 +66,6 @@ public ReplicateSupplyService(ReplicatesService replicatesService, ConsensusService consensusService) { this.replicatesService = replicatesService; this.signatureService = signatureService; - this.taskExecutorEngine = taskExecutorEngine; this.taskService = taskService; this.workerService = workerService; this.smsService = smsService; @@ -138,7 +134,7 @@ Optional getAuthOfAvailableReplicate(long workerLastBlo // no need to ge further if the consensus is already reached on-chain // the task should be updated since the consensus is reached but it is still in RUNNING status if (taskService.isConsensusReached(task)) { - taskExecutorEngine.updateTask(chainTaskId); + taskService.updateTask(chainTaskId); continue; } @@ -327,7 +323,7 @@ private Optional recoverReplicateInContributionPhase(Task return Optional.of(TaskNotificationType.PLEASE_WAIT); } - taskExecutorEngine.updateTask(chainTaskId); + taskService.updateTask(chainTaskId); return Optional.of(TaskNotificationType.PLEASE_REVEAL); } @@ -364,7 +360,7 @@ private Optional recoverReplicateInRevealPhase(Task task, ReplicateStatusDetails details = new ReplicateStatusDetails(blockNumber); replicatesService.updateReplicateStatus(chainTaskId, walletAddress, REVEALED, details); - CompletableFuture completableFuture = taskExecutorEngine.updateTask(chainTaskId); + CompletableFuture completableFuture = taskService.updateTask(chainTaskId); completableFuture.join(); } @@ -421,7 +417,7 @@ private Optional recoverReplicateInResultUploadPhase(Task if (didReplicateStartUploading && didReplicateUploadWithoutNotifying) { replicatesService.updateReplicateStatus(chainTaskId, walletAddress, RESULT_UPLOADED); - taskExecutorEngine.updateTask(chainTaskId); + taskService.updateTask(chainTaskId); return Optional.of(TaskNotificationType.PLEASE_WAIT); } diff --git a/src/main/java/com/iexec/core/task/Task.java b/src/main/java/com/iexec/core/task/Task.java index 7ce46600b..29f4e1d65 100644 --- a/src/main/java/com/iexec/core/task/Task.java +++ b/src/main/java/com/iexec/core/task/Task.java @@ -27,7 +27,6 @@ import org.springframework.data.annotation.Version; import org.springframework.data.mongodb.core.index.Indexed; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Date; import java.util.List; diff --git a/src/main/java/com/iexec/core/task/TaskExecutorEngine.java b/src/main/java/com/iexec/core/task/TaskExecutorEngine.java deleted file mode 100644 index 100e8a088..000000000 --- a/src/main/java/com/iexec/core/task/TaskExecutorEngine.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2020 IEXEC BLOCKCHAIN TECH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.iexec.core.task; - -import static com.iexec.core.utils.ThreadPoolExecutorUtils.singleThreadExecutorWithFixedSizeQueue; - -import java.util.Date; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.springframework.stereotype.Service; - -import lombok.extern.slf4j.Slf4j; -import net.jodah.expiringmap.ExpirationListener; -import net.jodah.expiringmap.ExpirationPolicy; -import net.jodah.expiringmap.ExpiringMap; - -/** - * This class is used to perform updates on a task one by one. It also ensures that no extra update is - * performed for no reason (in the case of multiple replicate updates in a short time, the task update will only be called - * once) - */ -@Slf4j -@Service -public class TaskExecutorEngine { - - private final TaskService taskService; - private final TaskExecutorFactory taskExecutorFactory; - - public TaskExecutorEngine(TaskService taskService) { - this.taskService = taskService; - this.taskExecutorFactory = new TaskExecutorFactory(); - } - - /** - * Trigger task update in a separate executor. - * - * @param chainTaskId - * @return - */ - public CompletableFuture updateTask(String chainTaskId) { - Executor executor = this.taskExecutorFactory.getOrCreate(chainTaskId); - return CompletableFuture - .supplyAsync( - () -> taskService.tryUpgradeTaskStatus(chainTaskId), - executor - ) - .handle((res, err) -> { - if (err != null) { - err.printStackTrace(); - } - return res && err == null; - }); - } - - /** - * Remove the thread pool executor - * dedicated to this task. - * - * @param task - */ - public void removeTaskExecutor(Task task) { - if (!TaskStatus.isFinalStatus(task.getCurrentStatus())) { - log.error("Cannot remove executor for unfinished " + - "task [chainTaskId:{}]", task.getChainTaskId()); - return; - } - this.taskExecutorFactory.expire(task.getChainTaskId()); - } - - /** - * A factory that manages expiring thread - * executors. Each executor has its own - * expiration period. - */ - private class TaskExecutorFactory { - - // this map is thread-safe - private final ExpiringMap map = - ExpiringMap.builder() - .expirationPolicy(ExpirationPolicy.CREATED) - .variableExpiration() - // shutdown thread executor when an entry expires - .expirationListener(shutdownExecutorWhenExpired()) - .build(); - - /** - * Get a task's executor or create a new - * one if needed. - * - * @param chainTaskId id associated to executor - * @param maxTtl max time to live for this executor - * @return - */ - public Executor getOrCreate(String chainTaskId) { - if (map.containsKey(chainTaskId)) { - return map.get(chainTaskId); - } - String threadPoolName = "0x" + chainTaskId.substring(0, 7); - map.put(chainTaskId, singleThreadExecutorWithFixedSizeQueue(1, threadPoolName)); - Date deadline = taskService.getTaskFinalDeadline(chainTaskId); - map.setExpiration(chainTaskId, deadline.getTime(), TimeUnit.MILLISECONDS); - return map.get(chainTaskId); - } - - /** - * Set the expiration period of an executor - * to 0 and the ExpirationListener will do - * the rest. - * @param chainTaskId - */ - public void expire(String chainTaskId) { - map.setExpiration(chainTaskId, 0, TimeUnit.MILLISECONDS); - } - - /** - * The max TTL of an executor is the max duration - * of an iExec task in a given iExec category. - * The executor can expire before the max duration - * once the task is COMPLETED. - * - * @return a listener that shuts down the executor. - */ - private ExpirationListener shutdownExecutorWhenExpired() { - return new ExpirationListener() { - @Override - public void expired(String chainTaskId, ThreadPoolExecutor executor) { - executor.shutdown(); - } - }; - } - } -} diff --git a/src/main/java/com/iexec/core/task/TaskService.java b/src/main/java/com/iexec/core/task/TaskService.java index f275ad5f5..cde836f0d 100644 --- a/src/main/java/com/iexec/core/task/TaskService.java +++ b/src/main/java/com/iexec/core/task/TaskService.java @@ -24,6 +24,7 @@ import com.iexec.core.chain.Web3jService; import com.iexec.core.replicate.Replicate; import com.iexec.core.replicate.ReplicatesService; +import com.iexec.core.task.executor.TaskExecutorEngine; import com.iexec.core.task.event.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -34,6 +35,7 @@ import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -44,19 +46,26 @@ @Service public class TaskService { - private final ConcurrentHashMap taskAccessForNewReplicateLock = new ConcurrentHashMap<>(); + private final ConcurrentHashMap + taskAccessForNewReplicateLock = new ConcurrentHashMap<>(); + private TaskRepository taskRepository; + private TaskExecutorEngine taskExecutorEngine; private IexecHubService iexecHubService; private ReplicatesService replicatesService; private ApplicationEventPublisher applicationEventPublisher; private Web3jService web3jService; - public TaskService(TaskRepository taskRepository, - IexecHubService iexecHubService, - ReplicatesService replicatesService, - ApplicationEventPublisher applicationEventPublisher, - Web3jService web3jService) { + public TaskService( + TaskRepository taskRepository, + TaskExecutorEngine taskExecutorEngine, + IexecHubService iexecHubService, + ReplicatesService replicatesService, + ApplicationEventPublisher applicationEventPublisher, + Web3jService web3jService + ) { this.taskRepository = taskRepository; + this.taskExecutorEngine = taskExecutorEngine; this.iexecHubService = iexecHubService; this.replicatesService = replicatesService; this.applicationEventPublisher = applicationEventPublisher; @@ -171,10 +180,43 @@ public Date getTaskFinalDeadline(String chainTaskId) { .orElse(new Date()); } - boolean tryUpgradeTaskStatus(String chainTaskId) { + /** + * Update task asynchronously. + * + * @param chainTaskId + * @return + */ + // TODO change this mechanism of update + public CompletableFuture updateTask(String chainTaskId) { + long expiration = getTaskFinalDeadline(chainTaskId).getTime(); + return taskExecutorEngine.run( + chainTaskId, + expiration, + () -> updateTaskRunnable(chainTaskId) + ); + } + + /** + * Remove task's executor if task is + * in final status. + * + * @param task + */ + public void removeTaskExecutor(Task task) { + if (!TaskStatus.isFinalStatus(task.getCurrentStatus())) { + log.error("Cannot remove executor for unfinished " + + "task [chainTaskId:{}]", task.getChainTaskId()); + return; + } + taskExecutorEngine.removeExecutor(task.getChainTaskId()); + log.info("Removed task executor [chainTaskId:{}]", + task.getChainTaskId()); + } + + void updateTaskRunnable(String chainTaskId) { Optional optional = getTaskByChainTaskId(chainTaskId); if (!optional.isPresent()) { - return false; + return; } Task task = optional.get(); @@ -215,7 +257,6 @@ boolean tryUpgradeTaskStatus(String chainTaskId) { finalizing2Finalized2Completed(task); break; } - return true; } private Task updateTaskStatusAndSave(Task task, TaskStatus newStatus) { diff --git a/src/main/java/com/iexec/core/task/executor/TaskExecutorEngine.java b/src/main/java/com/iexec/core/task/executor/TaskExecutorEngine.java new file mode 100644 index 000000000..17c059ea6 --- /dev/null +++ b/src/main/java/com/iexec/core/task/executor/TaskExecutorEngine.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.task.executor; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.springframework.stereotype.Component; + +/** + * This class is used to perform updates on a task one by one. + * It also ensures that no extra update is performed for no reason + * (in the case of multiple replicate updates in a short time, + * the task update will only be called once) + */ +@Component +public class TaskExecutorEngine { + + private final TaskExecutorFactory taskExecutorFactory; + + public TaskExecutorEngine(TaskExecutorFactory taskExecutorFactory) { + this.taskExecutorFactory = taskExecutorFactory; + } + + /** + * Execute task update runnable in a dedicated executor. + * + * @param chainTaskId + * @return completableFuture to follow task execution. + */ + public CompletableFuture run( + String chainTaskId, long expiration, Runnable taskUpdate + ) { + Executor executor = taskExecutorFactory + .getOrCreate(chainTaskId, expiration); + return CompletableFuture.runAsync(taskUpdate, executor); + } + + /** + * Remove executor by chainTaskId. + * + * @param chainTaskId + */ + public void removeExecutor(String chainTaskId) { + taskExecutorFactory.remove(chainTaskId); + } +} diff --git a/src/main/java/com/iexec/core/task/executor/TaskExecutorFactory.java b/src/main/java/com/iexec/core/task/executor/TaskExecutorFactory.java new file mode 100644 index 000000000..0fa30ef39 --- /dev/null +++ b/src/main/java/com/iexec/core/task/executor/TaskExecutorFactory.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.task.executor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import com.iexec.core.utils.TaskExecutorUtils; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; +import net.jodah.expiringmap.ExpirationListener; +import net.jodah.expiringmap.ExpirationPolicy; +import net.jodah.expiringmap.ExpiringMap; + +/** + * A factory that manages expiring thread executors. + * Each executor has its own expiration period. + */ +@Component +@Slf4j +class TaskExecutorFactory { + + // this map is thread-safe + private final ExpiringMap map; + + TaskExecutorFactory() { + this.map = ExpiringMap.builder() + .expirationPolicy(ExpirationPolicy.CREATED) + .variableExpiration() + // shutdown thread executor when an entry expires + .expirationListener( + new ShutdownExecuterExpirationListener() + ).build(); + } + + /** + * Get a task's executor or create a new one if needed. + * The executor has an expiration period after which it + * will be shutdown and removed. + * + * @param chainTaskId id associated to executor + * @param maxTtl max time to live for this executor + * @return the executor + */ + ThreadPoolTaskExecutor getOrCreate(String chainTaskId, long expiration) { + if (map.containsKey(chainTaskId)) { + return map.get(chainTaskId); + } + String threadNamePrefix = chainTaskId.substring(0, 9); + map.put( + chainTaskId, + TaskExecutorUtils.singleThreadWithFixedSizeQueue(1, threadNamePrefix) + ); + map.setExpiration(chainTaskId, expiration, MILLISECONDS); + log.info("Created new task executor [chainTaskId:{}, expiration:{}]", + chainTaskId, new Date(expiration)); + return map.get(chainTaskId); + } + + /** + * Remove executor by id. + * + * @param chainTaskId + */ + void remove(String chainTaskId) { + // Set the expiration period to 0 + // and the ExpirationListener will do + // the rest. + map.setExpiration(chainTaskId, 0, TimeUnit.MILLISECONDS); + } + + /** + * An expiration listener that shutdowns the executor + * when its expiration time is reached. + */ + private class ShutdownExecuterExpirationListener + implements ExpirationListener { + + @Override + public void expired( + String chainTaskId, ThreadPoolTaskExecutor executor + ) { + executor.shutdown(); + } + } +} diff --git a/src/main/java/com/iexec/core/task/listener/ReplicateListeners.java b/src/main/java/com/iexec/core/task/listener/ReplicateListeners.java index 8323f50c0..df0d73570 100644 --- a/src/main/java/com/iexec/core/task/listener/ReplicateListeners.java +++ b/src/main/java/com/iexec/core/task/listener/ReplicateListeners.java @@ -22,7 +22,7 @@ import com.iexec.core.detector.replicate.ContributionUnnotifiedDetector; import com.iexec.core.replicate.ReplicateUpdatedEvent; import com.iexec.core.replicate.ReplicatesService; -import com.iexec.core.task.TaskExecutorEngine; +import com.iexec.core.task.TaskService; import com.iexec.core.worker.WorkerService; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; @@ -34,18 +34,18 @@ @Component public class ReplicateListeners { - private TaskExecutorEngine taskExecutorEngine; + private TaskService taskService; private WorkerService workerService; private ContributionUnnotifiedDetector contributionUnnotifiedDetector; private ReplicatesService replicatesService; - public ReplicateListeners(TaskExecutorEngine taskExecutorEngine, - WorkerService workerService, + public ReplicateListeners(WorkerService workerService, + TaskService taskService, ContributionUnnotifiedDetector contributionUnnotifiedDetector, ReplicatesService replicatesService) { - this.taskExecutorEngine = taskExecutorEngine; this.workerService = workerService; + this.taskService = taskService; this.contributionUnnotifiedDetector = contributionUnnotifiedDetector; this.replicatesService = replicatesService; } @@ -57,7 +57,7 @@ public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) { ReplicateStatus newStatus = statusUpdate.getStatus(); ReplicateStatusCause cause = statusUpdate.getDetails() != null ? statusUpdate.getDetails().getCause(): null; - taskExecutorEngine.updateTask(event.getChainTaskId()); + taskService.updateTask(event.getChainTaskId()); if (newStatus.equals(ReplicateStatus.COMPUTED)) { workerService.removeComputedChainTaskIdFromWorker(event.getChainTaskId(), event.getWalletAddress()); diff --git a/src/main/java/com/iexec/core/task/listener/TaskListeners.java b/src/main/java/com/iexec/core/task/listener/TaskListeners.java index 0f2c99ba7..f3d9ca444 100644 --- a/src/main/java/com/iexec/core/task/listener/TaskListeners.java +++ b/src/main/java/com/iexec/core/task/listener/TaskListeners.java @@ -23,7 +23,7 @@ import com.iexec.core.replicate.Replicate; import com.iexec.core.replicate.ReplicatesService; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; +import com.iexec.core.task.TaskService; import com.iexec.core.task.event.*; import com.iexec.core.worker.WorkerService; import lombok.extern.slf4j.Slf4j; @@ -38,16 +38,16 @@ @Slf4j public class TaskListeners { - private TaskExecutorEngine taskExecutorEngine; + private TaskService taskService; private NotificationService notificationService; private ReplicatesService replicatesService; private WorkerService workerService; - public TaskListeners(TaskExecutorEngine taskExecutorEngine, + public TaskListeners(TaskService taskService, NotificationService notificationService, ReplicatesService replicatesService, WorkerService workerService) { - this.taskExecutorEngine = taskExecutorEngine; + this.taskService = taskService; this.notificationService = notificationService; this.replicatesService = replicatesService; this.workerService = workerService; @@ -57,7 +57,7 @@ public TaskListeners(TaskExecutorEngine taskExecutorEngine, @EventListener public void onTaskCreatedEvent(TaskCreatedEvent event) { log.info("Received TaskCreatedEvent [chainTaskId:{}]", event.getChainTaskId()); - taskExecutorEngine.updateTask(event.getChainTaskId()); + taskService.updateTask(event.getChainTaskId()); } @EventListener @@ -148,7 +148,7 @@ public void onTaskCompletedEvent(TaskCompletedEvent event) { String chainTaskId = task.getChainTaskId(); log.info("Received TaskCompletedEvent [chainTaskId:{}] ", chainTaskId); - taskExecutorEngine.removeTaskExecutor(task); + taskService.removeTaskExecutor(task); notificationService.sendTaskNotification(TaskNotification.builder() .chainTaskId(chainTaskId) diff --git a/src/main/java/com/iexec/core/utils/CustomSingleExecutorThreadFactory.java b/src/main/java/com/iexec/core/utils/CustomSingleExecutorThreadFactory.java deleted file mode 100644 index f4ee7344b..000000000 --- a/src/main/java/com/iexec/core/utils/CustomSingleExecutorThreadFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.iexec.core.utils; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Copied from {@link java.util.concurrent.Executors#defaultThreadFactory()} - * with minor changes to use a custom thread pool name. - */ -class CustomSingleExecutorThreadFactory implements ThreadFactory { - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - CustomSingleExecutorThreadFactory(String threadPoolName) { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - namePrefix = threadPoolName + "-th-"; - } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, - namePrefix + threadNumber.getAndIncrement(), - 0); - if (t.isDaemon()) - t.setDaemon(false); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } -} diff --git a/src/main/java/com/iexec/core/utils/TaskExecutorUtils.java b/src/main/java/com/iexec/core/utils/TaskExecutorUtils.java new file mode 100644 index 000000000..cde486f41 --- /dev/null +++ b/src/main/java/com/iexec/core/utils/TaskExecutorUtils.java @@ -0,0 +1,36 @@ +package com.iexec.core.utils; + +import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +public class TaskExecutorUtils { + + public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor( + String threadNamePrefix + ) { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setThreadNamePrefix(threadNamePrefix); + executor.initialize(); + return executor; + } + + // TODO remove this + public static ThreadPoolTaskExecutor singleThreadWithFixedSizeQueue( + int queueSize, + String threadNamePrefix + ) { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(1); + executor.setMaxPoolSize(1); + executor.setKeepAliveSeconds(0); + executor.setQueueCapacity(queueSize); + executor.setThreadNamePrefix(threadNamePrefix); + // Discard silently when we add a task + // to the already-full queue. + executor.setRejectedExecutionHandler(new DiscardPolicy()); + executor.initialize(); + return executor; + } +} diff --git a/src/main/java/com/iexec/core/utils/TaskSchedulerUtils.java b/src/main/java/com/iexec/core/utils/TaskSchedulerUtils.java new file mode 100644 index 000000000..12bd4dd0c --- /dev/null +++ b/src/main/java/com/iexec/core/utils/TaskSchedulerUtils.java @@ -0,0 +1,16 @@ +package com.iexec.core.utils; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +public class TaskSchedulerUtils { + + public static ThreadPoolTaskScheduler newThreadPoolTaskScheduler( + String threadNamePrefix + ) { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); + scheduler.setThreadNamePrefix(threadNamePrefix); + scheduler.initialize(); + return scheduler; + } +} diff --git a/src/main/java/com/iexec/core/utils/ThreadPoolExecutorUtils.java b/src/main/java/com/iexec/core/utils/ThreadPoolExecutorUtils.java deleted file mode 100644 index 25fb27fcc..000000000 --- a/src/main/java/com/iexec/core/utils/ThreadPoolExecutorUtils.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2020 IEXEC BLOCKCHAIN TECH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.iexec.core.utils; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class ThreadPoolExecutorUtils { - - private ThreadPoolExecutorUtils(){ - throw new UnsupportedOperationException(); - } - - public static ThreadPoolExecutor singleThreadExecutorWithFixedSizeQueue( - int queueSize, - String threadPoolName - ) { - ThreadPoolExecutor executor = singleThreadExecutorWithFixedSizeQueue(queueSize); - executor.setThreadFactory(new CustomSingleExecutorThreadFactory(threadPoolName)); - return executor; - } - - public static ThreadPoolExecutor singleThreadExecutorWithFixedSizeQueue( - int queueSize - ) { - int numThreads = 1; - ThreadPoolExecutor threadPool = new ThreadPoolExecutor( - numThreads, - numThreads, - 0L, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queueSize) - ); - // By default (unfortunately) the ThreadPoolExecutor - // will throw an exception when a job is submitted - // that fills the queue To avoid this exception, an empty - // RejectedExecutionHandler (that does nothing) needs to be set - threadPool.setRejectedExecutionHandler((r, executor) -> { - // it is kept empty so no exception is thrown - }); - return threadPool; - } -} diff --git a/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java b/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java index e5cb93ad3..866d50b97 100644 --- a/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java +++ b/src/test/java/com/iexec/core/chain/DealWatcherServiceTests.java @@ -31,7 +31,6 @@ import org.springframework.context.ApplicationEventPublisher; import java.math.BigInteger; -import java.util.Arrays; import java.util.Optional; import static org.assertj.core.api.Java6Assertions.assertThat; diff --git a/src/test/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetectorTests.java b/src/test/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetectorTests.java index b83a504f9..8b4a8d75f 100644 --- a/src/test/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetectorTests.java +++ b/src/test/java/com/iexec/core/detector/replicate/ReplicateResultUploadTimeoutDetectorTests.java @@ -18,11 +18,9 @@ import com.iexec.common.replicate.ReplicateStatus; import com.iexec.common.replicate.ReplicateStatusModifier; -import com.iexec.core.detector.replicate.ReplicateResultUploadTimeoutDetector; import com.iexec.core.replicate.Replicate; import com.iexec.core.replicate.ReplicatesService; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.core.task.TaskStatusChange; @@ -48,9 +46,11 @@ public class ReplicateResultUploadTimeoutDetectorTests { private final static String WALLET_WORKER_2 = "0x748e091bf16048cb5103E0E10F9D5a8b7fBDd861"; private final static String CHAIN_TASK_ID = "CHAIN_TASK_ID"; - @Mock private TaskService taskService; - @Mock private ReplicatesService replicatesService; - @Mock private TaskExecutorEngine taskExecutorEngine; + @Mock + private TaskService taskService; + + @Mock + private ReplicatesService replicatesService; @InjectMocks private ReplicateResultUploadTimeoutDetector timeoutDetector; @@ -118,7 +118,7 @@ public void shouldDetectOneReplicateWithResultUploadRequestedLongAgo() { Mockito.verify(replicatesService, Mockito.times(1)) .updateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, RESULT_UPLOAD_REQUEST_FAILED); - Mockito.verify(taskExecutorEngine, Mockito.times(1)).updateTask(CHAIN_TASK_ID); + Mockito.verify(taskService, Mockito.times(1)).updateTask(CHAIN_TASK_ID); } @Test @@ -151,7 +151,7 @@ public void shouldDetectOneReplicateWithResultUploadingLongAgo() { Mockito.verify(replicatesService, Mockito.times(1)) .updateReplicateStatus(CHAIN_TASK_ID, WALLET_WORKER_1, RESULT_UPLOAD_FAILED); - Mockito.verify(taskExecutorEngine, Mockito.times(1)).updateTask(CHAIN_TASK_ID); + Mockito.verify(taskService, Mockito.times(1)).updateTask(CHAIN_TASK_ID); } @Test diff --git a/src/test/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetectorTests.java b/src/test/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetectorTests.java index cbf00d547..2d78eeade 100644 --- a/src/test/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetectorTests.java +++ b/src/test/java/com/iexec/core/detector/task/ContributionTimeoutTaskDetectorTests.java @@ -17,10 +17,8 @@ package com.iexec.core.detector.task; import com.iexec.common.replicate.ReplicateStatusDetails; -import com.iexec.core.detector.task.ContributionTimeoutTaskDetector; import com.iexec.core.replicate.ReplicatesService; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.common.utils.DateTimeUtils; @@ -43,9 +41,6 @@ public class ContributionTimeoutTaskDetectorTests { @Mock private TaskService taskService; - @Mock - private TaskExecutorEngine taskExecutorEngine; - @Mock private ReplicatesService replicatesService; @@ -72,7 +67,7 @@ public void shouldNotDetectAnyContributionTimeout() { Mockito.verify(replicatesService, Mockito.times(0)) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); - Mockito.verify(taskExecutorEngine, Mockito.times(0)) + Mockito.verify(taskService, Mockito.times(0)) .updateTask(any()); } @@ -94,7 +89,7 @@ public void shouldNotUpdateTaskIfBeforeTimeout() { Mockito.verify(replicatesService, Mockito.times(0)) .updateReplicateStatus(any(), any(), any(), any(ReplicateStatusDetails.class)); - Mockito.verify(taskExecutorEngine, Mockito.times(0)) + Mockito.verify(taskService, Mockito.times(0)) .updateTask(any()); } @@ -112,7 +107,7 @@ public void shouldUpdateIfIsTimeout() { contributionDetector.detect(); - Mockito.verify(taskExecutorEngine, Mockito.times(1)) + Mockito.verify(taskService, Mockito.times(1)) .updateTask(any()); } } diff --git a/src/test/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetectorTests.java b/src/test/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetectorTests.java index dc8c489a6..d3c154f4b 100644 --- a/src/test/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetectorTests.java +++ b/src/test/java/com/iexec/core/detector/task/TaskResultUploadTimeoutDetectorTests.java @@ -17,7 +17,6 @@ package com.iexec.core.detector.task; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.common.utils.DateTimeUtils; @@ -41,9 +40,6 @@ public class TaskResultUploadTimeoutDetectorTests { @Mock private TaskService taskService; - @Mock - private TaskExecutorEngine taskExecutorEngine; - @InjectMocks private TaskResultUploadTimeoutDetector taskResultUploadTimeoutDetector; @@ -71,7 +67,7 @@ public void shouldDetectResultUploadTimeout() { taskResultUploadTimeoutDetector.detect(); - Mockito.verify(taskExecutorEngine, Mockito.times(1)).updateTask(chainTaskId); + Mockito.verify(taskService, Mockito.times(1)).updateTask(chainTaskId); } @Test @@ -92,7 +88,7 @@ public void shouldNotDetectResultUploadTimeoutSinceStillBeforeDeadline() { taskResultUploadTimeoutDetector.detect(); - Mockito.verify(taskExecutorEngine, Mockito.times(0)).updateTask(chainTaskId); + Mockito.verify(taskService, Mockito.times(0)).updateTask(chainTaskId); } } \ No newline at end of file diff --git a/src/test/java/com/iexec/core/detector/task/UnstartedTxDetectorTests.java b/src/test/java/com/iexec/core/detector/task/UnstartedTxDetectorTests.java index efb792c53..2377a1cc8 100644 --- a/src/test/java/com/iexec/core/detector/task/UnstartedTxDetectorTests.java +++ b/src/test/java/com/iexec/core/detector/task/UnstartedTxDetectorTests.java @@ -16,9 +16,7 @@ package com.iexec.core.detector.task; -import com.iexec.core.detector.task.UnstartedTxDetector; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import org.junit.Before; import org.junit.Test; @@ -37,9 +35,6 @@ public class UnstartedTxDetectorTests { @Mock private TaskService taskService; - @Mock - private TaskExecutorEngine taskExecutorEngine; - @InjectMocks private UnstartedTxDetector unstartedTxDetector; @@ -58,7 +53,7 @@ public void shouldTryUpdateTaskFromReceived() { unstartedTxDetector.detect(); - Mockito.verify(taskExecutorEngine, Mockito.times(1)) + Mockito.verify(taskService, Mockito.times(1)) .updateTask(task.getChainTaskId()); } @@ -72,7 +67,7 @@ public void shouldNotTryUpdateTaskFromReceived() { unstartedTxDetector.detect(); - Mockito.verify(taskExecutorEngine, Mockito.times(0)) + Mockito.verify(taskService, Mockito.times(0)) .updateTask(task.getChainTaskId()); } @@ -86,7 +81,7 @@ public void shouldTryUpdateTaskFromResultUploaded() { unstartedTxDetector.detect(); - Mockito.verify(taskExecutorEngine, Mockito.times(1)) + Mockito.verify(taskService, Mockito.times(1)) .updateTask(task.getChainTaskId()); } @@ -100,7 +95,7 @@ public void shouldNotTryUpdateTaskFromResultUploaded() { unstartedTxDetector.detect(); - Mockito.verify(taskExecutorEngine, Mockito.times(0)) + Mockito.verify(taskService, Mockito.times(0)) .updateTask(task.getChainTaskId()); } diff --git a/src/test/java/com/iexec/core/replicate/ReplicateListenersTests.java b/src/test/java/com/iexec/core/replicate/ReplicateListenersTests.java index e53374a21..c1671e712 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateListenersTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateListenersTests.java @@ -19,7 +19,7 @@ import com.iexec.common.replicate.ReplicateStatus; import com.iexec.common.replicate.ReplicateStatusUpdate; import com.iexec.core.detector.replicate.ContributionUnnotifiedDetector; -import com.iexec.core.task.TaskExecutorEngine; +import com.iexec.core.task.TaskService; import com.iexec.core.task.listener.ReplicateListeners; import com.iexec.core.worker.WorkerService; import org.junit.Before; @@ -41,7 +41,7 @@ public class ReplicateListenersTests { private final static String WORKER_WALLET = "0xwallet1"; @Mock - private TaskExecutorEngine taskExecutorEngine; + private TaskService taskService; @Mock private WorkerService workerService; @Mock @@ -72,7 +72,7 @@ public void shoulUpdateTaskOnReplicateUpdate() { replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent); } - Mockito.verify(taskExecutorEngine, Mockito.times(someStatuses.size())).updateTask(any()); + Mockito.verify(taskService, Mockito.times(someStatuses.size())).updateTask(any()); } @Test diff --git a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java index fc1d001bd..e84c1d9f9 100644 --- a/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java +++ b/src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java @@ -30,7 +30,6 @@ import com.iexec.core.contribution.ConsensusService; import com.iexec.core.sms.SmsService; import com.iexec.core.task.Task; -import com.iexec.core.task.TaskExecutorEngine; import com.iexec.core.task.TaskService; import com.iexec.core.task.TaskStatus; import com.iexec.common.utils.DateTimeUtils; @@ -72,7 +71,6 @@ public class ReplicateSupplyServiceTests { @Mock private ReplicatesService replicatesService; @Mock private SignatureService signatureService; - @Mock private TaskExecutorEngine taskExecutorEngine; @Mock private TaskService taskService; @Mock private WorkerService workerService; @Mock private SmsService smsService; @@ -899,9 +897,9 @@ public void shouldTellReplicateToWaitSinceRevealed() { when(replicatesService.didReplicateRevealOnchain(CHAIN_TASK_ID, WALLET_WORKER_1)) .thenReturn(true); - CompletableFuture future = new CompletableFuture<>(); - when(taskExecutorEngine.updateTask(CHAIN_TASK_ID)).thenReturn(future); - future.complete(true); + CompletableFuture future = new CompletableFuture<>(); + when(taskService.updateTask(CHAIN_TASK_ID)).thenReturn(future); + future.complete(null); List missedTaskNotifications = replicateSupplyService.getMissedTaskNotifications(blockNumber, WALLET_WORKER_1); @@ -938,9 +936,9 @@ public void shouldTellReplicateToUploadResultSinceRequestedAfterRevealing() { when(replicatesService.didReplicateRevealOnchain(CHAIN_TASK_ID, WALLET_WORKER_1)) .thenReturn(true); - CompletableFuture future = new CompletableFuture<>(); - when(taskExecutorEngine.updateTask(CHAIN_TASK_ID)).thenReturn(future); - future.complete(true); + CompletableFuture future = new CompletableFuture<>(); + when(taskService.updateTask(CHAIN_TASK_ID)).thenReturn(future); + future.complete(null); List missedTaskNotifications = replicateSupplyService.getMissedTaskNotifications(blockNumber, WALLET_WORKER_1); diff --git a/src/test/java/com/iexec/core/task/TaskServiceTests.java b/src/test/java/com/iexec/core/task/TaskServiceTests.java index 765d2da3b..2ff894506 100644 --- a/src/test/java/com/iexec/core/task/TaskServiceTests.java +++ b/src/test/java/com/iexec/core/task/TaskServiceTests.java @@ -26,6 +26,7 @@ import com.iexec.core.detector.replicate.RevealTimeoutDetector; import com.iexec.core.replicate.Replicate; import com.iexec.core.replicate.ReplicatesService; +import com.iexec.core.task.executor.TaskExecutorEngine; import com.iexec.common.utils.DateTimeUtils; import com.iexec.core.worker.WorkerService; import org.apache.commons.lang3.tuple.Pair; @@ -57,12 +58,14 @@ public class TaskServiceTests { private final static String COMMAND_LINE = "commandLine"; private final long maxExecutionTime = 60000; private final static String NO_TEE_TAG = BytesUtils.EMPTY_HEXASTRING_64; - private final static String TEE_TAG = "0x0000000000000000000000000000000000000000000000000000000000000001"; private final static String RESULT_LINK = "/ipfs/the_result_string"; @Mock private TaskRepository taskRepository; + @Mock + private TaskExecutorEngine taskExecutorEngine; + @Mock private WorkerService workerService; @@ -323,7 +326,7 @@ public void shouldNotUpdateReceived2InitializingSinceChainTaskIdIsNotEmpty() { Task task = new Task(DAPP_NAME, COMMAND_LINE, 2, CHAIN_TASK_ID); task.changeStatus(RECEIVED); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(RECEIVED); } @@ -346,7 +349,7 @@ public void shouldNotUpdateReceived2InitializingSinceNoEnoughGas() { when(iexecHubService.initialize(CHAIN_DEAL_ID, 1)).thenReturn(Optional.of(pair)); when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(ChainTask.builder().build())); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(RECEIVED); } @@ -366,7 +369,7 @@ public void shouldNotUpdateReceived2InitializingSinceTaskNotInUnsetStatusOnChain when(iexecHubService.initialize(CHAIN_DEAL_ID, 1)).thenReturn(Optional.of(pair)); when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(ChainTask.builder().build())); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(RECEIVED); } @@ -386,7 +389,7 @@ public void shouldNotUpdateReceived2InitializingSinceAfterContributionDeadline() when(iexecHubService.initialize(CHAIN_DEAL_ID, 1)).thenReturn(Optional.of(pair)); when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(ChainTask.builder().build())); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(RECEIVED); } @@ -405,7 +408,7 @@ public void shouldUpdateInitializing2InitailizeFailedSinceChainTaskIdIsEmpty() { when(taskRepository.save(task)).thenReturn(task); when(iexecHubService.initialize(CHAIN_DEAL_ID, 1)).thenReturn(Optional.of(pair)); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getLastButOneStatus()).isEqualTo(INITIALIZE_FAILED); assertThat(task.getCurrentStatus()).isEqualTo(FAILED); @@ -428,7 +431,7 @@ public void shouldNotUpdateInitializing2InitailizedSinceNoChainTaskReturned() { when(iexecHubService.initialize(CHAIN_DEAL_ID, 1)).thenReturn(Optional.of(pair)); when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.empty()); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZING); } @@ -451,7 +454,7 @@ public void shouldUpdateReceived2Initializing2Initialized() { .contributionDeadline(DateTimeUtils.addMinutesToDate(new Date(), 60).getTime()) .build())); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getChainDealId()).isEqualTo(CHAIN_DEAL_ID); assertThat(task.getDateStatusList().get(task.getDateStatusList().size() - 3).getStatus()).isEqualTo(RECEIVED); assertThat(task.getDateStatusList().get(task.getDateStatusList().size() - 2).getStatus()).isEqualTo(INITIALIZING); @@ -459,7 +462,7 @@ public void shouldUpdateReceived2Initializing2Initialized() { assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED); // test that double call doesn't change anything - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED); } @@ -475,7 +478,7 @@ public void shouldUpdateInitialized2Running() { // 1 RUNNING out of 2 when(taskRepository.save(task)).thenReturn(task); when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(RUNNING); } @@ -488,7 +491,7 @@ public void shouldNotUpdateInitialized2RunningSinceNoRunningOrComputedReplicates when(replicatesService.getNbReplicatesWithCurrentStatus(task.getChainTaskId(), ReplicateStatus.COMPUTED)).thenReturn(0); when(taskRepository.save(task)).thenReturn(task); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED); assertThat(task.getCurrentStatus()).isNotEqualTo(RUNNING); } @@ -502,7 +505,7 @@ public void shouldNotUpdateInitialized2RunningSinceComputedIsMoreThanNeeded() { when(replicatesService.getNbReplicatesWithCurrentStatus(task.getChainTaskId(), ReplicateStatus.COMPUTED)).thenReturn(4); when(taskRepository.save(task)).thenReturn(task); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED); } @@ -524,7 +527,7 @@ public void shouldNotUpdateInitializedOrRunning2ContributionTimeoutSinceBeforeTi when(iexecHubService.getChainTask(task.getChainTaskId())).thenReturn(Optional.of(chainTask)); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED); assertThat(task.getCurrentStatus()).isNotEqualTo(CONTRIBUTION_TIMEOUT); } @@ -545,7 +548,7 @@ public void shouldNotUpdateInitializedOrRunning2ContributionTimeoutSinceChainTas when(iexecHubService.getChainTask(task.getChainTaskId())).thenReturn(Optional.of(chainTask)); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED); assertThat(task.getCurrentStatus()).isNotEqualTo(CONTRIBUTION_TIMEOUT); } @@ -566,7 +569,7 @@ public void shouldNotReSendNotificationWhenAlreadyInContributionTimeout() { when(iexecHubService.getChainTask(task.getChainTaskId())).thenReturn(Optional.of(chainTask)); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); Mockito.verify(applicationEventPublisher, Mockito.times(0)) .publishEvent(any()); } @@ -589,7 +592,7 @@ public void shouldUpdateFromInitializedOrRunning2ContributionTimeout() { when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); when(iexecHubService.getChainTask(CHAIN_TASK_ID)).thenReturn(Optional.of(chainTask)); - taskService.tryUpgradeTaskStatus(CHAIN_TASK_ID); + taskService.updateTaskRunnable(CHAIN_TASK_ID); assertThat(task.getCurrentStatus()).isEqualTo(FAILED); assertThat(task.getLastButOneStatus()).isEqualTo(CONTRIBUTION_TIMEOUT); @@ -614,7 +617,7 @@ public void shouldUpdateRunning2ConsensusReached() { when(iexecHubService.getConsensusBlock(anyString(), anyLong())).thenReturn(ChainReceipt.builder().blockNumber(1L).build()); doNothing().when(applicationEventPublisher).publishEvent(any()); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(CONSENSUS_REACHED); } @@ -630,7 +633,7 @@ public void shouldNotUpdateRunning2ConsensusReachedSinceWrongTaskStatus() { when(replicatesService.getNbOffChainReplicatesWithStatus(task.getChainTaskId(), ReplicateStatus.CONTRIBUTED)).thenReturn(2); when(taskRepository.save(task)).thenReturn(task); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(INITIALIZED); } @@ -641,7 +644,7 @@ public void shouldNotUpdateRunning2ConsensusReachedSinceCannotGetChainTask() { when(iexecHubService.getChainTask(task.getChainTaskId())).thenReturn(Optional.empty()); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(RUNNING); } @@ -657,7 +660,7 @@ public void shouldNOTUpdateRunning2ConsensusReachedSinceOnChainStatusNotRevealin when(replicatesService.getNbOffChainReplicatesWithStatus(task.getChainTaskId(), ReplicateStatus.CONTRIBUTED)).thenReturn(2); when(taskRepository.save(task)).thenReturn(task); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(RUNNING); } @@ -673,7 +676,7 @@ public void shouldNOTUpdateRunning2ConsensusReachedSinceWinnerContributorsDiffer when(replicatesService.getNbOffChainReplicatesWithStatus(task.getChainTaskId(), ReplicateStatus.CONTRIBUTED)).thenReturn(1); when(taskRepository.save(task)).thenReturn(task); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(RUNNING); } @@ -692,7 +695,7 @@ public void shouldUpdateConsensusReached2AtLeastOneReveal2UploadRequested() { when(replicatesService.getRandomReplicateWithRevealStatus(task.getChainTaskId())).thenReturn(Optional.of(replicate)); doNothing().when(applicationEventPublisher).publishEvent(any()); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getUploadingWorkerWalletAddress()).isEqualTo(replicate.getWalletAddress()); int size = task.getDateStatusList().size(); assertThat(task.getDateStatusList().get(size - 2).getStatus()).isEqualTo(AT_LEAST_ONE_REVEALED); @@ -709,7 +712,7 @@ public void shouldNOTUpdateConsensusReached2AtLeastOneRevealSinceNoRevealedRepli when(replicatesService.getNbReplicatesWithCurrentStatus(task.getChainTaskId(), ReplicateStatus.REVEALED)).thenReturn(0); when(taskRepository.save(task)).thenReturn(task); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(CONSENSUS_REACHED); } @@ -722,7 +725,7 @@ public void shouldUpdateFromUploadRequestedToUploadingResult() { when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.RESULT_UPLOADING)).thenReturn(1); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(TaskStatus.RESULT_UPLOADING); } @@ -734,14 +737,14 @@ public void shouldNotUpdateFromUploadRequestedToUploadingResultSinceNoWorkerUplo when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.RESULT_UPLOADING)).thenReturn(0); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(TaskStatus.RESULT_UPLOAD_REQUESTED); // check that the request upload method has been called Mockito.verify(replicatesService, Mockito.times(1)) .getRandomReplicateWithRevealStatus(any()); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(TaskStatus.RESULT_UPLOAD_REQUESTED); } @@ -770,7 +773,7 @@ public void shouldUpdateResultUploading2Uploaded2Finalizing2Finalized() { //one .revealCounter(1) .build())); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); TaskStatus lastButOneStatus = task.getDateStatusList().get(task.getDateStatusList().size() - 2).getStatus(); TaskStatus lastButTwoStatus = task.getDateStatusList().get(task.getDateStatusList().size() - 3).getStatus(); @@ -795,7 +798,7 @@ public void shouldUpdateResultUploading2UploadedButNot2Finalizing() { //one work when(iexecHubService.canFinalize(task.getChainTaskId())).thenReturn(true); when(iexecHubService.hasEnoughGas()).thenReturn(false); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(RESULT_UPLOADED); assertThat(task.getCurrentStatus()).isNotEqualTo(FINALIZING); @@ -820,7 +823,7 @@ public void shouldUpdateResultUploading2Uploaded2Finalizing2FinalizeFail() { //o .revealCounter(1) .build())); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); TaskStatus lastButTwoStatus = task.getDateStatusList().get(task.getDateStatusList().size() - 3).getStatus(); TaskStatus lastButThreeStatus = task.getDateStatusList().get(task.getDateStatusList().size() - 4).getStatus(); @@ -847,7 +850,7 @@ public void shouldUpdateResultUploading2UploadedFailAndRequestUploadAgain() { when(replicatesService.getRandomReplicateWithRevealStatus(task.getChainTaskId())).thenReturn(Optional.of(replicate)); doNothing().when(applicationEventPublisher).publishEvent(any()); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getUploadingWorkerWalletAddress()).isEqualTo(replicate.getWalletAddress()); int size = task.getDateStatusList().size(); assertThat(task.getDateStatusList().get(size - 2).getStatus()).isEqualTo(RESULT_UPLOADING); @@ -911,7 +914,7 @@ public void shouldUpdateTaskToRunningFromWorkersInRunning() { when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.STARTING, ReplicateStatus.COMPUTED)).thenReturn(3); when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.COMPUTED)).thenReturn(0); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(TaskStatus.RUNNING); } @@ -925,7 +928,7 @@ public void shouldUpdateTaskToRunningFromWorkersInRunningAndComputed() { when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.STARTING, ReplicateStatus.COMPUTED)).thenReturn(4); when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.COMPUTED)).thenReturn(2); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(TaskStatus.RUNNING); } @@ -936,7 +939,7 @@ public void shouldNotUpdateToRunningSinceAllReplicatesInCreated() { when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.STARTING, ReplicateStatus.COMPUTED)).thenReturn(0); when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.COMPUTED)).thenReturn(0); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isNotEqualTo(TaskStatus.RUNNING); } @@ -948,7 +951,7 @@ public void shouldNotUpdateToRunningCase2() { when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.STARTING, ReplicateStatus.COMPUTED)).thenReturn(2); when(replicatesService.getNbReplicatesWithCurrentStatus(CHAIN_TASK_ID, ReplicateStatus.COMPUTED)).thenReturn(2); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isNotEqualTo(TaskStatus.RUNNING); } @@ -967,7 +970,7 @@ public void shouldUpdateFromUploadingResultToResultUploaded() { when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); when(replicatesService.getReplicateWithResultUploadedStatus(CHAIN_TASK_ID)).thenReturn(Optional.of(replicate)); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(TaskStatus.RESULT_UPLOADED); assertThat(task.getResultLink()).isEqualTo(RESULT_LINK); } @@ -983,7 +986,7 @@ public void shouldNotUpdateToResultUploaded() { when(taskRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(task)); when(replicatesService.getNbReplicatesWithCurrentStatus(task.getChainTaskId(), ReplicateStatus.RESULT_UPLOADED)).thenReturn(0); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isNotEqualTo(TaskStatus.RESULT_UPLOADED); assertThat(task.getCurrentStatus()).isNotEqualTo(TaskStatus.COMPLETED); } @@ -1002,7 +1005,7 @@ public void shouldNotUpdateFromResultUploadedToFinalizingSinceNotEnoughGas() { when(replicatesService.getNbReplicatesContainingStatus(task.getChainTaskId(), ReplicateStatus.REVEALED)).thenReturn(1); when(iexecHubService.hasEnoughGas()).thenReturn(false); - taskService.tryUpgradeTaskStatus(task.getChainTaskId()); + taskService.updateTaskRunnable(task.getChainTaskId()); assertThat(task.getCurrentStatus()).isEqualTo(RESULT_UPLOADED); } }