Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick #4182 to 1.3.4-prepare #4210

Merged
merged 1 commit into from
Dec 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,16 @@ public interface TaskExecutionContextCacheManager {

/**
* remove taskInstance by taskInstanceId
*
* @param taskInstanceId taskInstanceId
*/
void removeByTaskInstanceId(Integer taskInstanceId);

/**
* If the value for the specified key is present and non-null,then perform the update,otherwise it will return false
*
* @param taskExecutionContext taskExecutionContext
* @return status
*/
boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,17 @@ public void cacheTaskExecutionContext(TaskExecutionContext taskExecutionContext)

/**
* remove taskInstance by taskInstanceId
*
* @param taskInstanceId taskInstanceId
*/
@Override
public void removeByTaskInstanceId(Integer taskInstanceId) {
taskExecutionContextCache.remove(taskInstanceId);
}

@Override
public boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
taskExecutionContextCache.computeIfPresent(taskExecutionContext.getTaskInstanceId(), (k, v) -> taskExecutionContext);
return taskExecutionContextCache.containsKey(taskExecutionContext.getTaskInstanceId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -62,28 +65,45 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final ExecutorService workerExecService;

/**
* worker config
* worker config
*/
private final WorkerConfig workerConfig;

/**
* task callback service
* task callback service
*/
private final TaskCallbackService taskCallbackService;

public TaskExecuteProcessor(){
/**
* taskExecutionContextCacheManager
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;

public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}

/**
* Pre-cache task to avoid extreme situations when kill task. There is no such task in the cache
*
* @param taskExecutionContext task
*/
private void setTaskCache(TaskExecutionContext taskExecutionContext) {
TaskExecutionContext preTaskCache = new TaskExecutionContext();
preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
}

@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));

TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class);
command.getBody(), TaskExecuteRequestCommand.class);

logger.info("received command : {}", taskRequestCommand);

Expand All @@ -99,6 +119,7 @@ public void process(Channel channel, Command command) {
logger.error("task execution context is null");
return;
}
setTaskCache(taskExecutionContext);
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
Expand All @@ -120,6 +141,7 @@ public void process(Channel channel, Command command) {
String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
FileUtils.taskLoggerThreadLocal.remove();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ public void process(Channel channel, Command command) {
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){
List<String> appIds = Collections.EMPTY_LIST;
List<String> appIds = Collections.emptyList();
try {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);

Integer processId = taskExecutionContext.getProcessId();

if (processId == null || processId.equals(0)){
logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
return Pair.of(false, appIds);
if (processId.equals(0)) {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return Pair.of(true, appIds);
}

String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.regex.Pattern;

import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;

/**
Expand Down Expand Up @@ -160,12 +161,18 @@ private void buildProcess(String commandFile) throws IOException {
* @return CommandExecuteResult
* @throws Exception if error throws Exception
*/
public CommandExecuteResult run(String execCommand) throws Exception{
public CommandExecuteResult run(String execCommand) throws Exception {

CommandExecuteResult result = new CommandExecuteResult();


int taskInstanceId = taskExecutionContext.getTaskInstanceId();
// If the task has been killed, then the task in the cache is null
if (null == taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(execCommand)) {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}

Expand All @@ -187,7 +194,12 @@ public CommandExecuteResult run(String execCommand) throws Exception{

// cache processId
taskExecutionContext.setProcessId(processId);
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
boolean updateTaskExecutionContextStatus = taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}

// print process id
logger.info("process start, process id is: {}", processId);
Expand Down