Skip to content

Commit

Permalink
Merge pull request #386 from iExecBlockchainComputing/feature/task-ex…
Browse files Browse the repository at this point in the history
…ecutor

Fix websockets
  • Loading branch information
zguesmi authored Oct 22, 2020
2 parents 4c01d0f + 4422d38 commit d87d362
Show file tree
Hide file tree
Showing 36 changed files with 552 additions and 436 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ FROM openjdk:11.0.7-jre-slim

COPY build/libs/iexec-core-@projectversion@.jar iexec-core.jar

ENTRYPOINT ["java","-jar","/iexec-core.jar"]
ENTRYPOINT ["java", "-jar", "/iexec-core.jar"]
56 changes: 19 additions & 37 deletions src/main/java/com/iexec/core/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,29 @@

package com.iexec.core;

import java.util.concurrent.Executor;
import com.iexec.core.config.AsyncConfig;
import com.iexec.core.config.MongoConfig;
import com.iexec.core.config.RetryConfig;
import com.iexec.core.config.SchedulingConfig;
import com.iexec.core.config.SwaggerConfig;
import com.iexec.core.config.WebMvcConfig;
import com.iexec.core.config.WebSocketConfig;

import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.context.annotation.Import;

@Configuration
@EnableRetry
@EnableScheduling
@Import({
AsyncConfig.class,
MongoConfig.class,
RetryConfig.class,
SchedulingConfig.class,
SwaggerConfig.class,
WebMvcConfig.class,
WebSocketConfig.class,
})
@EnableFeignClients
@EnableAsync
public class AppConfig implements WebMvcConfigurer {

@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**");
}

/**
* We need this bean with the name "taskExecutor"
* to help spring choose an executor for
* {@link org.springframework.scheduling.annotation.Async}
* tasks.
* <p>
* Resolves the message:
* "More than one TaskExecutor bean found
* within the context, and none is named 'taskExecutor'.
* Mark one of them as primary or name it 'taskExecutor'
* (possibly as an alias) in order to use it for async
* processing."
*
* @return
*/
@Bean
public Executor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
public class AppConfig {

}
66 changes: 66 additions & 0 deletions src/main/java/com/iexec/core/config/AsyncConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.iexec.core.config;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;

import com.iexec.core.utils.TaskExecutorUtils;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

import lombok.extern.slf4j.Slf4j;

@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

/**
* This executor is used for Async tasks.
*
* @return
*/
@Override
public Executor getAsyncExecutor() {
return TaskExecutorUtils.newThreadPoolTaskExecutor("Async-");
}

/**
* Handle uncaught exceptions raised by Async tasks.
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new Handler();
}

private class Handler implements AsyncUncaughtExceptionHandler {

@Override
public void handleUncaughtException(
Throwable ex,
Method method,
Object... params
) {
log.error("Exception in async task [method:{}, params:{}]",
method, params, ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.iexec.core;
package com.iexec.core.config;

import com.mongodb.MongoClient;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -24,7 +24,7 @@
import org.springframework.data.mongodb.gridfs.GridFsTemplate;

@Configuration
public class SpringMongoConfig extends AbstractMongoConfiguration {
public class MongoConfig extends AbstractMongoConfiguration {

@Value("${spring.data.mongodb.database}")
private String databaseName;
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/com/iexec/core/config/RetryConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.iexec.core.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

@Configuration
@EnableRetry
public class RetryConfig {

}
36 changes: 36 additions & 0 deletions src/main/java/com/iexec/core/config/SchedulingConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.iexec.core.config;

import com.iexec.core.utils.TaskSchedulerUtils;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@Configuration
@EnableScheduling
public class SchedulingConfig implements SchedulingConfigurer {

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(
TaskSchedulerUtils.newThreadPoolTaskScheduler("Scheduled-")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.iexec.core;
package com.iexec.core.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/com/iexec/core/config/WebMvcConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.iexec.core.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

package com.iexec.core.pubsub;
package com.iexec.core.config;

import com.iexec.core.utils.TaskSchedulerUtils;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
Expand All @@ -36,6 +38,7 @@ public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/connect").withSockJS()
.setWebSocketEnabled(false)
.setHeartbeatTime(5000)
;
.setTaskScheduler(TaskSchedulerUtils
.newThreadPoolTaskScheduler("STOMP-"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package com.iexec.core.detector.replicate;

import com.iexec.common.replicate.ReplicateStatusModifier;
import com.iexec.core.detector.Detector;
import com.iexec.core.replicate.Replicate;
import com.iexec.core.replicate.ReplicatesService;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskExecutorEngine;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -42,14 +40,13 @@ public class ReplicateResultUploadTimeoutDetector implements Detector {

private TaskService taskService;
private ReplicatesService replicatesService;
private TaskExecutorEngine taskExecutorEngine;

public ReplicateResultUploadTimeoutDetector(TaskService taskService,
ReplicatesService replicatesService,
TaskExecutorEngine taskExecutorEngine) {
public ReplicateResultUploadTimeoutDetector(
TaskService taskService,
ReplicatesService replicatesService
) {
this.taskService = taskService;
this.replicatesService = replicatesService;
this.taskExecutorEngine = taskExecutorEngine;
}

@Scheduled(fixedRateString = "${cron.detector.resultuploadtimeout.period}")
Expand Down Expand Up @@ -82,7 +79,7 @@ public void detect() {
}

if (hasReplicateAlreadyFailedToUpload) {
taskExecutorEngine.updateTask(task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
return;
}

Expand All @@ -93,15 +90,15 @@ public void detect() {
replicatesService.updateReplicateStatus(chainTaskId, uploadingReplicate.getWalletAddress(),
RESULT_UPLOAD_REQUEST_FAILED);

taskExecutorEngine.updateTask(task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
return;
}

if (task.getCurrentStatus() == TaskStatus.RESULT_UPLOADING) {
replicatesService.updateReplicateStatus(chainTaskId, uploadingReplicate.getWalletAddress(),
RESULT_UPLOAD_FAILED);

taskExecutorEngine.updateTask(task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.iexec.core.detector.Detector;
import com.iexec.core.task.Task;
import com.iexec.core.task.TaskExecutorEngine;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -33,12 +32,9 @@
public class ContributionTimeoutTaskDetector implements Detector {

private TaskService taskService;
private TaskExecutorEngine taskExecutorEngine;

public ContributionTimeoutTaskDetector(TaskService taskService,
TaskExecutorEngine taskExecutorEngine) {
public ContributionTimeoutTaskDetector(TaskService taskService) {
this.taskService = taskService;
this.taskExecutorEngine = taskExecutorEngine;
}

@Scheduled(fixedRateString = "${cron.detector.contribution.timeout.period}")
Expand All @@ -49,7 +45,7 @@ public void detect() {
Date now = new Date();
if (now.after(task.getContributionDeadline())) {
log.info("Task with contribution timeout found [chainTaskId:{}]", task.getChainTaskId());
taskExecutorEngine.updateTask(task.getChainTaskId());
taskService.updateTask(task.getChainTaskId());
}
}
}
Expand Down
Loading

0 comments on commit d87d362

Please sign in to comment.