@@ -247,6 +247,13 @@ private void handleSubmitTaskFailure(RoutineLoadTaskInfo routineLoadTaskInfo, St

        // Check whether this is a resource-pressure error that should not be rescheduled immediately
        if (errorMsg.contains("TOO_MANY_TASKS") || errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
            // The submit failed (e.g. with a TOO_MANY_TASKS or MEM_LIMIT_EXCEEDED error), but the
            // txn has already begun. We still set the ExecuteStartTime of this task, which means
            // we "assume" it was submitted successfully; the task will later be aborted by its
            // timeout. This prevents the whole job from being paused because of submit errors,
            // and waiting out the timeout also relieves the pressure on the BE.
            routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
            return;
        }

@@ -302,6 +309,12 @@ private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException
throw new LoadException("debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED");
}

        if (DebugPointUtil.isEnable("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")) {
            LOG.warn("debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED, "
                    + "routine load task submit failed");
            throw new LoadException("MEM_LIMIT_EXCEEDED");
        }

        if (tStatus.getStatusCode() != TStatusCode.OK) {
            throw new LoadException("failed to submit task. error code: " + tStatus.getStatusCode()
                    + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? tStatus.getErrorMsgs().get(0) : "NaN"));
@@ -111,5 +111,60 @@ suite("test_routine_load_job_schedule","nonConcurrent") {
logger.warn("Failed to stop routine load job: ${e.message}")
}
}

sql "truncate table ${tableName}"
def memJob = "test_routine_load_job_schedule_mem_limit"
try {
GetDebugPoint().enableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
testData.each { line->
logger.info("Sending data to kafka: ${line}")
def record = new ProducerRecord<>(kafkaCsvTpoics[0], null, line)
producer.send(record)
}

sql """
CREATE ROUTINE LOAD ${memJob} ON ${tableName}
COLUMNS TERMINATED BY ","
FROM KAFKA
(
"kafka_broker_list" = "${kafka_broker}",
"kafka_topic" = "${newTopic.name()}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"max_batch_interval" = "6"
);
"""

        sleep(5000)

        GetDebugPoint().disableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")

        def count = 0
        def maxWaitCount = 120 // 120s > 60s = max_batch_interval (6) * Config.routine_load_task_timeout_multiplier
        while (true) {
            def state = sql "show routine load for ${memJob}"
            def routineLoadState = state[0][8].toString()
            def statistic = state[0][14].toString()
            logger.info("Routine load state: ${routineLoadState}")
            logger.info("Routine load statistic: ${statistic}")
            def rowCount = sql "select count(*) from ${tableName}"
            if (routineLoadState == "RUNNING" && rowCount[0][0] == 5) {
                break
            }
            if (count > maxWaitCount) {
                // The job did not recover and finish loading within the timeout window.
                assertEquals(1, 2)
            }
            sleep(1000)
            count++
        }
    } catch (Exception e) {
        logger.error("MEM_LIMIT_EXCEEDED test failed with exception: ${e.message}")
    } finally {
        GetDebugPoint().disableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
        try {
            sql "stop routine load for ${memJob}"
        } catch (Exception e) {
            logger.warn("Failed to stop routine load job: ${e.message}")
        }
    }
}
}
}
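
For reference, a minimal sketch of the timeout behavior the new resource-pressure branch relies on (not part of this PR; the class, method, and parameters below are hypothetical, and the default of 10 for Config.routine_load_task_timeout_multiplier is an assumption): once setExecuteStartTimeMs is called, the scheduler treats the task as running, the task is aborted after roughly max_batch_interval * routine_load_task_timeout_multiplier seconds, and the job is then rescheduled instead of being paused.

// Illustrative Java sketch only; names and the multiplier default are assumptions.
class SubmitFailureTimeoutSketch {

    // A task whose execute start time has been set is treated as "running";
    // it counts as timed out after maxBatchIntervalS * timeoutMultiplier seconds.
    static boolean isTimedOut(long executeStartTimeMs, long maxBatchIntervalS,
                              long timeoutMultiplier, long nowMs) {
        return executeStartTimeMs > 0
                && nowMs - executeStartTimeMs > maxBatchIntervalS * timeoutMultiplier * 1000;
    }

    public static void main(String[] args) {
        long submitFailedAt = System.currentTimeMillis();
        // With max_batch_interval = 6 (as in the new regression test) and an assumed
        // multiplier of 10, the task times out roughly 60s after the failed submit.
        System.out.println(isTimedOut(submitFailedAt, 6, 10, submitFailedAt + 61_000)); // true
        System.out.println(isTimedOut(submitFailedAt, 6, 10, submitFailedAt + 30_000)); // false
    }
}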