Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,10 @@ private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String ms
LOG.info("cancel export job {}", id);
}

private void exportExportJob() {
private void exportExportJob() throws JobException {
if (getState() == ExportJobState.CANCELLED || getState() == ExportJobState.FINISHED) {
throw new JobException("export job has been {}, can not be update to `EXPORTING` state", getState());
}
// The first exportTaskExecutor will set state to EXPORTING,
// other exportTaskExecutors do not need to set up state.
if (getState() == ExportJobState.EXPORTING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.doris.scheduler.manager.TransientTaskManager;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This class represents a work handler for processing event tasks consumed by a Disruptor.
Expand All @@ -32,9 +33,8 @@
* If the event job execution fails, the work handler logs an error message and pauses the event job.
* The work handler also handles system events by scheduling batch scheduler tasks.
*/
@Log4j2
public class TaskHandler implements WorkHandler<TaskEvent> {

private static final Logger LOG = LogManager.getLogger(TaskHandler.class);

/**
* Processes an event task by retrieving the associated event job and executing it if it is running.
Expand All @@ -50,7 +50,7 @@ public void onEvent(TaskEvent event) {
onTransientTaskHandle(event);
break;
default:
log.warn("unknown task type: {}", event.getTaskType());
LOG.warn("unknown task type: {}", event.getTaskType());
break;
}
}
Expand All @@ -60,14 +60,14 @@ public void onTransientTaskHandle(TaskEvent taskEvent) {
TransientTaskManager transientTaskManager = Env.getCurrentEnv().getTransientTaskManager();
TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId);
if (taskExecutor == null) {
log.info("Memory task executor is null, task id: {}", taskId);
LOG.info("Memory task executor is null, task id: {}", taskId);
return;
}

try {
taskExecutor.execute();
} catch (JobException e) {
log.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage());
LOG.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage());
} finally {
transientTaskManager.removeMemoryTask(taskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ public Long addMemoryTask(TransientTaskExecutor executor) throws JobException {
}

public void cancelMemoryTask(Long taskId) throws JobException {
try {
taskExecutorMap.get(taskId).cancel();
} finally {
removeMemoryTask(taskId);
TransientTaskExecutor transientTaskExecutor = taskExecutorMap.get(taskId);
if (transientTaskExecutor != null) {
transientTaskExecutor.cancel();
}
}

Expand Down
Loading