Skip to content

Commit

Permalink
Feat:Runtime state management to avoid duplicate calls (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun411522 authored Jan 30, 2024
1 parent 2f5fc31 commit 99fc614
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 65 deletions.
2 changes: 2 additions & 0 deletions trpc/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ cc_library(
"//trpc/naming:trpc_naming_registry",
"//trpc/runtime",
"//trpc/runtime:fiber_runtime",
"//trpc/runtime:runtime_state",
"//trpc/runtime:separate_runtime",
"//trpc/runtime/common:periphery_task_scheduler",
"//trpc/serialization:trpc_serialization",
Expand Down Expand Up @@ -178,6 +179,7 @@ cc_test(
deps = [
":runtime_manager",
"//trpc/common/config:trpc_config",
"//trpc/util:latch",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
Expand Down
58 changes: 39 additions & 19 deletions trpc/common/runtime_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "trpc/runtime/common/periphery_task_scheduler.h"
#include "trpc/runtime/fiber_runtime.h"
#include "trpc/runtime/runtime.h"
#include "trpc/runtime/runtime_state.h"
#include "trpc/runtime/separate_runtime.h"
#include "trpc/serialization/trpc_serialization.h"
#include "trpc/telemetry/telemetry_factory.h"
Expand All @@ -40,6 +41,9 @@

namespace trpc {

// FrameworkRuntime state manager
RuntimeState framework_runtime_state{RuntimeState::kUnknown};

int RunInTrpcRuntime(std::function<int()>&& func) {
const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();
const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config;
Expand Down Expand Up @@ -121,37 +125,49 @@ int RunInThreadRuntime(std::function<int()>&& func) {
}

int InitFrameworkRuntime() {
const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();
const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config;
if (threadmodel_config.use_fiber_flag) {
runtime::SetRuntimeType(runtime::kFiberRuntime);
} else {
TRPC_FMT_ERROR("not running in fiber runtime, please check the framework config.");
return -1;
}
// It can be restarted if it has not been started before or has already been destroyed
if (framework_runtime_state == RuntimeState::kUnknown || framework_runtime_state == RuntimeState::kDestroyed) {
const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();
const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config;
if (threadmodel_config.use_fiber_flag) {
runtime::SetRuntimeType(runtime::kFiberRuntime);
} else {
TRPC_FMT_ERROR("not running in fiber runtime, please check the framework config.");
return -1;
}

util::IgnorePipe();
util::IgnorePipe();

// set object pool option
const BufferPoolConfig& buffer_pool_config = global_config.buffer_pool_config;
memory_pool::SetMemBlockSize(buffer_pool_config.block_size);
memory_pool::SetMemPoolThreshold(buffer_pool_config.mem_pool_threshold);
// set object pool option
const BufferPoolConfig& buffer_pool_config = global_config.buffer_pool_config;
memory_pool::SetMemBlockSize(buffer_pool_config.block_size);
memory_pool::SetMemPoolThreshold(buffer_pool_config.mem_pool_threshold);

internal::TimeKeeper::Instance()->Start();
internal::TimeKeeper::Instance()->Start();

if (IsInFiberRuntime()) {
fiber::StartRuntime();
runtime::InitFiberReactorConfig();
if (IsInFiberRuntime()) {
fiber::StartRuntime();
// need to StartAdminRuntime
separate::StartAdminRuntime();
runtime::InitFiberReactorConfig();
}

framework_runtime_state = RuntimeState::kStarted;
return 0;
}

return 0;
return -1;
}

bool IsInFiberRuntime() {
return runtime::IsInFiberRuntime();
}

void DestroyFrameworkRuntime() {
int DestroyFrameworkRuntime() {
if (framework_runtime_state != RuntimeState::kStarted) {
return -1;
}

if (IsInFiberRuntime()) {
separate::TerminateAdminRuntime();

Expand All @@ -162,6 +178,10 @@ void DestroyFrameworkRuntime() {
internal::TimeKeeper::Instance()->Join();

TrpcPlugin::GetInstance()->DestroyResource();

framework_runtime_state = RuntimeState::kDestroyed;

return 0;
}

} // namespace trpc
2 changes: 1 addition & 1 deletion trpc/common/runtime_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ int InitFrameworkRuntime();
/// @brief Destroy the runtime environment of the framework.
/// @note for compatible, please not use it.
/// @private For internal use purpose only.
void DestroyFrameworkRuntime();
int DestroyFrameworkRuntime();

} // namespace trpc
51 changes: 51 additions & 0 deletions trpc/common/runtime_manager_fiber_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "trpc/common/runtime_manager.h"
#include "trpc/coroutine/fiber.h"
#include "trpc/coroutine/fiber_latch.h"
#include "trpc/util/latch.h"

namespace trpc::testing {

Expand All @@ -34,6 +35,50 @@ TEST(RuntimeManager, Run) {
ASSERT_TRUE(running_flag);
}

TEST(RuntimeManager, RunInTrpcRuntimeInTrpcRuntime) {
// Test calling RunInTrpcRuntime again within RunInTrpcRuntime
int ret = ::trpc::RunInTrpcRuntime([]() { return 0; });
assert(ret != 0);
}

// Testing scenarios where InitFrameworkRuntime and DestroyFrameworkRuntime appear in pairs
int TestInitAndDestroyFrameworkRuntime() {
int ret = ::trpc::InitFrameworkRuntime();
assert(ret == 0);

::trpc::Latch l(1);
int count = 0;
bool start_fiber = trpc::StartFiberDetached([&] {
count++;
l.count_down();
});

if (start_fiber) {
l.wait();
assert(count == 1);
}

ret = ::trpc::DestroyFrameworkRuntime();
assert(ret == 0);

return 0;
}

// Test calling twice before and after RunInTrpc
int TestRunInTrpcRuntimeDouble() {
bool running_flag{false};
int ret = ::trpc::RunInTrpcRuntime([&running_flag]() {
running_flag = true;

return 0;
});

assert(ret == 0);
assert(running_flag);

return 0;
}

} // namespace trpc::testing

int InitAndRunAllTests(int argc, char** argv) {
Expand All @@ -42,6 +87,12 @@ int InitAndRunAllTests(int argc, char** argv) {
int ret = ::trpc::TrpcConfig::GetInstance()->Init("./trpc/common/testing/fiber_testing.yaml");
assert(ret == 0);

ret = ::trpc::testing::TestInitAndDestroyFrameworkRuntime();
assert(ret == 0);

ret = ::trpc::testing::TestRunInTrpcRuntimeDouble();
assert(ret == 0);

return ::trpc::RunInTrpcRuntime([]() { return ::RUN_ALL_TESTS(); });
}

Expand Down
8 changes: 8 additions & 0 deletions trpc/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ licenses(["notice"])

package(default_visibility = ["//visibility:public"])

cc_library(
name = "runtime_state",
hdrs = ["runtime_state.h"],
)

cc_library(
name = "fiber_runtime",
srcs = ["fiber_runtime.cc"],
hdrs = ["fiber_runtime.h"],
deps = [
":runtime_state",
"//trpc/common/config:trpc_config",
"//trpc/runtime/threadmodel:thread_model",
"//trpc/runtime/threadmodel:thread_model_manager",
Expand All @@ -22,6 +28,7 @@ cc_library(
srcs = ["merge_runtime.cc"],
hdrs = ["merge_runtime.h"],
deps = [
":runtime_state",
"//trpc/common/config:trpc_config",
"//trpc/runtime/iomodel/reactor",
"//trpc/runtime/threadmodel:thread_model",
Expand All @@ -40,6 +47,7 @@ cc_library(
srcs = ["separate_runtime.cc"],
hdrs = ["separate_runtime.h"],
deps = [
":runtime_state",
"//trpc/common/config:trpc_config",
"//trpc/runtime/iomodel/reactor",
"//trpc/runtime/threadmodel:thread_model",
Expand Down
89 changes: 52 additions & 37 deletions trpc/runtime/fiber_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "trpc/runtime/fiber_runtime.h"

#include "trpc/common/config/trpc_config.h"
#include "trpc/runtime/runtime_state.h"
#include "trpc/runtime/threadmodel/fiber/fiber_thread_model.h"
#include "trpc/runtime/threadmodel/thread_model_manager.h"
#include "trpc/util/likely.h"
Expand All @@ -23,6 +24,9 @@

namespace trpc::fiber {

// fiber_threadmodel state manager
RuntimeState fiber_runtime_state{RuntimeState::kUnknown};

// fiber_threadmodel has not owned threadmodel's ownership, it managered by ThreadModelManager
static FiberThreadModel* fiber_threadmodel = nullptr;

Expand Down Expand Up @@ -75,56 +79,67 @@ std::ptrdiff_t NearestSchedulingGroupIndex() {
} // namespace detail

void StartRuntime() {
trpc::fiber::FiberThreadModel::Options options;
options.group_id = ThreadModelManager::GetInstance()->GenGroupId();
options.group_name = "";
options.scheduling_name = "v1";

const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();

// only support one fiber threadmodel instance
if (global_config.threadmodel_config.fiber_model.size() == 1) {
const auto& conf = global_config.threadmodel_config.fiber_model[0];
if (!conf.fiber_scheduling_name.empty()) {
options.scheduling_name = conf.fiber_scheduling_name;
}
options.group_name = conf.instance_name;
options.concurrency_hint = conf.concurrency_hint;
options.scheduling_group_size = conf.scheduling_group_size;
options.numa_aware = conf.numa_aware;

if (!conf.fiber_worker_accessible_cpus.empty() &&
ParseBindCoreConfig(conf.fiber_worker_accessible_cpus, options.worker_accessible_cpus)) {
// Only when configuring the core binding related settings, strict core binding will take effect.
options.worker_disallow_cpu_migration = conf.fiber_worker_disallow_cpu_migration;
// It can be restarted if it has not been started before or has already been destroyed
if (fiber_runtime_state == RuntimeState::kUnknown || fiber_runtime_state == RuntimeState::kDestroyed) {
trpc::fiber::FiberThreadModel::Options options;
options.group_id = ThreadModelManager::GetInstance()->GenGroupId();
options.group_name = "";
options.scheduling_name = "v1";

const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();

// only support one fiber threadmodel instance
if (global_config.threadmodel_config.fiber_model.size() == 1) {
const auto& conf = global_config.threadmodel_config.fiber_model[0];
if (!conf.fiber_scheduling_name.empty()) {
options.scheduling_name = conf.fiber_scheduling_name;
}
options.group_name = conf.instance_name;
options.concurrency_hint = conf.concurrency_hint;
options.scheduling_group_size = conf.scheduling_group_size;
options.numa_aware = conf.numa_aware;

if (!conf.fiber_worker_accessible_cpus.empty() &&
ParseBindCoreConfig(conf.fiber_worker_accessible_cpus, options.worker_accessible_cpus)) {
// Only when configuring the core binding related settings, strict core binding will take effect.
options.worker_disallow_cpu_migration = conf.fiber_worker_disallow_cpu_migration;
}
options.work_stealing_ratio = conf.work_stealing_ratio;
options.cross_numa_work_stealing_ratio = conf.cross_numa_work_stealing_ratio;
options.run_queue_size = conf.fiber_run_queue_size;
options.stack_size = conf.fiber_stack_size;
options.pool_num_by_mmap = conf.fiber_pool_num_by_mmap;
options.stack_enable_guard_page = conf.fiber_stack_enable_guard_page;
options.disable_process_name = global_config.thread_disable_process_name;
options.enable_gdb_debug = conf.enable_gdb_debug;
} else {
options.group_name = "fiber_instance";
}
options.work_stealing_ratio = conf.work_stealing_ratio;
options.cross_numa_work_stealing_ratio = conf.cross_numa_work_stealing_ratio;
options.run_queue_size = conf.fiber_run_queue_size;
options.stack_size = conf.fiber_stack_size;
options.pool_num_by_mmap = conf.fiber_pool_num_by_mmap;
options.stack_enable_guard_page = conf.fiber_stack_enable_guard_page;
options.disable_process_name = global_config.thread_disable_process_name;
options.enable_gdb_debug = conf.enable_gdb_debug;
} else {
options.group_name = "fiber_instance";
}

std::shared_ptr<ThreadModel> fiber_model = std::make_shared<FiberThreadModel>(std::move(options));
std::shared_ptr<ThreadModel> fiber_model = std::make_shared<FiberThreadModel>(std::move(options));

fiber_threadmodel = static_cast<FiberThreadModel*>(fiber_model.get());

fiber_threadmodel = static_cast<FiberThreadModel*>(fiber_model.get());
TRPC_ASSERT(ThreadModelManager::GetInstance()->Register(fiber_model) == true);

TRPC_ASSERT(ThreadModelManager::GetInstance()->Register(fiber_model) == true);
fiber_threadmodel->Start();

fiber_threadmodel->Start();
fiber_runtime_state = RuntimeState::kStarted;
}
}

void TerminateRuntime() {
if (fiber_runtime_state != RuntimeState::kStarted) {
return;
}

fiber_threadmodel->Terminate();

ThreadModelManager::GetInstance()->Del(fiber_threadmodel->GroupName());

fiber_threadmodel = nullptr;

fiber_runtime_state = RuntimeState::kDestroyed;
}

ThreadModel* GetFiberThreadModel() { return fiber_threadmodel; }
Expand Down
Loading

0 comments on commit 99fc614

Please sign in to comment.