From cfefafd4ef744b3e3102d27672ca9103d927b744 Mon Sep 17 00:00:00 2001 From: hui lai Date: Fri, 4 Jul 2025 11:35:51 +0800 Subject: [PATCH] [fix](job) fix routine load task scheduler block for one job can not find any BE (#52654) ### What problem does this PR solve? routine load task will block in following case: 1. The user created a job using the admin user of clusterA, and at some point deleted clusterA, and renamed clusterB to clusterA 2. The cluster ID saved in the job is invalid and can't find any BE 3. This task was repeatedly taken out of the queue and was put back to queue for there was no BE to execute, causing the other tasks to get stuck. --- .../doris/load/routineload/RoutineLoadManager.java | 12 +++++++++++- .../load/routineload/RoutineLoadTaskScheduler.java | 4 ++-- .../load/routineload/RoutineLoadManagerTest.java | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 23b36f11a4b89c..169af97670f6e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -498,8 +498,18 @@ public long getMinTaskBeId(String clusterName) throws LoadException { // check if the specified BE is available for running task // return true if it is available. return false if otherwise. // throw exception if unrecoverable errors happen. - public long getAvailableBeForTask(long jobId, long previousBeId) throws LoadException { + public long getAvailableBeForTask(long jobId, long previousBeId) throws UserException { List availableBeIds = getAvailableBackendIds(jobId); + if (availableBeIds.isEmpty()) { + RoutineLoadJob job = getJob(jobId); + if (job != null) { + String msg = "no available BE found for job " + jobId + + "please check the BE status and user's cluster or tags"; + job.updateState(RoutineLoadJob.JobState.PAUSED, + new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, msg), false /* not replay */); + } + return -1L; + } // check if be has idle slot readLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index d40a6705626c84..040ca103004e34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -147,7 +147,7 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated. if (!allocateTaskToBe(routineLoadTaskInfo)) { // allocate failed, push it back to the queue to wait next scheduling - needScheduleTasksQueue.addFirst(routineLoadTaskInfo); + needScheduleTasksQueue.addLast(routineLoadTaskInfo); return; } } catch (UserException e) { @@ -311,7 +311,7 @@ private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException // 2. If not, try to find a better one with most idle slots. // return true if allocate successfully. return false if failed. // throw exception if unrecoverable errors happen. - private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException { + private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws UserException { long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(), routineLoadTaskInfo.getPreviousBeId()); if (beId == -1L) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index d706c52557ee41..4886ed0690e6ed 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -779,7 +779,7 @@ public void testCheckBeToTask(@Mocked Env env, routineLoadManager.addRoutineLoadJob(job, "testdb", "testtable"); Config.max_routine_load_task_num_per_be = 10; Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks); - Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, 1L)); + Assert.assertEquals(-1L, routineLoadManager.getAvailableBeForTask(1L, 1L)); } @Test