diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 23b36f11a4b89c..169af97670f6e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -498,8 +498,18 @@ public long getMinTaskBeId(String clusterName) throws LoadException { // check if the specified BE is available for running task // return true if it is available. return false if otherwise. // throw exception if unrecoverable errors happen. - public long getAvailableBeForTask(long jobId, long previousBeId) throws LoadException { + public long getAvailableBeForTask(long jobId, long previousBeId) throws UserException { List availableBeIds = getAvailableBackendIds(jobId); + if (availableBeIds.isEmpty()) { + RoutineLoadJob job = getJob(jobId); + if (job != null) { + String msg = "no available BE found for job " + jobId + + "please check the BE status and user's cluster or tags"; + job.updateState(RoutineLoadJob.JobState.PAUSED, + new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, msg), false /* not replay */); + } + return -1L; + } // check if be has idle slot readLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index d40a6705626c84..040ca103004e34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -147,7 +147,7 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated. if (!allocateTaskToBe(routineLoadTaskInfo)) { // allocate failed, push it back to the queue to wait next scheduling - needScheduleTasksQueue.addFirst(routineLoadTaskInfo); + needScheduleTasksQueue.addLast(routineLoadTaskInfo); return; } } catch (UserException e) { @@ -311,7 +311,7 @@ private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException // 2. If not, try to find a better one with most idle slots. // return true if allocate successfully. return false if failed. // throw exception if unrecoverable errors happen. - private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException { + private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws UserException { long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(), routineLoadTaskInfo.getPreviousBeId()); if (beId == -1L) {