diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c3a8bcc450cad7..98ea8b211d8a07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1973,6 +1973,18 @@ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordina
                 response = MetaServiceProxy
                         .getInstance().abortTxnWithCoordinator(request);
                 LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
+                // Debug-point hook: when "FE.abortTxnWhenCoordinateBeRestart.slow" is enabled, stall
+                // here for 15s after the abort RPC has returned.
+                if (DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) {
+                    LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow enabled, sleep 15s");
+                    try {
+                        Thread.sleep(15 * 1000);
+                    } catch (InterruptedException ie) {
+                        LOG.info("error ", ie);
+                        // Restore the interrupt flag so code further up the stack still sees the interruption.
+                        Thread.currentThread().interrupt();
+                    }
+                }
             } catch (RpcException e) {
                 LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 8b0e351f3063f9..077db66712a611 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -78,6 +78,7 @@ public class HeartbeatMgr extends MasterDaemon {
     private final ExecutorService executor;
     private SystemInfoService nodeMgr;
     private HeartbeatFlags heartbeatFlags;
+    private final ExecutorService abortTxnExecutor;
 
     private static volatile AtomicReference masterInfo = new AtomicReference<>();
 
@@ -86,6 +87,8 @@ public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
         this.nodeMgr = nodeMgr;
         this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
                 Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool",
needRegisterMetric);
+        this.abortTxnExecutor = ThreadPoolManager.newDaemonFixedThreadPool(1,
+                Config.heartbeat_mgr_blocking_queue_size, "abort-txn-executor", needRegisterMetric);
         this.heartbeatFlags = new HeartbeatFlags();
     }
 
@@ -192,18 +195,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
                     boolean isChanged = be.handleHbResponse(hbResponse, isReplay);
                     if (hbResponse.getStatus() == HbStatus.OK) {
                         long newStartTime = be.getLastStartTime();
+                        // oldStartTime > 0 means it is not the first heartbeat
                         if (!isReplay && Config.enable_abort_txn_by_checking_coordinator_be
-                                && oldStartTime != newStartTime) {
-                            Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
-                                    be.getId(), be.getHost(), newStartTime);
+                                && oldStartTime != newStartTime && oldStartTime > 0) {
+                            submitAbortTxnTaskByExecutor(() -> Env.getCurrentGlobalTransactionMgr()
+                                    .abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), newStartTime),
+                                    "restart");
                         }
                     } else {
                         // invalid all connections cached in ClientPool
                         ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
                         if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs()
-                                >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
-                            Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
-                                    be.getId(), be.getHost(), 100);
+                                >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L
+                                && be.getLastUpdateMs() > 0) {
+                            submitAbortTxnTaskByExecutor(() -> Env.getCurrentGlobalTransactionMgr()
+                                    .abortTxnWhenCoordinateBeDown(be.getId(), be.getHost(), 100), "down");
                         }
                     }
                     return isChanged;
@@ -230,6 +236,39 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
         return false;
     }
 
+    /**
+     * Hands an abort-txn task to {@code abortTxnExecutor}; the task's result is not awaited.
+     *
+     * <p>Exceptions thrown while submitting or while running the task are logged, not rethrown.
+     * {@code start_ts} in the logs is the time this method was entered, and {@code cost_ms} is
+     * measured from that instant, so it also covers any time the task waited before running.
+     *
+     * @param task the abort action to run
+     * @param reason tag written to the logs to tell the triggers apart
+     */
+    private void submitAbortTxnTaskByExecutor(Runnable task, String reason) {
+        long start = System.currentTimeMillis();
+        try {
+            // execute() rather than submit(): the Future was discarded, so an Error escaping the
+            // catch below would have been captured in it and never reported anywhere.
+            abortTxnExecutor.execute(() -> {
+                LOG.info("start abort txn task, reason={}, start_ts={}", reason, start);
+                try {
+                    task.run();
+                    long duration = System.currentTimeMillis() - start;
+                    LOG.info("finish abort txn task, reason={}, start_ts={}, cost_ms={}", reason, start, duration);
+                } catch (Exception e) {
+                    long duration = System.currentTimeMillis() - start;
+                    LOG.warn("abort txn task({}) failed, start_ts={}, cost_ms={}", reason, start, duration, e);
+                }
+            });
+        } catch (Exception e) {
+            // Handing the task to the executor failed, so the task body never ran.
+            long duration = System.currentTimeMillis() - start;
+            LOG.warn("failed to submit abort txn task({}), start_ts={}, cost_ms={}", reason, start, duration, e);
+        }
+    }
+
     // backend heartbeat
     private class BackendHeartbeatHandler implements Callable {
         private Backend backend;
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy
new file mode 100644
index 00000000000000..bd1e5b9d0b5df3
--- /dev/null
+++ b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonSlurper
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.ClusterOptions
+// Docker suite (cloud mode only): add a backend, then restart backends with a slow-abort debug point on.
+suite("test_cloud_add_backend_heartbeat", 'p0, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    docker(options) {
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        logger.info("ms1 addr={}, port={}, ms endpoint={}", ms.host, ms.httpPort, msHttpPort)
+        // With this debug point on, the FE sleeps 15s inside abortTxnWhenCoordinateBeRestart.
+        GetDebugPoint().enableDebugPointForAllFEs("FE.abortTxnWhenCoordinateBeRestart.slow")
+
+        cluster.addBackend(10, "new_cluster")
+
+        sql """admin set frontend config("cloud_tablet_rebalancer_interval_second"="3");"""
+        // NOTE(review): nothing is asserted after the restart - TODO confirm whether a check was intended.
+        cluster.restartBackends();
+
+    }
+
+}
\ No newline at end of file