Skip to content

Commit

Permalink
[fix](routine load) fix show routine load task result incorrect (#38523)
Browse files Browse the repository at this point in the history
### Bug report:
Create a job:
```
CREATE ROUTINE LOAD testShow ON test_show_routine_load
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:19092",
"kafka_topic" = "test_show_routine_load",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```
show routine load task:
```
SHOW ROUTINE LOAD TASK WHERE JobName = "testShow";
```
result:
```
ERROR 1105 (HY000): errCode = 2, detailMessage = The job named testshowdoes not exists or job state is stopped or cancelled
```

### Solution
Do not use the `toLowerCase` method on the job name, so the name keeps its original case when matched.
  • Loading branch information
sollhui authored and dataroaring committed Aug 4, 2024
1 parent 4942677 commit 4c52883
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void checkJobNameExpr(Analyzer analyzer) throws AnalysisException {
break CHECK;
}
StringLiteral stringLiteral = (StringLiteral) binaryPredicate.getChild(1);
jobName = stringLiteral.getValue().toLowerCase();
jobName = stringLiteral.getValue();
} // CHECKSTYLE IGNORE THIS LINE

if (!valid) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
Original file line number Diff line number Diff line change
Expand Up @@ -15,67 +15,136 @@
// specific language governing permissions and limitations
// under the License.

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.ProducerConfig

// NOTE(review): this block was lifted from a rendered GitHub diff — the OLD and
// NEW revisions of the suite appear interleaved (note the two different
// "CREATE ROUTINE LOAD testShow" statements, the duplicated assertion stanzas,
// and the two `finally` blocks further down). As shown it is NOT valid Groovy;
// reconstruct the post-commit version from the upstream repository before use.
// Purpose (from the commit message): verify that SHOW ROUTINE LOAD / SHOW
// ROUTINE LOAD TASK match job names case-sensitively (no toLowerCase()).
suite("test_show_routine_load","p0") {
// NOTE(review): "kafkaCsvTpoics" looks like a typo for "kafkaCsvTopics" — rename upstream.
def kafkaCsvTpoics = [
"test_show_routine_load",
]
// Environment knobs injected by the regression framework's config; the Kafka
// portion of the suite only runs when enableKafkaTest is "true".
String enabled = context.config.otherConfigs.get("enableKafkaTest")
String kafka_port = context.config.otherConfigs.get("kafka_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
def kafka_broker = "${externalEnvIp}:${kafka_port}"
if (enabled != null && enabled.equalsIgnoreCase("true")) {
// define kafka
def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Create kafka producer
def producer = new KafkaProducer<>(props)

// NOTE(review): this `try` has no catch/finally in the visible text — an
// artifact of the interleaved diff, not runnable as-is.
try {
sql """
CREATE ROUTINE LOAD testShow
COLUMNS TERMINATED BY "|"
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "multi_table_load_invalid_table",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
// Seed the Kafka topic from the suite's CSV fixture (one message per line)
// so the routine load job has data to consume.
for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
lines.each { line ->
logger.info("=====${line}========")
def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
producer.send(record)
}
}
}

if (enabled != null && enabled.equalsIgnoreCase("true")) {
def tableName = "test_show_routine_load"
sql """ DROP TABLE IF EXISTS ${tableName} """
// NOTE(review): diff interleaving — the old revision's standalone
// CREATE ROUTINE LOAD testShow1 sits inside the same sql """ block as the
// new revision's CREATE TABLE statement below.
sql """
CREATE ROUTINE LOAD testShow1
COLUMNS TERMINATED BY "|"
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "multi_table_load_invalid_table",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
CREATE TABLE IF NOT EXISTS ${tableName} (
`k1` int(20) NULL,
`k2` string NULL,
`v1` date NULL,
`v2` string NULL,
`v3` datetime NULL,
`v4` string NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
sql "sync"

String db = context.config.getDbNameByFile(context.file)
log.info("reason of state changed: ${db}".toString())
try {
// Create two jobs with mixed-case names (testShow / testShow1); the point
// of the suite is that SHOW commands later find them by the exact name.
sql """
CREATE ROUTINE LOAD testShow ON ${tableName}
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTpoics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""

sql """
CREATE ROUTINE LOAD testShow1 ON ${tableName}
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTpoics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
sql "sync"

// NOTE(review): `db` is redeclared here — another old/new diff duplicate
// of the declaration above.
String db = context.config.getDbNameByFile(context.file)
log.info("reason of state changed: ${db}".toString())

def res = sql "show routine load for ${db}.testShow"
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() == 1)

// NOTE(review): each stanza below appears twice (old and new revision);
// only one copy exists in the real file.
def res = sql "show routine load for ${db}.testShow"
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() == 1)
res = sql "show routine load for testShow"
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() == 1)

res = sql "show routine load for testShow"
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() == 1)
res = sql "show all routine load"
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() > 1)

res = sql "show all routine load"
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() > 1)
res = sql "SHOW ROUTINE LOAD LIKE \"%testShow%\""
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() == 2)

res = sql "SHOW ROUTINE LOAD LIKE \"%testShow%\""
log.info("reason of state changed: ${res.size()}".toString())
assertTrue(res.size() == 2)
} finally {
sql "stop routine load for testShow"
sql "stop routine load for testShow1"
// Poll until at least one row has landed, giving the routine load up to
// ~10 minutes (120 iterations x 5s) before failing the assertion.
def count = 0
while (true) {
res = sql "select count(*) from ${tableName}"
def state = sql "show routine load for testShow1"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("routine load statistic: ${state[0][14].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
if (res[0][0] > 0) {
break
}
if (count >= 120) {
log.error("routine load can not visible for long time")
assertEquals(20, res[0][0])
break
}
sleep(5000)
count++
}
// The regression check from the commit: the mixed-case JobName filter must
// match exactly one task (it returned an error before the fix).
res = sql "SHOW ROUTINE LOAD TASK WHERE JobName = \"testShow1\";"
log.info("SHOW ROUTINE LOAD task result: ${res}".toString())
assertTrue(res.size() == 1)
// NOTE(review): second `finally` on the same try — diff artifact; the real
// file has exactly one cleanup block.
} finally {
sql "stop routine load for testShow"
sql "stop routine load for testShow1"
}
}
}

0 comments on commit 4c52883

Please sign in to comment.