From dade2e4517613fe7af7947ab3fcc7a6a0dfce9d7 Mon Sep 17 00:00:00 2001 From: xy720 <22125576+xy720@users.noreply.github.com> Date: Wed, 3 Sep 2025 12:16:47 +0800 Subject: [PATCH] [Bug](routine load) Fix routine load task failed with MEM_LIMIT_EXCEED never be scheduled again (#55481) A routine load task which submit failed with following msg will never be scheduled again, causing the corresponding kafka partition to no longer be consumed. ``` 2025-08-29 03:42:24:errCode = 2, detailMessage = failed to send task: errCode = 2, detailMessage = failed to submit task. error code: MEM_LIMIT_EXCEEDED, msg: (10.0.72.47)[MEM_LIMIT_EXCEEDED] is_exceed_soft_mem_limit: 0 current_load_mem_value: 114837740081 _load_mem_limit: 112722727424 ``` --- .../routineload/RoutineLoadTaskScheduler.java | 13 +++++ .../test_routine_load_job_schedule.groovy | 57 ++++++++++++++++++- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 28e03567765ba8..d1b5a6f73e8b47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -247,6 +247,13 @@ private void handleSubmitTaskFailure(RoutineLoadTaskInfo routineLoadTaskInfo, St // Check if this is a resource pressure error that should not be immediately rescheduled if (errorMsg.contains("TOO_MANY_TASKS") || errorMsg.contains("MEM_LIMIT_EXCEEDED")) { + // submit task failed (such as TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED error), + // but txn has already begun. Here we will still set the ExecuteStartTime of + // this task, which means we "assume" that this task has been successfully submitted. + // And this task will then be aborted because of a timeout. + // In this way, we can prevent the entire job from being paused due to submit errors, + // and we can also relieve the pressure on BE by waiting for the timeout period. + routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis()); return; } @@ -302,6 +309,12 @@ private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException throw new LoadException("debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED"); } + if (DebugPointUtil.isEnable("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")) { + LOG.warn("debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED," + + "routine load task submit failed"); + throw new LoadException("MEM_LIMIT_EXCEEDED"); + } + if (tStatus.getStatusCode() != TStatusCode.OK) { throw new LoadException("failed to submit task. error code: " + tStatus.getStatusCode() + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? tStatus.getErrorMsgs().get(0) : "NaN")); diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy index ed52e60a13ff53..5cd433283e0c01 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy @@ -111,5 +111,60 @@ suite("test_routine_load_job_schedule","nonConcurrent") { logger.warn("Failed to stop routine load job: ${e.message}") } } + + sql "truncate table ${tableName}" + def memJob = "test_routine_load_job_schedule_mem_limit" + try { + GetDebugPoint().enableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED") + testData.each { line-> + logger.info("Sending data to kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTpoics[0], null, line) + producer.send(record) + } + + sql """ + CREATE ROUTINE LOAD ${memJob} ON ${tableName} + COLUMNS TERMINATED BY "," + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${newTopic.name()}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING", + "max_batch_interval" = "6" + ); + """ + + sleep(5000) + + GetDebugPoint().disableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED") + + def count = 0 + def maxWaitCount = 120 // > 60 = maxBatchIntervalS * Config.routine_load_task_timeout_multiplier + while (true) { + def state = sql "show routine load for ${memJob}" + def routineLoadState = state[0][8].toString() + def statistic = state[0][14].toString() + logger.info("Routine load state: ${routineLoadState}") + logger.info("Routine load statistic: ${statistic}") + def rowCount = sql "select count(*) from ${memTableName}" + if (routineLoadState == "RUNNING" && rowCount[0][0] == 5) { + break + } + if (count > maxWaitCount) { + assertEquals(1, 2) + } + sleep(1000) + count++ + } + } catch (Exception e) { + logger.error("MEM_LIMIT_EXCEEDED test failed with exception: ${e.message}") + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED") + try { + sql "stop routine load for test_routine_load_job_schedule_mem_limit" + } catch (Exception e) { + logger.warn("Failed to stop routine load job: ${e.message}") + } + } } -} \ No newline at end of file +}