4 changes: 4 additions & 0 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -418,6 +418,10 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
int timeout) {
DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
        // sleep 60s so the FE-side get-info RPC (max_get_kafka_meta_timeout_second) times out
std::this_thread::sleep_for(std::chrono::seconds(60));
});
MonotonicStopWatch watch;
watch.start();
for (int32_t partition_id : partition_ids) {
@@ -1308,6 +1308,13 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_get_kafka_meta_timeout_second = 60;


/**
* The expiration time of the routine load blacklist, in seconds.
*/
@ConfField(mutable = true, masterOnly = true)
public static int routine_load_blacklist_expire_time_second = 300;

/**
* The max number of files store in SmallFileMgr
*/
@@ -35,8 +35,10 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -227,12 +229,29 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
Set<Long> failedBeIds = new HashSet<>();
TStatusCode code = null;

try {
while (retryTimes < 3) {
List<Long> backendIds = new ArrayList<>();
for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
if (backend != null && backend.isLoadAvailable() && !backend.isDecommissioned()) {
if (backend != null && backend.isLoadAvailable()
&& !backend.isDecommissioned()
&& !failedBeIds.contains(beId)
&& !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
backendIds.add(beId);
}
}
                // If no backend passes the filters above, fall back to the blacklist.
                // Special scenarios include:
                // 1. A job that connects to Kafka may keep timing out because of a topic config or
                //    network error, until only one backend is left off the blacklist.
                // 2. If that sole backend is then decommissioned, the candidate list becomes empty.
                // Hence, in such cases, we have to rely on blacklisted backends to obtain the meta information.
if (backendIds.isEmpty()) {
for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
backendIds.add(beId);
}
}
@@ -243,19 +262,22 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
long beId = be.getId();

try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
failedBeIds.add(beId);
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
failedBeIds.add(beId);
retryTimes++;
} else {
return result;
@@ -265,6 +287,20 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info");
} finally {
            // Ensure that not all BEs end up on the blacklist.
            // For a single request:
            //   failed BEs are added to the blacklist only when the request finally succeeds,
            //   so the BE that answered is guaranteed to stay off the blacklist.
            // For multiple requests:
            //   if repeated jitter has left only one BE off the blacklist, that BE will not be
            //   blacklisted even when it fails, because the request cannot succeed without it.
if (code != null && code == TStatusCode.OK && !failedBeIds.isEmpty()) {
for (Long beId : failedBeIds) {
Env.getCurrentEnv().getRoutineLoadManager().addToBlacklist(beId);
LOG.info("add beId {} to blacklist, blacklist: {}", beId,
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist());
}
}
long endTime = System.currentTimeMillis();
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
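Taken together, the hunks above change getInfoRequest as follows: BEs that failed during the current request or that sit on the routine load blacklist are skipped, the blacklist itself becomes the last-resort candidate pool when nothing else is left, and failed BEs are blacklisted only once the request has succeeded somewhere. The standalone sketch below illustrates that pattern; the class name, the simulated failure set, and the helper structure are hypothetical stand-ins for the Env / RoutineLoadManager / BackendServiceProxy calls in the diff, not Doris code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Standalone illustration of the selection pattern used above (not Doris code):
// filter out failed/blacklisted nodes, fall back to the blacklist when the candidate
// list is empty, and blacklist failed nodes only after the request finally succeeds.
public class BlacklistSelectionSketch {
    // nodeId -> timestamp when added, mirroring RoutineLoadManager.blacklist
    static final Map<Long, Long> blacklist = new ConcurrentHashMap<>();

    static long getInfoFrom(List<Long> aliveNodes, Set<Long> failingNodes) throws Exception {
        Set<Long> failedThisRequest = new HashSet<>();
        for (int retry = 0; retry < 3; retry++) {
            List<Long> candidates = new ArrayList<>();
            for (Long id : aliveNodes) {
                if (!failedThisRequest.contains(id) && !blacklist.containsKey(id)) {
                    candidates.add(id);
                }
            }
            if (candidates.isEmpty()) {
                // Fallback: every node is filtered out, so retry blacklisted nodes
                // instead of failing immediately.
                candidates = new ArrayList<>(blacklist.keySet());
            }
            if (candidates.isEmpty()) {
                break;
            }
            Collections.shuffle(candidates);
            long picked = candidates.get(0);
            if (failingNodes.contains(picked)) {   // simulated RPC timeout / non-OK status
                failedThisRequest.add(picked);
                continue;
            }
            // Success: only now are the nodes that failed during this request blacklisted,
            // which guarantees the node that answered stays off the blacklist.
            long now = System.currentTimeMillis();
            failedThisRequest.forEach(id -> blacklist.put(id, now));
            return picked;
        }
        throw new Exception("Failed to get info");
    }

    public static void main(String[] args) throws Exception {
        List<Long> nodes = List.of(10001L, 10002L, 10003L);
        Set<Long> failing = Set.of(10001L, 10002L);      // two BEs keep timing out
        System.out.println("answered by BE " + getInfoFrom(nodes, failing));
        System.out.println("blacklisted BEs: " + blacklist.keySet());
    }
}
```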
@@ -91,6 +91,9 @@ public class RoutineLoadManager implements Writable {

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

// Map<beId, timestamp when added to blacklist>
private Map<Long, Long> blacklist = new ConcurrentHashMap<>();

private void readLock() {
lock.readLock().lock();
}
@@ -110,6 +113,10 @@ private void writeUnlock() {
public RoutineLoadManager() {
}

public Map<Long, Long> getBlacklist() {
return blacklist;
}

public List<RoutineLoadJob> getAllRoutineLoadJobs() {
return new ArrayList<>(idToRoutineLoadJob.values());
}
@@ -940,4 +947,22 @@ public void readFields(DataInput in) throws IOException {
}
}
}

public void addToBlacklist(long beId) {
blacklist.put(beId, System.currentTimeMillis());
}

public boolean isInBlacklist(long beId) {
Long timestamp = blacklist.get(beId);
if (timestamp == null) {
return false;
}

if (System.currentTimeMillis() - timestamp > Config.routine_load_blacklist_expire_time_second * 1000) {
blacklist.remove(beId);
LOG.info("remove beId {} from blacklist, blacklist: {}", beId, blacklist);
return false;
}
return true;
}
}
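Note that isInBlacklist() doubles as the eviction path: there is no background cleanup, an entry simply stops counting (and is removed) the first time it is checked after routine_load_blacklist_expire_time_second has elapsed. A minimal standalone sketch of that lazy-expiry pattern follows, with a hypothetical 2-second window standing in for the 300-second default.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the lazy-expiry blacklist above (not Doris code): entries are
// evicted on lookup once the expiry window has passed, no cleanup thread required.
public class LazyExpiryBlacklistSketch {
    static final long EXPIRE_MS = 2_000;                    // stands in for 300s in Doris
    static final Map<Long, Long> blacklist = new ConcurrentHashMap<>();

    static void addToBlacklist(long beId) {
        blacklist.put(beId, System.currentTimeMillis());
    }

    static boolean isInBlacklist(long beId) {
        Long addedAt = blacklist.get(beId);
        if (addedAt == null) {
            return false;
        }
        if (System.currentTimeMillis() - addedAt > EXPIRE_MS) {
            blacklist.remove(beId);                         // stale entry: evict lazily
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        addToBlacklist(10001L);
        System.out.println(isInBlacklist(10001L));          // true: inside the window
        Thread.sleep(EXPIRE_MS + 500);
        System.out.println(isInBlacklist(10001L));          // false: expired and removed
    }
}
```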
1 change: 1 addition & 0 deletions regression-test/suites/load_p0/routine_load/data/test_black_list.csv
@@ -0,0 +1 @@
1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
124 changes: 124 additions & 0 deletions regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.ProducerConfig

suite("test_black_list","nonConcurrent,p0") {
String enabled = context.config.otherConfigs.get("enableKafkaTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
// 1. send data
        def kafkaCsvTopics = [
"test_black_list",
]
String kafka_port = context.config.otherConfigs.get("kafka_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
def kafka_broker = "${externalEnvIp}:${kafka_port}"
def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
def producer = new KafkaProducer<>(props)
        for (String kafkaCsvTopic in kafkaCsvTopics) {
def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
def lines = txt.readLines()
lines.each { line ->
logger.info("=====${line}========")
def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
producer.send(record)
}
}

// 2. create table and routine load job
def tableName = "test_black_list"
def job = "test_black_list_job"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k1` int(20) NULL,
`k2` string NULL,
`v1` date NULL,
`v2` string NULL,
`v3` datetime NULL,
`v4` string NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""

def inject = "KafkaDataConsumer.get_latest_offsets_for_partitions.timeout"
try {
GetDebugPoint().enableDebugPointForAllBEs(inject)
sql """
CREATE ROUTINE LOAD ${job} ON ${tableName}
COLUMNS TERMINATED BY ","
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
"kafka_topic" = "${kafkaCsvTpoics[0]}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""

def count = 0
while (true) {
def res = sql "select count(*) from ${tableName}"
log.info("res: ${res}")
def state = sql "show routine load for ${job}"
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
log.info("other msg: ${state[0][19].toString()}".toString())
if (state[0][17].toString().contains("Failed to get info") || state[0][19].toString().contains("Failed to get info")) {
break
}
if (count >= 90) {
log.error("routine load test fail")
assertEquals(1, 2)
break
}
sleep(1000)
count++
}

count = 0
GetDebugPoint().disableDebugPointForAllBEs(inject)
while (true) {
sleep(1000)
def res = sql "show routine load for ${job}"
log.info("routine load statistic: ${res[0][14].toString()}".toString())
log.info("progress: ${res[0][15].toString()}".toString())
log.info("lag: ${res[0][16].toString()}".toString())
res = sql "select count(*) from ${tableName}"
if (res[0][0] > 0) {
break;
}
count++
if (count > 60) {
assertEquals(1, 2)
}
continue;
}
} finally {
GetDebugPoint().disableDebugPointForAllBEs(inject)
sql "stop routine load for ${job}"
}
}
}