From 4e68b79ef625979dd2106dd833b8b74a1f462935 Mon Sep 17 00:00:00 2001
From: hui lai <laihui@selectdb.com>
Date: Tue, 22 Apr 2025 12:08:53 +0800
Subject: [PATCH] [fix](load) fix routine load job progress fallback after FE
 master node restart (#50221)

### What problem does this PR solve?

If a user creates a routine load job with explicitly specified partitions:
```
FROM KAFKA
(
       "kafka_partitions" = "0",
       "kafka_offsets" = "XXX"
 );
```
the job progress falls back after the FE master node restarts in
shared-storage (cloud) mode, because the progress persisted in the meta
service is never pulled for jobs that specify custom partitions.
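
Root cause: in shared-storage mode the job's consumption progress is saved in
the meta service, but for jobs created with explicit `kafka_partitions`,
`isKafkaPartitionsChanged()` returned on the custom-partitions branch without
ever pulling that saved progress, so a restarted master resumed from a stale
in-memory offset. The patch adds a one-time `updateCloudProgress()` call on
that branch. A condensed sketch of the method as it reads after this patch
(excerpt only, not compilable on its own; the fields and the
`updateCloudProgress()` helper already exist in `KafkaRoutineLoadJob`):

```
private boolean isKafkaPartitionsChanged() throws UserException {
    if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
        // An empty currentKafkaPartitions means the FE master has just
        // restarted (or the job is brand new): pull the progress saved in
        // the meta service once before adopting the custom partitions.
        if (Config.isCloudMode()
                && (currentKafkaPartitions == null || currentKafkaPartitions.isEmpty())) {
            updateCloudProgress();
        }
        currentKafkaPartitions = customKafkaPartitions;
        return false;
    }
    // ... unchanged: partition-change detection for jobs that do not
    // specify custom partitions
}
```

The guard is cloud-only, presumably because in a local-storage deployment the
progress is recovered from FE metadata on startup, so no extra pull is needed.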

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [x] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [x] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [x] No.
    - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 .../load/routineload/KafkaRoutineLoadJob.java |   6 +
 .../data/test_routine_load_progress.csv       |   4 +
 .../test_routine_load_progress.groovy         | 130 ++++++++++++++++++
 3 files changed, 140 insertions(+)
 create mode 100644 regression-test/suites/load_p0/routine_load/data/test_routine_load_progress.csv
 create mode 100644 regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bbf34b9325895f..896f1a1bcd9a88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -423,6 +423,12 @@ protected boolean unprotectNeedReschedule() throws UserException {
 
     private boolean isKafkaPartitionsChanged() throws UserException {
         if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
+            // If currentKafkaPartitions has not been assigned yet, the FE master has
+            // restarted or the job has just been created. In that case, pull the
+            // progress saved in the meta service once before adopting the partitions.
+            if (Config.isCloudMode() && (currentKafkaPartitions == null || currentKafkaPartitions.isEmpty())) {
+                updateCloudProgress();
+            }
             currentKafkaPartitions = customKafkaPartitions;
             return false;
         }
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_progress.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_progress.csv
new file mode 100644
index 00000000000000..b641f1eb6a2106
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_progress.csv
@@ -0,0 +1,4 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
new file mode 100644
index 00000000000000..c372c5826b2464
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_progress.groovy
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_progress","docker") {
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    docker(options) {
+        def kafkaCsvTopics = [
+            "test_routine_load_progress",
+        ]
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            // 1. send data to kafka
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            def producer = new KafkaProducer<>(props)
+            for (String kafkaCsvTopic in kafkaCsvTopics) {
+                def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                def lines = txt.readLines()
+                lines.each { line ->
+                    logger.info("=====${line}========")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+
+            // 2. create table and routine load job
+            def tableName = "test_routine_load_progress"
+            def job = "test_progress"
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k1` int(20) NULL,
+                    `k2` string NULL,
+                    `v1` date  NULL,
+                    `v2` string  NULL,
+                    `v3` datetime  NULL,
+                    `v4` string  NULL
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k1`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+            """
+
+            try {
+                sql """
+                    CREATE ROUTINE LOAD ${job} ON ${tableName}
+                    COLUMNS TERMINATED BY ","
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                        "kafka_partitions" = "0",
+                        "kafka_offsets" = "2"
+                    );
+                """
+                def count = 0
+                def beforeRes = 0
+                def afterRes = 0
+                while (true) {
+                    beforeRes = sql "select count(*) from ${tableName}"
+                    log.info("beforeRes: ${beforeRes}")
+                    def state = sql "show routine load for ${job}"
+                    log.info("routine load state: ${state[0][8].toString()}".toString())
+                    log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                    log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                    def lagJson = parseJson(state[0][16].toString())
+                    log.info("lag raw json: ${state[0][16].toString()}")
+                    if (beforeRes[0][0] > 0 && lagJson["0"] == 0) {
+                        break
+                    }
+                    if (count >= 30) {
+                        log.error("routine load data is not visible after a long wait")
+                        assertEquals(1, 2)
+                        break
+                    }
+                    sleep(1000)
+                    count++
+                }
+
+                // 3. restart fe master
+                def masterFeIndex = cluster.getMasterFe().index
+                cluster.restartFrontends(masterFeIndex)
+                sleep(30 * 1000)
+                context.reconnectFe()
+
+                // 4. check count of table
+                def state = sql "show routine load for ${job}"
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("progress: ${state[0][15].toString()}")
+                log.info("lag: ${state[0][16].toString()}")
+                afterRes = sql "select count(*) from ${tableName}"
+                log.info("afterRes: ${afterRes}")
+                // the row count must not change: a progress fallback would
+                // re-consume rows before the saved offsets and inflate the count
+                assertEquals(beforeRes[0][0], afterRes[0][0])
+            } finally {
+                sql "stop routine load for ${job}"
+            }
+        }
+    }
+}
\ No newline at end of file