From 4dbff58c472efd57fef521dc5981a83a14eb4360 Mon Sep 17 00:00:00 2001
From: hui lai <1353307710@qq.com>
Date: Tue, 6 Aug 2024 16:46:35 +0800
Subject: [PATCH] [fix](routine load) reset Kafka progress cache when routine
 load job topic change (#38474)

When changing the routine load job topic from test_topic_before to
test_topic_after (test_topic_before has 5 rows, test_topic_after has 1 row)
via
```
ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after");
```
the job could not consume any data, and the following warning was logged
repeatedly:
```
2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
... (the same warning repeats every few milliseconds)
```
It is necessary to reset the Kafka progress cache when the routine load
job's topic changes.
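For context, here is a minimal, hypothetical sketch of the consume-gating
check that produces the warning above. The names and the simplification are
illustrative only; the real logic lives in
KafkaRoutineLoadJob.hasMoreDataToConsume() and consults the cached
KafkaProgress per partition:
```java
// Hedged sketch: why a stale cached offset blocks consumption after a
// topic change. Not the actual Doris implementation.
public class OffsetFallbackSketch {
    // cachedOffset: next offset to consume, kept in the job's progress cache
    // latestOffset: latest offset fetched from the (possibly new) topic partition
    static boolean hasMoreDataToConsume(long cachedOffset, long latestOffset) {
        if (cachedOffset > latestOffset) {
            // The cache still holds the old topic's offset (5 in the log above),
            // which is beyond the new topic's latest offset (1), so no task is
            // scheduled and "Kafka offset fallback" is logged on every check.
            return false;
        }
        return cachedOffset < latestOffset;
    }

    public static void main(String[] args) {
        System.out.println(hasMoreDataToConsume(5, 1)); // false: job is stuck
        System.out.println(hasMoreDataToConsume(0, 1)); // true after a progress reset
    }
}
```
Resetting this.progress to a fresh KafkaProgress when the topic changes lets
the job start from the configured default offsets for the new topic
(OFFSET_BEGINNING in the regression test below).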
---
 .../doris/load/routineload/KafkaProgress.java |   8 +-
 .../load/routineload/KafkaRoutineLoadJob.java |  28 ++--
 .../test_routine_load_topic_change.out        |  16 ++
 .../routine_load/data/test_topic_after.csv    |   1 +
 .../routine_load/data/test_topic_before.csv   |   5 +
 .../test_routine_load_topic_change.groovy     | 140 ++++++++++++++++++
 6 files changed, 186 insertions(+), 12 deletions(-)
 create mode 100644 regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
 create mode 100644 regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
 create mode 100644 regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
 create mode 100644 regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 53c57a1cceb193..7dd49ba1ec71d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -118,15 +118,17 @@ private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
         }
     }
 
-    // modify the partition offset of this progress.
-    // throw exception is the specified partition does not exist in progress.
-    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+    public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             if (!partitionIdToOffset.containsKey(pair.first)) {
                 throw new DdlException("The specified partition " + pair.first
                         + " is not in the consumed partitions");
             }
         }
+    }
 
+    // modify the partition offset of this progress.
+    // the partitions must already have been checked via checkPartitions().
+    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             partitionIdToOffset.put(pair.first, pair.second);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 201412027abc86..22de3ef3574608 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -692,22 +692,32 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
                 customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
             }
 
-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before resetting progress
+            // to keep the modify operation atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
             }
+
+            // It is necessary to reset the Kafka progress cache if the topic changes,
+            // and the cache must be reset before modifying the partition offsets.
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
+            }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+            }
+
+            // modify broker list
+            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+                this.brokerList = dataSourceProperties.getBrokerList();
             }
         }
 
         if (!jobProperties.isEmpty()) {
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
new file mode 100644
index 00000000000000..1f534d0a0823ec
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_topic_change --
+1	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+2	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+4	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+5	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+
+-- !sql_topic_change1 --
+1	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+2	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+3	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+4	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+5	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+6	eab	2023-07-15	def	2023-07-20T05:48:31	"ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
new file mode 100644
index 00000000000000..de1727d2d81c9b
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
@@ -0,0 +1 @@
+6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
new file mode 100644
index 00000000000000..f1a48b1e411249
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
@@ -0,0 +1,5 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
new file mode 100644
index 00000000000000..25bf9933d112ff
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_topic_change","p0") {
+    // send data to Kafka
+    def kafkaCsvTopics = [
+                  "test_topic_before",
+                  "test_topic_after",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka producer config
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    // create the target table for the topic change test
+    def tableName = "test_routine_load_topic_change"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date NULL,
+            `v2` string NULL,
+            `v3` datetime NULL,
+            `v4` string NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_topic_change"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load result is not visible after a long wait")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql_topic_change "select * from ${tableName} order by k1"
+
+            sql "pause routine load for ${jobName}"
+            def res = sql "show routine load for ${jobName}"
sql "show routine load for ${jobName}" + log.info("routine load job properties: ${res[0][11].toString()}".toString()) + sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\" = \"${kafkaCsvTpoics[1]}\", \"property.kafka_default_offsets\" = \"OFFSET_BEGINNING\");" + sql "resume routine load for ${jobName}" + count = 0 + while (true) { + res = sql "select count(*) from ${tableName}" + def state = sql "show routine load for ${jobName}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 5) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(1000) + count++ + } + qt_sql_topic_change1 "select * from ${tableName} order by k1" + } finally { + sql "stop routine load for ${jobName}" + } + } +} \ No newline at end of file