From 310a60f02210f32e9ed54e26c39f5bb2720690f8 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Fri, 9 Jan 2026 00:57:41 +0800 Subject: [PATCH 1/2] [enhancement](cloud) improve FE RPC retry and MetaService connection handling Add address-provider retry path in ThriftRpcHelper; stream load uses provider to retry on new FE master Only reconnect MetaServiceProxy on request failures; close connection on getVisibleVersionAsync failure Update meta_service_rpc_reconnect_interval_ms in FE/BE (5000 -> 100) --- be/src/cloud/config.cpp | 2 +- .../stream_load/stream_load_executor.cpp | 23 ++-- be/src/util/thrift_rpc_helper.cpp | 52 +++++-- be/src/util/thrift_rpc_helper.h | 11 ++ .../java/org/apache/doris/common/Config.java | 2 +- .../doris/cloud/rpc/MetaServiceProxy.java | 35 ++++- .../doris/cloud/rpc/MetaServiceProxyTest.java | 127 ++++++++++++++++++ 7 files changed, 227 insertions(+), 25 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index be7c36696dd0ac..b14b7e8667c024 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -110,7 +110,7 @@ DEFINE_mBool(enable_cloud_tablet_report, "true"); DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25"); -DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000"); +DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "100"); DEFINE_mInt32(meta_service_conflict_error_retry_times, "10"); diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 5ac18954e1d091..de7a979cedcd45 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -176,15 +176,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { TLoadTxnBeginResult result; Status status; int64_t duration_ns = 0; - TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; + TNetworkAddress master_addr = master_addr_provider(); if (master_addr.hostname.empty() || master_addr.port == 0) { status = Status::Error("Have not get FE Master heartbeat yet"); } else { SCOPED_RAW_TIMER(&duration_ns); #ifndef BE_TEST RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result](FrontendServiceConnection& client) { + master_addr_provider, [&request, &result](FrontendServiceConnection& client) { client->loadTxnBegin(result, request); })); #else @@ -213,14 +213,14 @@ Status StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; get_commit_request(ctx, request); - TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxnCommitResult result; int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); #ifndef BE_TEST RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, + master_addr_provider, [&request, &result](FrontendServiceConnection& client) { client->loadTxnPreCommit(result, request); }, @@ -258,13 +258,13 @@ Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { request.__set_txnId(ctx->txn_id); } - TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxn2PCResult result; int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, + master_addr_provider, [&request, &result](FrontendServiceConnection& client) { client->loadTxn2PC(result, request); }, @@ -310,11 +310,11 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; get_commit_request(ctx, request); - TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxnCommitResult result; #ifndef BE_TEST RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, + master_addr_provider, [&request, &result](FrontendServiceConnection& client) { client->loadTxnCommit(result, request); }, @@ -342,7 +342,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1); - TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxnRollbackRequest request; set_request_auth(&request, ctx->auth); request.__set_db(ctx->db); @@ -364,8 +364,7 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { TLoadTxnRollbackResult result; #ifndef BE_TEST auto rpc_st = ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result](FrontendServiceConnection& client) { + master_addr_provider, [&request, &result](FrontendServiceConnection& client) { client->loadTxnRollback(result, request); }); if (!rpc_st.ok()) { diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index de5091cf7f1b66..bf26230c909711 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -63,7 +63,16 @@ void ThriftRpcHelper::setup(ExecEnv* exec_env) { template Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port, std::function&)> callback, int timeout_ms) { - TNetworkAddress address = make_network_address(ip, port); + return rpc([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms); +} + +template +Status ThriftRpcHelper::rpc(std::function address_provider, + std::function&)> callback, int timeout_ms) { + TNetworkAddress address = address_provider(); + if (address.hostname.empty() || address.port == 0) { + return Status::Error("FE address is not available"); + } Status status; DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; }); ClientConnection client(_s_exec_env->get_client_cache(), address, timeout_ms, &status); @@ -85,15 +94,36 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port, #endif std::this_thread::sleep_for( std::chrono::milliseconds(config::thrift_client_retry_interval_ms)); - status = client.reopen(timeout_ms); - if (!status.ok()) { + TNetworkAddress retry_address = address_provider(); + if (retry_address.hostname.empty() || retry_address.port == 0) { + return Status::Error("FE address is not available"); + } + if (retry_address.hostname != address.hostname || retry_address.port != address.port) { +#ifndef ADDRESS_SANITIZER + LOG(INFO) << "retrying call frontend service with new address=" << retry_address; +#endif + Status retry_status; + ClientConnection retry_client(_s_exec_env->get_client_cache(), retry_address, + timeout_ms, &retry_status); + if (!retry_status.ok()) { +#ifndef ADDRESS_SANITIZER + LOG(WARNING) << "Connect frontend failed, address=" << retry_address + << ", status=" << retry_status; +#endif + return retry_status; + } + callback(retry_client); + } else { + status = client.reopen(timeout_ms); + if (!status.ok()) { #ifndef ADDRESS_SANITIZER - LOG(WARNING) << "client reopen failed. address=" << address - << ", status=" << status; + LOG(WARNING) << "client reopen failed. address=" << address + << ", status=" << status; #endif - return status; + return status; + } + callback(client); } - callback(client); } } catch (apache::thrift::TException& e) { #ifndef ADDRESS_SANITIZER @@ -104,8 +134,8 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port, std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2)); // just reopen to disable this connection static_cast(client.reopen(timeout_ms)); - return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}", ip, - port, e.what()); + return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}", + address.hostname, address.port, e.what()); } return Status::OK(); } @@ -114,6 +144,10 @@ template Status ThriftRpcHelper::rpc( const std::string& ip, const int32_t port, std::function&)> callback, int timeout_ms); +template Status ThriftRpcHelper::rpc( + std::function address_provider, + std::function&)> callback, int timeout_ms); + template Status ThriftRpcHelper::rpc( const std::string& ip, const int32_t port, std::function&)> callback, int timeout_ms); diff --git a/be/src/util/thrift_rpc_helper.h b/be/src/util/thrift_rpc_helper.h index cf876990e6e7ca..3617967b815653 100644 --- a/be/src/util/thrift_rpc_helper.h +++ b/be/src/util/thrift_rpc_helper.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include @@ -43,10 +44,20 @@ class ThriftRpcHelper { return rpc(ip, port, callback, config::thrift_rpc_timeout_ms); } + template + static Status rpc(std::function address_provider, + std::function&)> callback) { + return rpc(address_provider, callback, config::thrift_rpc_timeout_ms); + } + template static Status rpc(const std::string& ip, const int32_t port, std::function&)> callback, int timeout_ms); + template + static Status rpc(std::function address_provider, + std::function&)> callback, int timeout_ms); + static ExecEnv* get_exec_env() { return _s_exec_env; } private: diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 58e0562daf37ab..cd99f6b7df8d88 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3684,7 +3684,7 @@ public static int metaServiceRpcRetryTimes() { + "(for example CreateRepositoryStmt, CreatePolicyCommand), separated by commas."}) public static String block_sql_ast_names = ""; - public static long meta_service_rpc_reconnect_interval_ms = 5000; + public static long meta_service_rpc_reconnect_interval_ms = 100; public static long meta_service_rpc_retry_cnt = 10; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 049344bafe6cab..567d99c1939891 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -178,10 +178,12 @@ public Response executeRequest(Function long maxRetries = Config.meta_service_rpc_retry_cnt; for (long tried = 1; tried <= maxRetries; tried++) { MetaServiceClient client = null; + boolean requestFailed = false; try { client = proxy.getProxy(); return function.apply(client); } catch (StatusRuntimeException sre) { + requestFailed = true; LOG.warn("failed to request meta service code {}, msg {}, trycnt {}", sre.getStatus().getCode(), sre.getMessage(), tried); boolean shouldRetry = false; @@ -200,12 +202,13 @@ public Response executeRequest(Function throw new RpcException("", sre.getMessage(), sre); } } catch (Exception e) { + requestFailed = true; LOG.warn("failed to request meta servive trycnt {}", tried, e); if (tried >= maxRetries) { throw new RpcException("", e.getMessage(), e); } } finally { - if (proxy.needReconn() && client != null) { + if (requestFailed && proxy.needReconn() && client != null) { client.shutdown(true); } } @@ -227,7 +230,35 @@ public Response executeRequest(Function public Future getVisibleVersionAsync(Cloud.GetVersionRequest request) throws RpcException { - return w.executeRequest((client) -> client.getVisibleVersionAsync(request)); + MetaServiceClient client = null; + try { + client = getProxy(); + Future future = client.getVisibleVersionAsync(request); + if (future instanceof com.google.common.util.concurrent.ListenableFuture) { + com.google.common.util.concurrent.ListenableFuture listenableFuture = + (com.google.common.util.concurrent.ListenableFuture) future; + MetaServiceClient finalClient = client; + com.google.common.util.concurrent.Futures.addCallback(listenableFuture, + new com.google.common.util.concurrent.FutureCallback() { + @Override + public void onSuccess(Cloud.GetVersionResponse result) { + } + + @Override + public void onFailure(Throwable t) { + if (finalClient != null) { + finalClient.shutdown(true); + } + } + }, com.google.common.util.concurrent.MoreExecutors.directExecutor()); + } + return future; + } catch (Exception e) { + if (client != null) { + client.shutdown(true); + } + throw new RpcException("", e.getMessage(), e); + } } public Cloud.GetVersionResponse getVersion(Cloud.GetVersionRequest request) throws RpcException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java new file mode 100644 index 00000000000000..2d5a4353265f1f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.rpc; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.common.Config; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.rpc.RpcException; + +import com.google.common.util.concurrent.SettableFuture; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Map; +import java.util.Queue; + +public class MetaServiceProxyTest { + private String originEndpoint; + private long originReconnectIntervalMs; + private long originRetryCnt; + + @Before + public void setUp() { + originEndpoint = Config.meta_service_endpoint; + originReconnectIntervalMs = Config.meta_service_rpc_reconnect_interval_ms; + originRetryCnt = Config.meta_service_rpc_retry_cnt; + + Config.meta_service_endpoint = "127.0.0.1:12345"; + Config.meta_service_rpc_reconnect_interval_ms = 0; + Config.meta_service_rpc_retry_cnt = 1; + } + + @After + public void tearDown() { + Config.meta_service_endpoint = originEndpoint; + Config.meta_service_rpc_reconnect_interval_ms = originReconnectIntervalMs; + Config.meta_service_rpc_retry_cnt = originRetryCnt; + } + + @Test + public void testExecuteRequestNoShutdownOnSuccess() throws RpcException { + MetaServiceProxy proxy = new MetaServiceProxy(); + MetaServiceClient client = Mockito.mock(MetaServiceClient.class); + Mockito.when(client.isNormalState()).thenReturn(true); + Mockito.when(client.isConnectionAgeExpired()).thenReturn(false); + + Map serviceMap = Deencapsulation.getField(proxy, "serviceMap"); + serviceMap.put(Config.meta_service_endpoint, client); + Queue lastConnTimeMs = Deencapsulation.getField(proxy, "lastConnTimeMs"); + lastConnTimeMs.clear(); + lastConnTimeMs.add(0L); + lastConnTimeMs.add(0L); + lastConnTimeMs.add(0L); + + MetaServiceProxy.MetaServiceClientWrapper wrapper = Deencapsulation.getField(proxy, "w"); + String response = wrapper.executeRequest((ignored) -> "ok"); + Assert.assertEquals("ok", response); + Mockito.verify(client, Mockito.never()).shutdown(Mockito.anyBoolean()); + } + + @Test + public void testExecuteRequestShutdownOnFailure() { + MetaServiceProxy proxy = new MetaServiceProxy(); + MetaServiceClient client = Mockito.mock(MetaServiceClient.class); + Mockito.when(client.isNormalState()).thenReturn(true); + Mockito.when(client.isConnectionAgeExpired()).thenReturn(false); + + Map serviceMap = Deencapsulation.getField(proxy, "serviceMap"); + serviceMap.put(Config.meta_service_endpoint, client); + Queue lastConnTimeMs = Deencapsulation.getField(proxy, "lastConnTimeMs"); + lastConnTimeMs.clear(); + lastConnTimeMs.add(0L); + lastConnTimeMs.add(0L); + lastConnTimeMs.add(0L); + + MetaServiceProxy.MetaServiceClientWrapper wrapper = Deencapsulation.getField(proxy, "w"); + try { + wrapper.executeRequest((ignored) -> { + throw new RuntimeException("rpc failed"); + }); + Assert.fail("should throw RpcException"); + } catch (RpcException ignored) { + // expected + } + + Mockito.verify(client).shutdown(true); + } + + @Test + public void testGetVisibleVersionAsyncShutdownOnFailure() throws RpcException { + MetaServiceProxy proxy = new MetaServiceProxy(); + MetaServiceClient client = Mockito.mock(MetaServiceClient.class); + Mockito.when(client.isNormalState()).thenReturn(true); + Mockito.when(client.isConnectionAgeExpired()).thenReturn(false); + + SettableFuture future = SettableFuture.create(); + Mockito.when(client.getVisibleVersionAsync(Mockito.any())).thenReturn(future); + + Map serviceMap = Deencapsulation.getField(proxy, "serviceMap"); + serviceMap.put(Config.meta_service_endpoint, client); + + Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder().build(); + proxy.getVisibleVersionAsync(request); + + future.setException(new RuntimeException("async failed")); + + Mockito.verify(client).shutdown(true); + } +} From 0cfce02a7c70c3795baeb4f305199b632db80471 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Fri, 9 Jan 2026 10:59:41 +0800 Subject: [PATCH 2/2] fix --- be/src/runtime/stream_load/stream_load_executor.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index de7a979cedcd45..907fbdf7410301 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -213,12 +213,12 @@ Status StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; get_commit_request(ctx, request); - auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxnCommitResult result; int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); #ifndef BE_TEST + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; RETURN_IF_ERROR(ThriftRpcHelper::rpc( master_addr_provider, [&request, &result](FrontendServiceConnection& client) { @@ -258,11 +258,11 @@ Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { request.__set_txnId(ctx->txn_id); } - auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxn2PCResult result; int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; RETURN_IF_ERROR(ThriftRpcHelper::rpc( master_addr_provider, [&request, &result](FrontendServiceConnection& client) { @@ -310,9 +310,9 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; get_commit_request(ctx, request); - auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxnCommitResult result; #ifndef BE_TEST + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; RETURN_IF_ERROR(ThriftRpcHelper::rpc( master_addr_provider, [&request, &result](FrontendServiceConnection& client) { @@ -342,7 +342,6 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1); - auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; TLoadTxnRollbackRequest request; set_request_auth(&request, ctx->auth); request.__set_db(ctx->db); @@ -363,6 +362,7 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { TLoadTxnRollbackResult result; #ifndef BE_TEST + auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; }; auto rpc_st = ThriftRpcHelper::rpc( master_addr_provider, [&request, &result](FrontendServiceConnection& client) { client->loadTxnRollback(result, request);