From f725a4b43f6f95b20bc2b1b808be312e1e29682c Mon Sep 17 00:00:00 2001
From: laihui
Date: Thu, 15 May 2025 15:09:18 +0800
Subject: [PATCH] introduce blacklist of backends when load job fetches meta to
 avoid jitter

---
 be/src/runtime/routine_load/data_consumer.cpp |   4 +
 .../java/org/apache/doris/common/Config.java  |   7 +
 .../doris/datasource/kafka/KafkaUtil.java     |  40 +++++-
 .../load/routineload/RoutineLoadManager.java  |  25 ++++
 .../routine_load/data/test_black_list.csv     |   1 +
 .../routine_load/test_black_list.groovy       | 124 ++++++++++++++++++
 6 files changed, 199 insertions(+), 2 deletions(-)
 create mode 100644 regression-test/suites/load_p0/routine_load/data/test_black_list.csv
 create mode 100644 regression-test/suites/load_p0/routine_load/test_black_list.groovy

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 7566d06914a712..00f9c726b41906 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -418,6 +418,10 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector&
 
 Status KafkaDataConsumer::get_latest_offsets_for_partitions(
         const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets, int timeout) {
+    DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
+        // sleep 60s to simulate a get-meta timeout
+        std::this_thread::sleep_for(std::chrono::seconds(60));
+    });
     MonotonicStopWatch watch;
     watch.start();
     for (int32_t partition_id : partition_ids) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index b5d0b904092aa4..280e16ab363884 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1308,6 +1308,13 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_get_kafka_meta_timeout_second = 60;
 
+
+    /**
+     * The expiration time, in seconds, of a backend entry in the routine load blacklist.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int routine_load_blacklist_expire_time_second = 300;
+
     /**
      * The max number of files store in SmallFileMgr
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index da683cc2b371c6..3e78ba0d4a5265 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -35,8 +35,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -227,12 +229,29 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
         TNetworkAddress address = null;
         Future<InternalService.PProxyResult> future = null;
         InternalService.PProxyResult result = null;
+        Set<Long> failedBeIds = new HashSet<>();
+        TStatusCode code = null;
+
         try {
             while (retryTimes < 3) {
                 List<Long> backendIds = new ArrayList<>();
                 for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
                     Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
-                    if (backend != null && backend.isLoadAvailable() && !backend.isDecommissioned()) {
+                    if (backend != null && backend.isLoadAvailable()
+                            && !backend.isDecommissioned()
+                            && !failedBeIds.contains(beId)
+                            && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
                         backendIds.add(beId);
                     }
                 }
+                // If there are no available backends, fall back to the blacklisted ones.
+                // Special scenarios include:
+                // 1. A specific job that connects to Kafka may time out due to a topic config or
+                //    network error, leaving only one backend operational.
+                // 2. If that sole backend is then decommissioned, the alive-backend list becomes empty.
+                // Hence, in such cases, the blacklist must be used to obtain the meta information.
+                if (backendIds.isEmpty()) {
+                    for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
+                        backendIds.add(beId);
+                    }
+                }
@@ -243,19 +262,22 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
                 Collections.shuffle(backendIds);
                 Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
                 address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+                long beId = be.getId();
 
                 try {
                     future = BackendServiceProxy.getInstance().getInfo(address, request);
                     result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
                 } catch (Exception e) {
                     LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
+                    failedBeIds.add(beId);
                     retryTimes++;
                     continue;
                 }
-                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
                     LOG.warn("failed to get info request to " + address + " err "
                             + result.getStatus().getErrorMsgsList());
+                    failedBeIds.add(beId);
                     retryTimes++;
                 } else {
                     return result;
                 }
@@ -265,6 +287,20 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
             throw new LoadException("Failed to get info");
         } finally {
+            // Ensure that not all BEs end up on the blacklist:
+            // For a single request:
+            // a failed BE is added to the blacklist only when the request finally succeeds on another BE,
+            // which guarantees that at least one BE always stays off the blacklist.
+            // For multiple requests:
+            // if only one BE is left off the blacklist after repeated jitter,
+            // that BE will not be blacklisted even if it fails.
+            if (code != null && code == TStatusCode.OK && !failedBeIds.isEmpty()) {
+                for (Long beId : failedBeIds) {
+                    Env.getCurrentEnv().getRoutineLoadManager().addToBlacklist(beId);
+                    LOG.info("add beId {} to blacklist, blacklist: {}", beId,
+                            Env.getCurrentEnv().getRoutineLoadManager().getBlacklist());
+                }
+            }
             long endTime = System.currentTimeMillis();
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 3bb25e856196e3..36e0584e12d348 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -91,6 +91,9 @@ public class RoutineLoadManager implements Writable {
 
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
+    // Map<beId, timestamp when the BE was blacklisted>
+    private Map<Long, Long> blacklist = new ConcurrentHashMap<>();
+
     private void readLock() {
         lock.readLock().lock();
     }
@@ -110,6 +113,10 @@ private void writeUnlock() {
     public RoutineLoadManager() {
     }
 
+    public Map<Long, Long> getBlacklist() {
+        return blacklist;
+    }
+
     public List<RoutineLoadJob> getAllRoutineLoadJobs() {
         return new ArrayList<>(idToRoutineLoadJob.values());
     }
@@ -940,4 +947,22 @@ public void readFields(DataInput in) throws IOException {
             }
         }
     }
+
+    public void addToBlacklist(long beId) {
+        blacklist.put(beId, System.currentTimeMillis());
+    }
+
+    public boolean isInBlacklist(long beId) {
+        Long timestamp = blacklist.get(beId);
+        if (timestamp == null) {
+            return false;
+        }
+
+        if (System.currentTimeMillis() - timestamp > Config.routine_load_blacklist_expire_time_second * 1000) {
+            blacklist.remove(beId);
+            LOG.info("remove beId {} from blacklist, blacklist: {}", beId, blacklist);
+            return false;
+        }
+        return true;
+    }
 }
diff --git a/regression-test/suites/load_p0/routine_load/data/test_black_list.csv b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
new file mode 100644
index 00000000000000..b226b99ee4e0e0
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
new file mode 100644
index 00000000000000..04779f10362a2c
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_black_list","nonConcurrent,p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // 1. send data to Kafka
+        def kafkaCsvTopics = [
+            "test_black_list",
+        ]
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+
+        // 2. create table and routine load job
+        def tableName = "test_black_list"
+        def job = "test_black_list_job"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date NULL,
+                `v2` string NULL,
+                `v3` datetime NULL,
+                `v4` string NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def inject = "KafkaDataConsumer.get_latest_offsets_for_partitions.timeout"
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(inject)
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                log.info("res: ${res}")
+                def state = sql "show routine load for ${job}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                log.info("other msg: ${state[0][19].toString()}".toString())
+                if (state[0][17].toString().contains("Failed to get info") || state[0][19].toString().contains("Failed to get info")) {
+                    break
+                }
+                if (count >= 90) {
+                    log.error("routine load test fail")
+                    assertEquals(1, 2)
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+
+            count = 0
+            GetDebugPoint().disableDebugPointForAllBEs(inject)
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${job}"
+                log.info("routine load statistic: ${res[0][14].toString()}".toString())
+                log.info("progress: ${res[0][15].toString()}".toString())
+                log.info("lag: ${res[0][16].toString()}".toString())
+                res = sql "select count(*) from ${tableName}"
+                if (res[0][0] > 0) {
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                }
+                continue;
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(inject)
+            sql "stop routine load for ${job}"
+        }
+    }
+}
\ No newline at end of file
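
A minimal usage sketch, assuming Doris' standard handling of mutable FE configs (the statement and
the value 60 below are illustrative, not taken from this patch): because
routine_load_blacklist_expire_time_second is declared mutable and masterOnly, it should be tunable
on the master FE at runtime, for example from a regression suite via the sql helper:

    // hypothetical tuning snippet in the regression-test style used above
    sql """ADMIN SET FRONTEND CONFIG ("routine_load_blacklist_expire_time_second" = "60")"""

With the change above, a BE that fails the get-info RPC while another BE eventually succeeds is
skipped by getInfoRequest for that many seconds; the expired entry is removed lazily by the next
isInBlacklist() call for that BE.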