Skip to content

Commit

Permalink
Optimize: optimize client class or datastruct, read wait-free (#127)
Browse files Browse the repository at this point in the history
- add lightly concurrent hashmap
- GetProxy read wait-free
- fiber transport ConnectorGroupManager read wait-free
  • Loading branch information
helloopenworld authored Apr 16, 2024
1 parent ce52448 commit 5100aa1
Show file tree
Hide file tree
Showing 28 changed files with 965 additions and 201 deletions.
6 changes: 2 additions & 4 deletions trpc/client/service_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,9 @@
#include "trpc/util/net_util.h"
#include "trpc/util/string/string_util.h"
#include "trpc/util/time.h"
#include "trpc/util/unique_id.h"

namespace trpc {

static UniqueId unique_id;

Status ServiceProxy::UnaryInvoke(const ClientContextPtr& context, const ProtocolPtr& req, ProtocolPtr& rsp) {
TRPC_FMT_DEBUG("UnaryInvoke msg request_id: {}", context->GetRequestId());

Expand Down Expand Up @@ -407,7 +404,7 @@ void ServiceProxy::FillClientContext(const ClientContextPtr& context) {

// Set unique request id
if (TRPC_LIKELY(!context->IsSetRequestId())) {
context->SetRequestId(unique_id.GenerateId());
context->SetRequestId(unique_id_.GenerateId());
}

if (option_->timeout == UINT32_MAX && context->GetTimeout() == UINT32_MAX) {
Expand Down Expand Up @@ -475,6 +472,7 @@ TransInfo ServiceProxy::ProxyOptionToTransInfo() {
trans_info.fiber_pipeline_connector_queue_size = option_->fiber_pipeline_connector_queue_size;
trans_info.protocol = option_->codec_name;
trans_info.fiber_connpool_shards = option_->fiber_connpool_shards;
trans_info.endpoint_hash_bucket_size = option_->endpoint_hash_bucket_size;

// set the callback function
trans_info.conn_close_function = option_->proxy_callback.conn_close_function;
Expand Down
3 changes: 3 additions & 0 deletions trpc/client/service_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "trpc/transport/client/client_transport.h"
#include "trpc/transport/client/preallocation_option.h"
#include "trpc/tvar/basic_ops/reducer.h"
#include "trpc/util/unique_id.h"

namespace trpc {

Expand Down Expand Up @@ -191,6 +192,8 @@ class ServiceProxy {
private:
std::shared_ptr<ServiceProxyOption> option_;

UniqueId unique_id_;

// service routing name
std::string service_name_;

Expand Down
33 changes: 20 additions & 13 deletions trpc/client/service_proxy_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,34 @@ void ServiceProxyManager::SetOptionDefaultValue(const std::string& name, std::sh
}

void ServiceProxyManager::Stop() {
for (int i = 0; i != kShards; ++i) {
auto& shard = shards_[i];
std::scoped_lock _(shard.lock);
if (is_stoped_.exchange(true)) {
return;
}

service_proxys_.GetAllItems(service_proxys_to_destroy_);

for (auto& it : shard.service_proxys) {
it.second->Stop();
}
for (auto& item : service_proxys_to_destroy_) {
item.second->Stop();
}
}

void ServiceProxyManager::Destroy() {
for (int i = 0; i < kShards; ++i) {
auto& shard = shards_[i];
std::scoped_lock _(shard.lock);
if (!is_stoped_.load()) {
return;
}

for (auto& it : shard.service_proxys) {
it.second->Destroy();
}
if (is_destroyed_.exchange(true)) {
return;
}

shard.service_proxys.clear();
for (auto& item : service_proxys_to_destroy_) {
item.second->Destroy();
}

service_proxys_.Reclaim();
service_proxys_to_destroy_.clear();
is_stoped_.store(false);
is_destroyed_.store(false);
}

} // namespace trpc
88 changes: 42 additions & 46 deletions trpc/client/service_proxy_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "trpc/client/service_proxy.h"
#include "trpc/client/service_proxy_option_setter.h"
#include "trpc/common/config/trpc_config.h"
#include "trpc/util/concurrency/lightly_concurrent_hashmap.h"
#include "trpc/util/likely.h"

namespace trpc {
Expand Down Expand Up @@ -92,14 +93,12 @@ class ServiceProxyManager {
void SetOptionDefaultValue(const std::string& name, std::shared_ptr<ServiceProxyOption>& option);

private:
struct alignas(hardware_destructive_interference_size) Shard {
std::mutex lock;
std::unordered_map<std::string, std::shared_ptr<ServiceProxy>> service_proxys;
};
concurrency::LightlyConcurrentHashMap<std::string, std::shared_ptr<ServiceProxy>> service_proxys_;

constexpr static uint16_t kShards = 64;
std::unordered_map<std::string, std::shared_ptr<ServiceProxy>> service_proxys_to_destroy_;

Shard shards_[kShards];
std::atomic<bool> is_stoped_{false};
std::atomic<bool> is_destroyed_{false};
};

template <typename T>
Expand All @@ -109,14 +108,10 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name) {
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand Down Expand Up @@ -145,9 +140,12 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name) {
new_proxy->SetEndpointInfo(option->target);
}

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

template <typename T>
Expand All @@ -157,14 +155,10 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand All @@ -182,9 +176,12 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const
new_proxy->SetEndpointInfo(option_ptr->target);
}

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

template <typename T>
Expand All @@ -195,14 +192,10 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name,
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand Down Expand Up @@ -233,25 +226,25 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name,
new_proxy->SetEndpointInfo(option->target);
}

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

template <typename T>
std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const ServiceProxyOption* option_ptr) {
if (TRPC_UNLIKELY(name.empty())) {
TRPC_FMT_CRITICAL("GetProxy failed, name is empty.");
return nullptr;
}

std::size_t hash_value = std::hash<std::string>{}(name);

auto& shard = shards_[hash_value % kShards];
std::scoped_lock _(shard.lock);

auto it = shard.service_proxys.find(name);
if (it != shard.service_proxys.end()) {
return std::dynamic_pointer_cast<T>(it->second);
std::shared_ptr<ServiceProxy> proxy;
bool ret = service_proxys_.Get(name, proxy);
if (ret) {
return std::dynamic_pointer_cast<T>(proxy);
}

std::shared_ptr<T> new_proxy = std::make_shared<T>();
Expand Down Expand Up @@ -298,9 +291,12 @@ std::shared_ptr<T> ServiceProxyManager::GetProxy(const std::string& name, const
TRPC_FMT_TRACE("ServiceProxy name:{}, target:{}, threadmodel_instance_name:{}", name, new_proxy->option_->target,
new_proxy->option_->threadmodel_instance_name);

shard.service_proxys[name] = std::static_pointer_cast<ServiceProxy>(new_proxy);
ret = service_proxys_.GetOrInsert(name, std::static_pointer_cast<ServiceProxy>(new_proxy), proxy);
if (!ret) {
return new_proxy;
}

return new_proxy;
return std::dynamic_pointer_cast<T>(proxy);
}

} // namespace trpc
5 changes: 4 additions & 1 deletion trpc/client/service_proxy_option.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ struct ServiceProxyOption {

/// The queue size of FiberPipelineConnector
/// if memory usage high, reduce it
uint32_t fiber_pipeline_connector_queue_size = 16 * 1024;
uint32_t fiber_pipeline_connector_queue_size{16 * 1024};

/// The hashmap bucket size for storing ip/port <--> Connector
uint32_t endpoint_hash_bucket_size{kEndpointHashBucketSize};

/// Whether to support reconnection in fixed connection mode, the default value is true.
/// For scenarios where reconnection is not allowed, such as transactional operations, set this value to false.
Expand Down
4 changes: 4 additions & 0 deletions trpc/client/service_proxy_option_setter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void SetDefaultOption(const std::shared_ptr<ServiceProxyOption>& option) {
option->is_reconnection = kDefaultIsReconnection;
option->connect_timeout = kDefaultConnectTimeout;
option->allow_reconnect = kDefaultAllowReconnect;
option->endpoint_hash_bucket_size = kEndpointHashBucketSize;
option->threadmodel_type_name = kDefaultThreadmodelType;
option->threadmodel_instance_name = "";
option->support_pipeline = kDefaultSupportPipeline;
Expand Down Expand Up @@ -187,6 +188,9 @@ void SetSpecifiedOption(const ServiceProxyOption* option_ptr, const std::shared_
auto allow_reconnect = GetValidInput<bool>(option_ptr->allow_reconnect, kDefaultAllowReconnect);
SetOutputByValidInput<bool>(allow_reconnect, option->allow_reconnect);

auto endpoint_hash_bucket_size = GetValidInput<uint32_t>(option_ptr->endpoint_hash_bucket_size, 0);
SetOutputByValidInput<uint32_t>(endpoint_hash_bucket_size, option->endpoint_hash_bucket_size);

auto threadmodel_type_name = GetValidInput<std::string>(option_ptr->threadmodel_type_name, kDefaultThreadmodelType);
SetOutputByValidInput<std::string>(threadmodel_type_name, option->threadmodel_type_name);

Expand Down
3 changes: 3 additions & 0 deletions trpc/common/config/client_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ struct ServiceProxyConfig {
/// Use in fiber runtime
uint32_t send_queue_timeout{kDefaultSendQueueTimeout};

/// The hashmap bucket size for storing ip/port <--> Connector
uint32_t endpoint_hash_bucket_size{kEndpointHashBucketSize};

/// The thread model type use by `ServiceProxy`, deprecated.
std::string threadmodel_type;

Expand Down
7 changes: 4 additions & 3 deletions trpc/common/config/client_conf_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct convert<trpc::ServiceProxyConfig> {
node["recv_buffer_size"] = proxy_config.recv_buffer_size;
node["send_queue_capacity"] = proxy_config.send_queue_capacity;
node["send_queue_timeout"] = proxy_config.send_queue_timeout;
node["endpoint_hash_bucket_size"] = proxy_config.endpoint_hash_bucket_size;
node["threadmodel_type"] = proxy_config.threadmodel_type;
node["threadmodel_instance_name"] = proxy_config.threadmodel_instance_name;
node["selector_name"] = proxy_config.selector_name;
Expand Down Expand Up @@ -96,15 +97,15 @@ struct convert<trpc::ServiceProxyConfig> {
}
if (node["is_reconnection"]) proxy_config.is_reconnection = node["is_reconnection"].as<bool>();
if (node["allow_reconnect"]) proxy_config.allow_reconnect = node["allow_reconnect"].as<bool>();
if (node["max_packet_size"]) proxy_config.max_packet_size = node["max_packet_size"].as<uint32_t>();
if (node["max_packet_size"]) proxy_config.max_packet_size = node["max_packet_size"].as<uint32_t>();
if (node["max_conn_num"]) proxy_config.max_conn_num = node["max_conn_num"].as<uint32_t>();
if (node["idle_time"]) proxy_config.idle_time = node["idle_time"].as<uint32_t>();
if (node["recv_buffer_size"]) proxy_config.recv_buffer_size = node["recv_buffer_size"].as<uint32_t>();
if (node["send_queue_capacity"]) proxy_config.send_queue_capacity = node["send_queue_capacity"].as<uint32_t>();
if (node["send_queue_timeout"]) proxy_config.send_queue_timeout = node["send_queue_timeout"].as<uint32_t>();
if (node["endpoint_hash_bucket_size"]) proxy_config.endpoint_hash_bucket_size = node["endpoint_hash_bucket_size"].as<uint32_t>();
if (node["threadmodel_type"]) proxy_config.threadmodel_type = node["threadmodel_type"].as<std::string>();
if (node["threadmodel_instance_name"])
proxy_config.threadmodel_instance_name = node["threadmodel_instance_name"].as<std::string>();
if (node["threadmodel_instance_name"]) proxy_config.threadmodel_instance_name = node["threadmodel_instance_name"].as<std::string>();
if (node["selector_name"]) proxy_config.selector_name = node["selector_name"].as<std::string>();
if (node["namespace"]) proxy_config.namespace_ = node["namespace"].as<std::string>();
if (node["load_balance_name"]) proxy_config.load_balance_name = node["load_balance_name"].as<std::string>();
Expand Down
3 changes: 3 additions & 0 deletions trpc/common/config/default_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ constexpr uint32_t kDefaultSendQueueCapacity = 0;
/// Note: use in fiber runtime.
constexpr uint32_t kDefaultSendQueueTimeout = 3000;

/// The default size of hashmap bucket for storing ip/port <--> Connector
constexpr uint32_t kEndpointHashBucketSize = 1024;

/// The default maximum number of connections that can be established to the backend nodes.
constexpr uint32_t kDefaultMaxConnNum = 64;

Expand Down
3 changes: 0 additions & 3 deletions trpc/common/trpc_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include "gflags/gflags.h"

// #include "trpc/client/http/http_service_proxy.h"
#include "trpc/filter/server_filter_manager.h"
#include "trpc/runtime/runtime.h"
#include "trpc/tvar/common/sampler.h"
Expand Down Expand Up @@ -89,8 +88,6 @@ void TrpcApp::DestroyRuntime() {

server_->Destroy();

trpc::GetTrpcClient()->Destroy();

TrpcPlugin::GetInstance()->DestroyResource();
}

Expand Down
2 changes: 2 additions & 0 deletions trpc/common/trpc_plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ void TrpcPlugin::DestroyResource() {

log::Destroy();

GetTrpcClient()->Destroy();

is_all_inited_ = false;
is_all_destroyed_ = false;
is_invoke_by_framework_ = false;
Expand Down
2 changes: 1 addition & 1 deletion trpc/coroutine/fiber_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ TEST(Fiber, FiberSleepInFiberContext) {
<< ",error_time:" << error_time << std::endl;

ASSERT_NEAR((ReadSystemClock() - start) / std::chrono::milliseconds(1),
sleep_for / std::chrono::milliseconds(1), 20);
sleep_for / std::chrono::milliseconds(1), 100);

// Test FiberSleepUntil.
auto sleep_until = ReadSystemClock() + 100 * std::chrono::milliseconds(1);
Expand Down
2 changes: 1 addition & 1 deletion trpc/server/non_rpc/non_rpc_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace trpc {

void NonRpcServiceImpl::Dispatch(const ServerContextPtr& context, const ProtocolPtr& req,
ProtocolPtr& rsp) noexcept {
auto non_rpc_service_methods = GetNonRpcServiceMethod();
auto& non_rpc_service_methods = GetNonRpcServiceMethod();
auto it = non_rpc_service_methods.find(context->GetFuncName());
if (it == non_rpc_service_methods.end()) {
HandleNoFuncError(context);
Expand Down
1 change: 1 addition & 0 deletions trpc/transport/client/fiber/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cc_library(
"//trpc/transport/client/fiber/conn_complex:fiber_conn_complex_impl",
"//trpc/transport/client/fiber/conn_pool:fiber_conn_pool_impl",
"//trpc/transport/client/fiber/pipeline:fiber_pipeline_impl",
"//trpc/util/concurrency:lightly_concurrent_hashmap",
"//trpc/util/hazptr",
"//trpc/util/log:logging",
],
Expand Down
Loading

0 comments on commit 5100aa1

Please sign in to comment.