[fix](routine load) reset Kafka progress cache when routine load job topic change (apache#38474)

When changing a routine load job's topic from test_topic_before to test_topic_after with:
```
ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after");
```
(test_topic_before has 5 rows and test_topic_after has 1 row)

the job stopped consuming data entirely, and the FE log repeatedly printed:
```
2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
```

It is therefore necessary to reset the Kafka progress cache when the routine load job's topic changes.
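
For context, the warning above comes from the staleness guard in `KafkaRoutineLoadJob.hasMoreDataToConsume()`. Below is a minimal sketch of that guard, assuming a hypothetical helper `fetchLatestOffset` in place of the real Kafka metadata lookup; it is a simplified illustration, not the actual Doris implementation:

```java
// Simplified sketch of the guard that emits the warning above.
// fetchLatestOffset is a hypothetical stand-in for the real Kafka metadata query.
class OffsetFallbackSketch {
    long fetchLatestOffset(int partition) {
        return 1; // test_topic_after only contains 1 row
    }

    boolean hasMoreDataToConsume(int partition, long cachedOffset) {
        long latestOffset = fetchLatestOffset(partition);
        if (cachedOffset > latestOffset) {
            // The offset cached for test_topic_before (5) exceeds the latest
            // offset of test_topic_after (1), so every scheduling round logs
            // the fallback warning and consumes nothing.
            System.out.printf("Kafka offset fallback. partition: %d, cache offset: %d get latest offset: %d%n",
                    partition, cachedOffset, latestOffset);
            return false;
        }
        return cachedOffset < latestOffset;
    }
}
```
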
sollhui committed Aug 10, 2024 · 1 parent 80b6345 · commit 4dbff58
Showing 6 changed files with 186 additions and 12 deletions.
KafkaProgress.java:
```diff
@@ -118,15 +118,17 @@ private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
         }
     }
 
-    // modify the partition offset of this progress.
-    // throw exception if the specified partition does not exist in progress.
-    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+    public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             if (!partitionIdToOffset.containsKey(pair.first)) {
                 throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
             }
         }
+    }
+
+    // modify the partition offset of this progress.
+    // the caller should validate partitions via checkPartitions() first.
+    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             partitionIdToOffset.put(pair.first, pair.second);
         }
```
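
Splitting the old throwing `modifyOffset` into a read-only `checkPartitions` plus a non-throwing write is what makes the later modify flow atomic: validation can fail before any state changes. A toy illustration of the pattern, using plain `Map` instead of Doris' `Pair` and `DdlException` (names and types here are illustrative, not the Doris class itself):

```java
import java.util.HashMap;
import java.util.Map;

// Toy version of the check-then-write split introduced above.
class ProgressSketch {
    private final Map<Integer, Long> partitionIdToOffset = new HashMap<>(Map.of(0, 5L));

    // read-only validation: may fail, but never mutates state
    void checkPartitions(Map<Integer, Long> requested) {
        for (Integer partition : requested.keySet()) {
            if (!partitionIdToOffset.containsKey(partition)) {
                throw new IllegalArgumentException(
                        "The specified partition " + partition + " is not in the consumed partitions");
            }
        }
    }

    // pure write: cannot fail halfway once validation has passed
    void modifyOffset(Map<Integer, Long> requested) {
        partitionIdToOffset.putAll(requested);
    }
}
```
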
KafkaRoutineLoadJob.java:
```diff
@@ -692,22 +692,32 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
                 customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
             }
 
-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before resetting progress, so the modify operation stays atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
+            }
+
+            // the Kafka progress cache must be reset if the topic changes,
+            // and it must be reset before the partition offsets are modified
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
             }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+            }
+
+            // modify broker list
+            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+                this.brokerList = dataSourceProperties.getBrokerList();
             }
         }
         if (!jobProperties.isEmpty()) {
```
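
The heart of the fix is the new ordering in `modifyPropertiesInternal`: fallible steps (custom-property conversion, `checkPartitions`) run first, the progress cache is re-created when the topic changes, and only then are the requested offsets and the broker list applied. A condensed, hypothetical skeleton of that flow (field and parameter names are illustrative, not the real ones):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical skeleton of the ordering above, not the real method.
class AlterJobSketch {
    private String topic = "test_topic_before";
    private Map<Integer, Long> progress = new HashMap<>(Map.of(0, 5L)); // stale cache

    void modifyProperties(String newTopic, Map<Integer, Long> newOffsets) {
        // 1. Validate first (may throw); a failure leaves the job untouched.
        if (!progress.keySet().containsAll(newOffsets.keySet())) {
            throw new IllegalArgumentException("unknown partition in request");
        }
        // 2. A topic change invalidates every cached offset, so reset the
        //    progress cache BEFORE applying the requested offsets.
        if (newTopic != null && !newTopic.isEmpty()) {
            this.topic = newTopic;
            this.progress = new HashMap<>(); // the fix: drop old-topic offsets
        }
        // 3. Apply requested offsets against the (possibly fresh) cache.
        this.progress.putAll(newOffsets);
    }
}
```
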
test_routine_load_topic_change.out (new file with the expected query results):
```
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql_topic_change --
1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"

-- !sql_topic_change1 --
1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
6 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
```

data/test_topic_after.csv (new file):
```
6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
```

data/test_topic_before.csv (new file):
```
1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
```

test_routine_load_topic_change.groovy (new file):
```groovy
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.ProducerConfig

suite("test_routine_load_topic_change","p0") {
    // send data to Kafka
    def kafkaCsvTopics = [
        "test_topic_before",
        "test_topic_after",
    ]
    String enabled = context.config.otherConfigs.get("enableKafkaTest")
    String kafka_port = context.config.otherConfigs.get("kafka_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    def kafka_broker = "${externalEnvIp}:${kafka_port}"
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        // define kafka producer config
        def props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // create kafka producer
        def producer = new KafkaProducer<>(props)

        for (String kafkaCsvTopic in kafkaCsvTopics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()
            lines.each { line ->
                logger.info("=====${line}========")
                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
                producer.send(record)
            }
        }
    }

    // create the target table for the routine load job
    def tableName = "test_routine_load_topic_change"
    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `k1` int(20) NULL,
            `k2` string NULL,
            `v1` date NULL,
            `v2` string NULL,
            `v3` datetime NULL,
            `v4` string NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`k1`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """

    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        def jobName = "test_topic_change"
        try {
            sql """
                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
                COLUMNS TERMINATED BY ","
                PROPERTIES
                (
                    "max_batch_interval" = "5",
                    "max_batch_rows" = "300000",
                    "max_batch_size" = "209715200"
                )
                FROM KAFKA
                (
                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
                    "kafka_topic" = "${kafkaCsvTopics[0]}",
                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                );
            """
            sql "sync"

            // wait until rows from test_topic_before are visible
            def count = 0
            while (true) {
                def res = sql "select count(*) from ${tableName}"
                def state = sql "show routine load for ${jobName}"
                log.info("routine load state: ${state[0][8].toString()}".toString())
                log.info("routine load statistic: ${state[0][14].toString()}".toString())
                log.info("reason of state changed: ${state[0][17].toString()}".toString())
                if (res[0][0] > 0) {
                    break
                }
                if (count >= 120) {
                    log.error("routine load data is not visible after a long wait")
                    assertEquals(20, res[0][0]) // deliberately fail the case on timeout
                    break
                }
                sleep(1000)
                count++
            }
            qt_sql_topic_change "select * from ${tableName} order by k1"

            // switch the job to test_topic_after and wait for the new row
            sql "pause routine load for ${jobName}"
            def res = sql "show routine load for ${jobName}"
            log.info("routine load job properties: ${res[0][11].toString()}".toString())
            sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\" = \"${kafkaCsvTopics[1]}\", \"property.kafka_default_offsets\" = \"OFFSET_BEGINNING\");"
            sql "resume routine load for ${jobName}"
            count = 0
            while (true) {
                res = sql "select count(*) from ${tableName}"
                def state = sql "show routine load for ${jobName}"
                log.info("routine load state: ${state[0][8].toString()}".toString())
                log.info("routine load statistic: ${state[0][14].toString()}".toString())
                log.info("reason of state changed: ${state[0][17].toString()}".toString())
                if (res[0][0] > 5) {
                    break
                }
                if (count >= 120) {
                    log.error("routine load data is not visible after a long wait")
                    assertEquals(20, res[0][0]) // deliberately fail the case on timeout
                    break
                }
                sleep(1000)
                count++
            }
            qt_sql_topic_change1 "select * from ${tableName} order by k1"
        } finally {
            sql "stop routine load for ${jobName}"
        }
    }
}
```