Skip to content

Commit efe2ef3

Browse files
BePPPowerYour Name
authored andcommitted
[fix](Export) Fix the issue where the show export status stays stuck on EXPORTING. (#47974)
Problem Summary: Each Export task will be deleted after execution in the finally block of the TaskHandler#onTransientTaskHandle() method. Therefore, when removing a task in the TransientTaskManager#cancelMemoryTask method, there should be a check to see if the task exists. Otherwise, a null pointer exception may occur, causing the Export Job status update to fail.
1 parent 1f858d9 commit efe2ef3

File tree

3 files changed

+13
-11
lines changed

3 files changed

+13
-11
lines changed

fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,10 @@ private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String ms
713713
LOG.info("cancel export job {}", id);
714714
}
715715

716-
private void exportExportJob() {
716+
private void exportExportJob() throws JobException {
717+
if (getState() == ExportJobState.CANCELLED || getState() == ExportJobState.FINISHED) {
718+
throw new JobException("export job has been {}, can not be update to `EXPORTING` state", getState());
719+
}
717720
// The first exportTaskExecutor will set state to EXPORTING,
718721
// other exportTaskExecutors do not need to set up state.
719722
if (getState() == ExportJobState.EXPORTING) {

fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
import org.apache.doris.scheduler.manager.TransientTaskManager;
2424

2525
import com.lmax.disruptor.WorkHandler;
26-
import lombok.extern.log4j.Log4j2;
26+
import org.apache.logging.log4j.LogManager;
27+
import org.apache.logging.log4j.Logger;
2728

2829
/**
2930
* This class represents a work handler for processing event tasks consumed by a Disruptor.
@@ -32,9 +33,8 @@
3233
* If the event job execution fails, the work handler logs an error message and pauses the event job.
3334
* The work handler also handles system events by scheduling batch scheduler tasks.
3435
*/
35-
@Log4j2
3636
public class TaskHandler implements WorkHandler<TaskEvent> {
37-
37+
private static final Logger LOG = LogManager.getLogger(TaskHandler.class);
3838

3939
/**
4040
* Processes an event task by retrieving the associated event job and executing it if it is running.
@@ -50,7 +50,7 @@ public void onEvent(TaskEvent event) {
5050
onTransientTaskHandle(event);
5151
break;
5252
default:
53-
log.warn("unknown task type: {}", event.getTaskType());
53+
LOG.warn("unknown task type: {}", event.getTaskType());
5454
break;
5555
}
5656
}
@@ -60,14 +60,14 @@ public void onTransientTaskHandle(TaskEvent taskEvent) {
6060
TransientTaskManager transientTaskManager = Env.getCurrentEnv().getTransientTaskManager();
6161
TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId);
6262
if (taskExecutor == null) {
63-
log.info("Memory task executor is null, task id: {}", taskId);
63+
LOG.info("Memory task executor is null, task id: {}", taskId);
6464
return;
6565
}
6666

6767
try {
6868
taskExecutor.execute();
6969
} catch (JobException e) {
70-
log.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage());
70+
LOG.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage());
7171
} finally {
7272
transientTaskManager.removeMemoryTask(taskId);
7373
}

fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,9 @@ public Long addMemoryTask(TransientTaskExecutor executor) throws JobException {
6363
}
6464

6565
public void cancelMemoryTask(Long taskId) throws JobException {
66-
try {
67-
taskExecutorMap.get(taskId).cancel();
68-
} finally {
69-
removeMemoryTask(taskId);
66+
TransientTaskExecutor transientTaskExecutor = taskExecutorMap.get(taskId);
67+
if (transientTaskExecutor != null) {
68+
transientTaskExecutor.cancel();
7069
}
7170
}
7271

0 commit comments

Comments
 (0)