diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index d0843eb92044f0..26d271fcc146ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -367,6 +367,11 @@ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoad @Override protected void unprotectUpdateProgress() throws UserException { + // For cloud mode, should update cloud progress from meta service, + // then update progress with default offset from Kafka if necessary. + if (Config.isCloudMode()) { + updateCloudProgress(); + } updateNewPartitionProgress(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 51029c3d18b194..023cd239e0995b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -18,7 +18,6 @@ package org.apache.doris.load.routineload; import org.apache.doris.catalog.Env; -import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -79,10 +78,6 @@ private void process() throws UserException { RoutineLoadJob.JobState errorJobState = null; UserException userException = null; try { - if (Config.isCloudMode()) { - routineLoadJob.updateCloudProgress(); - } - routineLoadJob.prepare(); // judge nums of tasks more than max concurrent tasks of cluster int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();