Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ DEFINE_mBool(enable_cloud_tablet_report, "true");

DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");

DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");
DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "100");

DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");

Expand Down
23 changes: 11 additions & 12 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
TLoadTxnBeginResult result;
Status status;
int64_t duration_ns = 0;
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; };
TNetworkAddress master_addr = master_addr_provider();
if (master_addr.hostname.empty() || master_addr.port == 0) {
status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master heartbeat yet");
} else {
SCOPED_RAW_TIMER(&duration_ns);
#ifndef BE_TEST
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
master_addr_provider, [&request, &result](FrontendServiceConnection& client) {
client->loadTxnBegin(result, request);
}));
#else
Expand Down Expand Up @@ -213,14 +213,14 @@ Status StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
TLoadTxnCommitRequest request;
get_commit_request(ctx, request);

TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnCommitResult result;
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
#ifndef BE_TEST
auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; };
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
master_addr_provider,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnPreCommit(result, request);
},
Expand Down Expand Up @@ -258,13 +258,13 @@ Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
request.__set_txnId(ctx->txn_id);
}

TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxn2PCResult result;
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; };
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
master_addr_provider,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxn2PC(result, request);
},
Expand Down Expand Up @@ -310,11 +310,11 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
TLoadTxnCommitRequest request;
get_commit_request(ctx, request);

TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnCommitResult result;
#ifndef BE_TEST
auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; };
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
master_addr_provider,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnCommit(result, request);
},
Expand Down Expand Up @@ -342,7 +342,6 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1);

TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnRollbackRequest request;
set_request_auth(&request, ctx->auth);
request.__set_db(ctx->db);
Expand All @@ -363,9 +362,9 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {

TLoadTxnRollbackResult result;
#ifndef BE_TEST
auto master_addr_provider = [this]() { return _exec_env->cluster_info()->master_fe_addr; };
auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
master_addr_provider, [&request, &result](FrontendServiceConnection& client) {
client->loadTxnRollback(result, request);
});
if (!rpc_st.ok()) {
Expand Down
52 changes: 43 additions & 9 deletions be/src/util/thrift_rpc_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,16 @@ void ThriftRpcHelper::setup(ExecEnv* exec_env) {
template <typename T>
Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
std::function<void(ClientConnection<T>&)> callback, int timeout_ms) {
TNetworkAddress address = make_network_address(ip, port);
return rpc<T>([ip, port]() { return make_network_address(ip, port); }, callback, timeout_ms);
}

template <typename T>
Status ThriftRpcHelper::rpc(std::function<TNetworkAddress()> address_provider,
std::function<void(ClientConnection<T>&)> callback, int timeout_ms) {
TNetworkAddress address = address_provider();
if (address.hostname.empty() || address.port == 0) {
return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available");
}
Status status;
DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; });
ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, timeout_ms, &status);
Expand All @@ -85,15 +94,36 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
#endif
std::this_thread::sleep_for(
std::chrono::milliseconds(config::thrift_client_retry_interval_ms));
status = client.reopen(timeout_ms);
if (!status.ok()) {
TNetworkAddress retry_address = address_provider();
if (retry_address.hostname.empty() || retry_address.port == 0) {
return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is not available");
}
if (retry_address.hostname != address.hostname || retry_address.port != address.port) {
#ifndef ADDRESS_SANITIZER
LOG(INFO) << "retrying call frontend service with new address=" << retry_address;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just log unconditionally, without the `#ifndef ADDRESS_SANITIZER` guard?

#endif
Status retry_status;
ClientConnection<T> retry_client(_s_exec_env->get_client_cache<T>(), retry_address,
timeout_ms, &retry_status);
if (!retry_status.ok()) {
#ifndef ADDRESS_SANITIZER
LOG(WARNING) << "Connect frontend failed, address=" << retry_address
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

<< ", status=" << retry_status;
#endif
return retry_status;
}
callback(retry_client);
} else {
status = client.reopen(timeout_ms);
if (!status.ok()) {
#ifndef ADDRESS_SANITIZER
LOG(WARNING) << "client reopen failed. address=" << address
<< ", status=" << status;
LOG(WARNING) << "client reopen failed. address=" << address
<< ", status=" << status;
#endif
return status;
return status;
}
callback(client);
}
callback(client);
}
} catch (apache::thrift::TException& e) {
#ifndef ADDRESS_SANITIZER
Expand All @@ -104,8 +134,8 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
// just reopen to disable this connection
static_cast<void>(client.reopen(timeout_ms));
return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}", ip,
port, e.what());
return Status::RpcError("failed to call frontend service, FE address={}:{}, reason: {}",
address.hostname, address.port, e.what());
}
return Status::OK();
}
Expand All @@ -114,6 +144,10 @@ template Status ThriftRpcHelper::rpc<FrontendServiceClient>(
const std::string& ip, const int32_t port,
std::function<void(ClientConnection<FrontendServiceClient>&)> callback, int timeout_ms);

// Explicit instantiation of the address-provider overload of rpc() for
// FrontendServiceClient. The template body is defined in this .cpp file
// (the header only declares it), so it is instantiated here explicitly.
template Status ThriftRpcHelper::rpc<FrontendServiceClient>(
std::function<TNetworkAddress()> address_provider,
std::function<void(ClientConnection<FrontendServiceClient>&)> callback, int timeout_ms);

template Status ThriftRpcHelper::rpc<BackendServiceClient>(
const std::string& ip, const int32_t port,
std::function<void(ClientConnection<BackendServiceClient>&)> callback, int timeout_ms);
Expand Down
11 changes: 11 additions & 0 deletions be/src/util/thrift_rpc_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <gen_cpp/Types_types.h>
#include <stdint.h>

#include <functional>
Expand All @@ -43,10 +44,20 @@ class ThriftRpcHelper {
return rpc(ip, port, callback, config::thrift_rpc_timeout_ms);
}

// Convenience overload: performs the RPC against the address returned by
// `address_provider`, using config::thrift_rpc_timeout_ms as the timeout.
// Simply forwards to the overload that takes an explicit timeout.
template <typename T>
static Status rpc(std::function<TNetworkAddress()> address_provider,
                  std::function<void(ClientConnection<T>&)> callback) {
    const int default_timeout_ms = config::thrift_rpc_timeout_ms;
    return rpc<T>(address_provider, callback, default_timeout_ms);
}

// Runs `callback` on a client connection to the fixed endpoint ip:port,
// with `timeout_ms` applied to the connection.
// The definition wraps ip/port into an address provider and delegates to
// the provider-based overload.
template <typename T>
static Status rpc(const std::string& ip, const int32_t port,
std::function<void(ClientConnection<T>&)> callback, int timeout_ms);

// Runs `callback` on a client connection to the address returned by
// `address_provider`, with `timeout_ms` applied to the connection.
// The implementation queries the provider once before connecting and again
// before the retry attempt: if the address changed in between, the retry uses
// a new connection to the new address instead of reopening the old one.
// Returns a SERVICE_UNAVAILABLE error when the provider yields an empty
// hostname or a zero port.
template <typename T>
static Status rpc(std::function<TNetworkAddress()> address_provider,
std::function<void(ClientConnection<T>&)> callback, int timeout_ms);

static ExecEnv* get_exec_env() { return _s_exec_env; }

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3684,7 +3684,7 @@ public static int metaServiceRpcRetryTimes() {
+ "(for example CreateRepositoryStmt, CreatePolicyCommand), separated by commas."})
public static String block_sql_ast_names = "";

public static long meta_service_rpc_reconnect_interval_ms = 5000;
public static long meta_service_rpc_reconnect_interval_ms = 100;

public static long meta_service_rpc_retry_cnt = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,12 @@ public <Response> Response executeRequest(Function<MetaServiceClient, Response>
long maxRetries = Config.meta_service_rpc_retry_cnt;
for (long tried = 1; tried <= maxRetries; tried++) {
MetaServiceClient client = null;
boolean requestFailed = false;
try {
client = proxy.getProxy();
return function.apply(client);
} catch (StatusRuntimeException sre) {
requestFailed = true;
LOG.warn("failed to request meta service code {}, msg {}, trycnt {}", sre.getStatus().getCode(),
sre.getMessage(), tried);
boolean shouldRetry = false;
Expand All @@ -200,12 +202,13 @@ public <Response> Response executeRequest(Function<MetaServiceClient, Response>
throw new RpcException("", sre.getMessage(), sre);
}
} catch (Exception e) {
requestFailed = true;
LOG.warn("failed to request meta servive trycnt {}", tried, e);
if (tried >= maxRetries) {
throw new RpcException("", e.getMessage(), e);
}
} finally {
if (proxy.needReconn() && client != null) {
if (requestFailed && proxy.needReconn() && client != null) {
client.shutdown(true);
}
}
Expand All @@ -227,7 +230,35 @@ public <Response> Response executeRequest(Function<MetaServiceClient, Response>

public Future<Cloud.GetVersionResponse> getVisibleVersionAsync(Cloud.GetVersionRequest request)
throws RpcException {
return w.executeRequest((client) -> client.getVisibleVersionAsync(request));
MetaServiceClient client = null;
try {
client = getProxy();
Future<Cloud.GetVersionResponse> future = client.getVisibleVersionAsync(request);
if (future instanceof com.google.common.util.concurrent.ListenableFuture) {
com.google.common.util.concurrent.ListenableFuture<Cloud.GetVersionResponse> listenableFuture =
(com.google.common.util.concurrent.ListenableFuture<Cloud.GetVersionResponse>) future;
MetaServiceClient finalClient = client;
com.google.common.util.concurrent.Futures.addCallback(listenableFuture,
new com.google.common.util.concurrent.FutureCallback<Cloud.GetVersionResponse>() {
@Override
public void onSuccess(Cloud.GetVersionResponse result) {
}

@Override
public void onFailure(Throwable t) {
if (finalClient != null) {
finalClient.shutdown(true);
}
}
}, com.google.common.util.concurrent.MoreExecutors.directExecutor());
}
return future;
} catch (Exception e) {
if (client != null) {
client.shutdown(true);
}
throw new RpcException("", e.getMessage(), e);
}
}

public Cloud.GetVersionResponse getVersion(Cloud.GetVersionRequest request) throws RpcException {
Expand Down
Loading
Loading