Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize: optimize client class or datastruct, read wait-free #127

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions trpc/client/service_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,9 @@
#include "trpc/util/net_util.h"
#include "trpc/util/string/string_util.h"
#include "trpc/util/time.h"
#include "trpc/util/unique_id.h"

namespace trpc {

static UniqueId unique_id;

Status ServiceProxy::UnaryInvoke(const ClientContextPtr& context, const ProtocolPtr& req, ProtocolPtr& rsp) {
TRPC_FMT_DEBUG("UnaryInvoke msg request_id: {}", context->GetRequestId());

Expand Down Expand Up @@ -407,7 +404,7 @@ void ServiceProxy::FillClientContext(const ClientContextPtr& context) {

// Set unique request id
if (TRPC_LIKELY(!context->IsSetRequestId())) {
context->SetRequestId(unique_id.GenerateId());
context->SetRequestId(unique_id_.GenerateId());
}

if (option_->timeout == UINT32_MAX && context->GetTimeout() == UINT32_MAX) {
Expand Down Expand Up @@ -475,6 +472,7 @@ TransInfo ServiceProxy::ProxyOptionToTransInfo() {
trans_info.fiber_pipeline_connector_queue_size = option_->fiber_pipeline_connector_queue_size;
trans_info.protocol = option_->codec_name;
trans_info.fiber_connpool_shards = option_->fiber_connpool_shards;
trans_info.endpoint_hash_bucket_size = option_->endpoint_hash_bucket_size;

// set the callback function
trans_info.conn_close_function = option_->proxy_callback.conn_close_function;
Expand Down
3 changes: 3 additions & 0 deletions trpc/client/service_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "trpc/transport/client/client_transport.h"
#include "trpc/transport/client/preallocation_option.h"
#include "trpc/tvar/basic_ops/reducer.h"
#include "trpc/util/unique_id.h"

namespace trpc {

Expand Down Expand Up @@ -191,6 +192,8 @@ class ServiceProxy {
private:
std::shared_ptr<ServiceProxyOption> option_;

UniqueId unique_id_;

// service routing name
std::string service_name_;

Expand Down
33 changes: 20 additions & 13 deletions trpc/client/service_proxy_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,34 @@ void ServiceProxyManager::SetOptionDefaultValue(const std::string& name, std::sh
}

void ServiceProxyManager::Stop() {
for (int i = 0; i != kShards; ++i) {
auto& shard = shards_[i];
std::scoped_lock _(shard.lock);
if (is_stoped_.exchange(true)) {
return;
}

service_proxys_.GetAllItems(service_proxys_to_destroy_);

for (auto& it : shard.service_proxys) {
it.second->Stop();
}
for (auto& item : service_proxys_to_destroy_) {
item.second->Stop();
}
}

void ServiceProxyManager::Destroy() {
for (int i = 0; i < kShards; ++i) {
auto& shard = shards_[i];
std::scoped_lock _(shard.lock);
if (!is_stoped_.load()) {
return;
}

for (auto& it : shard.service_proxys) {
it.second->Destroy();
}
if (is_destroyed_.exchange(true)) {
return;
}

shard.service_proxys.clear();
for (auto& item : service_proxys_to_destroy_) {
item.second->Destroy();
}

service_proxys_.Reclaim();
service_proxys_to_destroy_.clear();
is_stoped_.store(false);
is_destroyed_.store(false);
}

} // namespace trpc
88 changes: 42 additions & 46 deletions trpc/client/service_proxy_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "trpc/client/service_proxy.h"
#include "trpc/client/service_proxy_option_setter.h"
#include "trpc/common/config/trpc_config.h"
#include "trpc/util/concurrency/lightly_concurrent_hashmap.h"
#include "trpc/util/likely.h"

namespace trpc {
Expand Down Expand Up @@ -92,14 +93,12 @@ class ServiceProxyManager {
void SetOptionDefaultValue(const std::string& name, std::shared_ptr<ServiceProxyOption>& option);

private:
struct alignas(hardware_destructive_interference_size) Shard {
std::mutex lock;
std::unordered_map<std::string, std::shared_ptr<ServiceProxy>> service_proxys;
};
concurrency::LightlyConcurrentHashMap<std::string, std::shared_ptr<ServiceProxy>> service_proxys_;

constexpr static uint16_t kShards = 64;
std::unordered_map<std::string, std::shared_ptr<ServiceProxy>> service_proxys_to_destroy_;

Shard shards_[kShards];
std::atomic<bool> is_stoped_{false};
std::atomic<bool> is_destroyed_{false};
};

template <typename T>
Expand All @@ -109,14 +108,10 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name) {
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand Down Expand Up @@ -145,9 +140,12 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name) {
new_proxy->SetEndpointInfo(option->target);
}

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

template <typename T>
Expand All @@ -157,14 +155,10 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand All @@ -182,9 +176,12 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const
new_proxy->SetEndpointInfo(option_ptr->target);
}

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

template <typename T>
Expand All @@ -195,14 +192,10 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name,
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand Down Expand Up @@ -233,25 +226,25 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name,
new_proxy->SetEndpointInfo(option->target);
}

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

template <typename T>
std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const ServiceProxyOption* option_ptr) {
if (TRPC_UNLIKELY(name.empty())) {
TRPC_FMT_CRITICAL("GetProxy failed, name is empty.");
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand Down Expand Up @@ -298,9 +291,12 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const
TRPC_FMT_TRACE("ServiceProxy name:{}, target:{}, threadmodel_instance_name:{}", name, new_proxy->option_->target,
new_proxy->option_->threadmodel_instance_name);

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

} // namespace trpc
5 changes: 4 additions & 1 deletion trpc/client/service_proxy_option.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ struct ServiceProxyOption {

/// The queue size of FiberPipelineConnector
/// if memory usage high, reduce it
uint32_t fiber_pipeline_connector_queue_size = 16 * 1024;
uint32_t fiber_pipeline_connector_queue_size{16 * 1024};

/// The hashmap bucket size for storing ip/port <--> Connector
uint32_t endpoint_hash_bucket_size{kEndpointHashBucketSize};

/// Whether to support reconnection in fixed connection mode, the default value is true.
/// For scenarios where reconnection is not allowed, such as transactional operations, set this value to false.
Expand Down
4 changes: 4 additions & 0 deletions trpc/client/service_proxy_option_setter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void SetDefaultOption(const std::shared_ptr<ServiceProxyOption>& option) {
option->is_reconnection = kDefaultIsReconnection;
option->connect_timeout = kDefaultConnectTimeout;
option->allow_reconnect = kDefaultAllowReconnect;
option->endpoint_hash_bucket_size = kEndpointHashBucketSize;
option->threadmodel_type_name = kDefaultThreadmodelType;
option->threadmodel_instance_name = "";
option->support_pipeline = kDefaultSupportPipeline;
Expand Down Expand Up @@ -187,6 +188,9 @@ void SetSpecifiedOption(const ServiceProxyOption* option_ptr, const std::shared_
auto allow_reconnect = GetValidInput<bool>(option_ptr->allow_reconnect, kDefaultAllowReconnect);
SetOutputByValidInput<bool>(allow_reconnect, option->allow_reconnect);

auto endpoint_hash_bucket_size = GetValidInput<uint32_t>(option_ptr->endpoint_hash_bucket_size, 0);
SetOutputByValidInput<uint32_t>(endpoint_hash_bucket_size, option->endpoint_hash_bucket_size);

auto threadmodel_type_name = GetValidInput<std::string>(option_ptr->threadmodel_type_name, kDefaultThreadmodelType);
SetOutputByValidInput<std::string>(threadmodel_type_name, option->threadmodel_type_name);

Expand Down
3 changes: 3 additions & 0 deletions trpc/common/config/client_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ struct ServiceProxyConfig {
/// Use in fiber runtime
uint32_t send_queue_timeout{kDefaultSendQueueTimeout};

/// The hashmap bucket size for storing ip/port <--> Connector
uint32_t endpoint_hash_bucket_size{kEndpointHashBucketSize};

/// The thread model type use by `ServiceProxy`, deprecated.
std::string threadmodel_type;

Expand Down
7 changes: 4 additions & 3 deletions trpc/common/config/client_conf_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct convert<trpc::ServiceProxyConfig> {
node["recv_buffer_size"] = proxy_config.recv_buffer_size;
node["send_queue_capacity"] = proxy_config.send_queue_capacity;
node["send_queue_timeout"] = proxy_config.send_queue_timeout;
node["endpoint_hash_bucket_size"] = proxy_config.endpoint_hash_bucket_size;
node["threadmodel_type"] = proxy_config.threadmodel_type;
node["threadmodel_instance_name"] = proxy_config.threadmodel_instance_name;
node["selector_name"] = proxy_config.selector_name;
Expand Down Expand Up @@ -96,15 +97,15 @@ struct convert<trpc::ServiceProxyConfig> {
}
if (node["is_reconnection"]) proxy_config.is_reconnection = node["is_reconnection"].as<bool>();
if (node["allow_reconnect"]) proxy_config.allow_reconnect = node["allow_reconnect"].as<bool>();
if (node["max_packet_size"]) proxy_config.max_packet_size = node["max_packet_size"].as<uint32_t>();
if (node["max_packet_size"]) proxy_config.max_packet_size = node["max_packet_size"].as<uint32_t>();
if (node["max_conn_num"]) proxy_config.max_conn_num = node["max_conn_num"].as<uint32_t>();
if (node["idle_time"]) proxy_config.idle_time = node["idle_time"].as<uint32_t>();
if (node["recv_buffer_size"]) proxy_config.recv_buffer_size = node["recv_buffer_size"].as<uint32_t>();
if (node["send_queue_capacity"]) proxy_config.send_queue_capacity = node["send_queue_capacity"].as<uint32_t>();
if (node["send_queue_timeout"]) proxy_config.send_queue_timeout = node["send_queue_timeout"].as<uint32_t>();
if (node["endpoint_hash_bucket_size"]) proxy_config.endpoint_hash_bucket_size = node["endpoint_hash_bucket_size"].as<uint32_t>();
if (node["threadmodel_type"]) proxy_config.threadmodel_type = node["threadmodel_type"].as<std::string>();
if (node["threadmodel_instance_name"])
proxy_config.threadmodel_instance_name = node["threadmodel_instance_name"].as<std::string>();
if (node["threadmodel_instance_name"]) proxy_config.threadmodel_instance_name = node["threadmodel_instance_name"].as<std::string>();
if (node["selector_name"]) proxy_config.selector_name = node["selector_name"].as<std::string>();
if (node["namespace"]) proxy_config.namespace_ = node["namespace"].as<std::string>();
if (node["load_balance_name"]) proxy_config.load_balance_name = node["load_balance_name"].as<std::string>();
Expand Down
3 changes: 3 additions & 0 deletions trpc/common/config/default_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ constexpr uint32_t kDefaultSendQueueCapacity = 0;
/// Note: use in fiber runtime.
constexpr uint32_t kDefaultSendQueueTimeout = 3000;

/// The default size of hashmap bucket for storing ip/port <--> Connector
constexpr uint32_t kEndpointHashBucketSize = 1024;

/// The default maximum number of connections that can be established to the backend nodes.
constexpr uint32_t kDefaultMaxConnNum = 64;

Expand Down
3 changes: 0 additions & 3 deletions trpc/common/trpc_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include "gflags/gflags.h"

// #include "trpc/client/http/http_service_proxy.h"
#include "trpc/filter/server_filter_manager.h"
#include "trpc/runtime/runtime.h"
#include "trpc/tvar/common/sampler.h"
Expand Down Expand Up @@ -89,8 +88,6 @@ void TrpcApp::DestroyRuntime() {

server_->Destroy();

trpc::GetTrpcClient()->Destroy();

TrpcPlugin::GetInstance()->DestroyResource();
}

Expand Down
2 changes: 2 additions & 0 deletions trpc/common/trpc_plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ void TrpcPlugin::DestroyResource() {

log::Destroy();

GetTrpcClient()->Destroy();

is_all_inited_ = false;
is_all_destroyed_ = false;
is_invoke_by_framework_ = false;
Expand Down
2 changes: 1 addition & 1 deletion trpc/coroutine/fiber_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ TEST(Fiber, FiberSleepInFiberContext) {
<< ",error_time:" << error_time << std::endl;

ASSERT_NEAR((ReadSystemClock() - start) / std::chrono::milliseconds(1),
sleep_for / std::chrono::milliseconds(1), 20);
sleep_for / std::chrono::milliseconds(1), 100);

// Test FiberSleepUntil.
auto sleep_until = ReadSystemClock() + 100 * std::chrono::milliseconds(1);
Expand Down
2 changes: 1 addition & 1 deletion trpc/server/non_rpc/non_rpc_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace trpc {

void NonRpcServiceImpl::Dispatch(const ServerContextPtr& context, const ProtocolPtr& req,
ProtocolPtr& rsp) noexcept {
auto non_rpc_service_methods = GetNonRpcServiceMethod();
auto& non_rpc_service_methods = GetNonRpcServiceMethod();
auto it = non_rpc_service_methods.find(context->GetFuncName());
if (it == non_rpc_service_methods.end()) {
HandleNoFuncError(context);
Expand Down
1 change: 1 addition & 0 deletions trpc/transport/client/fiber/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cc_library(
"//trpc/transport/client/fiber/conn_complex:fiber_conn_complex_impl",
"//trpc/transport/client/fiber/conn_pool:fiber_conn_pool_impl",
"//trpc/transport/client/fiber/pipeline:fiber_pipeline_impl",
"//trpc/util/concurrency:lightly_concurrent_hashmap",
"//trpc/util/hazptr",
"//trpc/util/log:logging",
],
Expand Down
Loading
Loading