From efe2ef3a2f9d1ddda870721ee103d34fdc71546a Mon Sep 17 00:00:00 2001 From: Tiewei Fang Date: Wed, 19 Feb 2025 12:23:25 +0800 Subject: [PATCH] [fix](Export) Fix the issue where the show export status stays stuck on EXPORTING. (#47974) Problem Summary: Each Export task will be deleted after execution in the finally block of the TaskHandler#onTransientTaskHandle() method. Therefore, when removing a task in the TransientTaskManager#cancelMemoryTask method, there should be a check to see if the task exists. Otherwise, a null pointer exception may occur, causing the Export Job status update to fail. --- .../main/java/org/apache/doris/load/ExportJob.java | 5 ++++- .../doris/scheduler/disruptor/TaskHandler.java | 12 ++++++------ .../scheduler/manager/TransientTaskManager.java | 7 +++---- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 554185a2ed677d..d53c4ce13e5c8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -713,7 +713,10 @@ private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String ms LOG.info("cancel export job {}", id); } - private void exportExportJob() { + private void exportExportJob() throws JobException { + if (getState() == ExportJobState.CANCELLED || getState() == ExportJobState.FINISHED) { + throw new JobException("export job has been {}, can not be update to `EXPORTING` state", getState()); + } // The first exportTaskExecutor will set state to EXPORTING, // other exportTaskExecutors do not need to set up state. if (getState() == ExportJobState.EXPORTING) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 193f8ece9f7a2c..3be6102a714cd5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -23,7 +23,8 @@ import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.WorkHandler; -import lombok.extern.log4j.Log4j2; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * This class represents a work handler for processing event tasks consumed by a Disruptor. @@ -32,9 +33,8 @@ * If the event job execution fails, the work handler logs an error message and pauses the event job. * The work handler also handles system events by scheduling batch scheduler tasks. */ -@Log4j2 public class TaskHandler implements WorkHandler { - + private static final Logger LOG = LogManager.getLogger(TaskHandler.class); /** * Processes an event task by retrieving the associated event job and executing it if it is running. @@ -50,7 +50,7 @@ public void onEvent(TaskEvent event) { onTransientTaskHandle(event); break; default: - log.warn("unknown task type: {}", event.getTaskType()); + LOG.warn("unknown task type: {}", event.getTaskType()); break; } } @@ -60,14 +60,14 @@ public void onTransientTaskHandle(TaskEvent taskEvent) { TransientTaskManager transientTaskManager = Env.getCurrentEnv().getTransientTaskManager(); TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId); if (taskExecutor == null) { - log.info("Memory task executor is null, task id: {}", taskId); + LOG.info("Memory task executor is null, task id: {}", taskId); return; } try { taskExecutor.execute(); } catch (JobException e) { - log.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage()); + LOG.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage()); } finally { transientTaskManager.removeMemoryTask(taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java index de501d3e0c2ffd..5c62caede9c171 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java @@ -63,10 +63,9 @@ public Long addMemoryTask(TransientTaskExecutor executor) throws JobException { } public void cancelMemoryTask(Long taskId) throws JobException { - try { - taskExecutorMap.get(taskId).cancel(); - } finally { - removeMemoryTask(taskId); + TransientTaskExecutor transientTaskExecutor = taskExecutorMap.get(taskId); + if (transientTaskExecutor != null) { + transientTaskExecutor.cancel(); } }