Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat:Runtime state management to avoid duplicate calls #112

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions trpc/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ cc_library(
"//trpc/naming:trpc_naming_registry",
"//trpc/runtime",
"//trpc/runtime:fiber_runtime",
"//trpc/runtime:runtime_state",
"//trpc/runtime:separate_runtime",
"//trpc/runtime/common:periphery_task_scheduler",
"//trpc/serialization:trpc_serialization",
Expand Down Expand Up @@ -178,6 +179,7 @@ cc_test(
deps = [
":runtime_manager",
"//trpc/common/config:trpc_config",
"//trpc/util:latch",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
Expand Down
58 changes: 39 additions & 19 deletions trpc/common/runtime_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "trpc/runtime/common/periphery_task_scheduler.h"
#include "trpc/runtime/fiber_runtime.h"
#include "trpc/runtime/runtime.h"
#include "trpc/runtime/runtime_state.h"
#include "trpc/runtime/separate_runtime.h"
#include "trpc/serialization/trpc_serialization.h"
#include "trpc/telemetry/telemetry_factory.h"
Expand All @@ -40,6 +41,9 @@

namespace trpc {

// FrameworkRuntime state manager
RuntimeState framework_runtime_state{RuntimeState::kUnknown};

int RunInTrpcRuntime(std::function<int()>&& func) {
const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();
const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config;
Expand Down Expand Up @@ -121,37 +125,49 @@ int RunInThreadRuntime(std::function<int()>&& func) {
}

int InitFrameworkRuntime() {
const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();
const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config;
if (threadmodel_config.use_fiber_flag) {
runtime::SetRuntimeType(runtime::kFiberRuntime);
} else {
TRPC_FMT_ERROR("not running in fiber runtime, please check the framework config.");
return -1;
}
// It can be restarted if it has not been started before or has already been destroyed
if (framework_runtime_state == RuntimeState::kUnknown || framework_runtime_state == RuntimeState::kDestroyed) {
const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();
const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config;
if (threadmodel_config.use_fiber_flag) {
runtime::SetRuntimeType(runtime::kFiberRuntime);
} else {
TRPC_FMT_ERROR("not running in fiber runtime, please check the framework config.");
return -1;
}

util::IgnorePipe();
util::IgnorePipe();

// set object pool option
const BufferPoolConfig& buffer_pool_config = global_config.buffer_pool_config;
memory_pool::SetMemBlockSize(buffer_pool_config.block_size);
memory_pool::SetMemPoolThreshold(buffer_pool_config.mem_pool_threshold);
// set object pool option
const BufferPoolConfig& buffer_pool_config = global_config.buffer_pool_config;
memory_pool::SetMemBlockSize(buffer_pool_config.block_size);
memory_pool::SetMemPoolThreshold(buffer_pool_config.mem_pool_threshold);

internal::TimeKeeper::Instance()->Start();
internal::TimeKeeper::Instance()->Start();

if (IsInFiberRuntime()) {
fiber::StartRuntime();
runtime::InitFiberReactorConfig();
if (IsInFiberRuntime()) {
fiber::StartRuntime();
// need to StartAdminRuntime
separate::StartAdminRuntime();
runtime::InitFiberReactorConfig();
}

framework_runtime_state = RuntimeState::kStarted;
return 0;
}

return 0;
return -1;
}

bool IsInFiberRuntime() {
return runtime::IsInFiberRuntime();
}

void DestroyFrameworkRuntime() {
int DestroyFrameworkRuntime() {
if (framework_runtime_state != RuntimeState::kStarted) {
return -1;
}

if (IsInFiberRuntime()) {
separate::TerminateAdminRuntime();

Expand All @@ -162,6 +178,10 @@ void DestroyFrameworkRuntime() {
internal::TimeKeeper::Instance()->Join();

TrpcPlugin::GetInstance()->DestroyResource();

framework_runtime_state = RuntimeState::kDestroyed;

return 0;
}

} // namespace trpc
2 changes: 1 addition & 1 deletion trpc/common/runtime_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ int InitFrameworkRuntime();
/// @brief Destroy the runtime environment of the framework.
/// @note for compatible, please not use it.
/// @private For internal use purpose only.
void DestroyFrameworkRuntime();
int DestroyFrameworkRuntime();

} // namespace trpc
51 changes: 51 additions & 0 deletions trpc/common/runtime_manager_fiber_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "trpc/common/runtime_manager.h"
#include "trpc/coroutine/fiber.h"
#include "trpc/coroutine/fiber_latch.h"
#include "trpc/util/latch.h"

namespace trpc::testing {

Expand All @@ -34,6 +35,50 @@ TEST(RuntimeManager, Run) {
ASSERT_TRUE(running_flag);
}

TEST(RuntimeManager, RunInTrpcRuntimeInTrpcRuntime) {
// Test calling RunInTrpcRuntime again within RunInTrpcRuntime
int ret = ::trpc::RunInTrpcRuntime([]() { return 0; });
assert(ret != 0);
}

// Testing scenarios where InitFrameworkRuntime and DestroyFrameworkRuntime appear in pairs
int TestInitAndDestroyFrameworkRuntime() {
int ret = ::trpc::InitFrameworkRuntime();
assert(ret == 0);

::trpc::Latch l(1);
int count = 0;
bool start_fiber = trpc::StartFiberDetached([&] {
count++;
l.count_down();
});

if (start_fiber) {
l.wait();
assert(count == 1);
}

ret = ::trpc::DestroyFrameworkRuntime();
assert(ret == 0);

return 0;
}

// Test calling twice before and after RunInTrpc
int TestRunInTrpcRuntimeDouble() {
bool running_flag{false};
int ret = ::trpc::RunInTrpcRuntime([&running_flag]() {
running_flag = true;

return 0;
});

assert(ret == 0);
assert(running_flag);

return 0;
}

} // namespace trpc::testing

int InitAndRunAllTests(int argc, char** argv) {
Expand All @@ -42,6 +87,12 @@ int InitAndRunAllTests(int argc, char** argv) {
int ret = ::trpc::TrpcConfig::GetInstance()->Init("./trpc/common/testing/fiber_testing.yaml");
assert(ret == 0);

ret = ::trpc::testing::TestInitAndDestroyFrameworkRuntime();
assert(ret == 0);

ret = ::trpc::testing::TestRunInTrpcRuntimeDouble();
assert(ret == 0);

return ::trpc::RunInTrpcRuntime([]() { return ::RUN_ALL_TESTS(); });
}

Expand Down
8 changes: 8 additions & 0 deletions trpc/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ licenses(["notice"])

package(default_visibility = ["//visibility:public"])

cc_library(
name = "runtime_state",
hdrs = ["runtime_state.h"],
)

cc_library(
name = "fiber_runtime",
srcs = ["fiber_runtime.cc"],
hdrs = ["fiber_runtime.h"],
deps = [
":runtime_state",
"//trpc/common/config:trpc_config",
"//trpc/runtime/threadmodel:thread_model",
"//trpc/runtime/threadmodel:thread_model_manager",
Expand All @@ -22,6 +28,7 @@ cc_library(
srcs = ["merge_runtime.cc"],
hdrs = ["merge_runtime.h"],
deps = [
":runtime_state",
"//trpc/common/config:trpc_config",
"//trpc/runtime/iomodel/reactor",
"//trpc/runtime/threadmodel:thread_model",
Expand All @@ -40,6 +47,7 @@ cc_library(
srcs = ["separate_runtime.cc"],
hdrs = ["separate_runtime.h"],
deps = [
":runtime_state",
"//trpc/common/config:trpc_config",
"//trpc/runtime/iomodel/reactor",
"//trpc/runtime/threadmodel:thread_model",
Expand Down
89 changes: 52 additions & 37 deletions trpc/runtime/fiber_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "trpc/runtime/fiber_runtime.h"

#include "trpc/common/config/trpc_config.h"
#include "trpc/runtime/runtime_state.h"
#include "trpc/runtime/threadmodel/fiber/fiber_thread_model.h"
#include "trpc/runtime/threadmodel/thread_model_manager.h"
#include "trpc/util/likely.h"
Expand All @@ -23,6 +24,9 @@

namespace trpc::fiber {

// fiber_threadmodel state manager
RuntimeState fiber_runtime_state{RuntimeState::kUnknown};

// fiber_threadmodel has not owned threadmodel's ownership, it managered by ThreadModelManager
static FiberThreadModel* fiber_threadmodel = nullptr;

Expand Down Expand Up @@ -75,56 +79,67 @@ std::ptrdiff_t NearestSchedulingGroupIndex() {
} // namespace detail

void StartRuntime() {
trpc::fiber::FiberThreadModel::Options options;
options.group_id = ThreadModelManager::GetInstance()->GenGroupId();
options.group_name = "";
options.scheduling_name = "v1";

const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();

// only support one fiber threadmodel instance
if (global_config.threadmodel_config.fiber_model.size() == 1) {
const auto& conf = global_config.threadmodel_config.fiber_model[0];
if (!conf.fiber_scheduling_name.empty()) {
options.scheduling_name = conf.fiber_scheduling_name;
}
options.group_name = conf.instance_name;
options.concurrency_hint = conf.concurrency_hint;
options.scheduling_group_size = conf.scheduling_group_size;
options.numa_aware = conf.numa_aware;

if (!conf.fiber_worker_accessible_cpus.empty() &&
ParseBindCoreConfig(conf.fiber_worker_accessible_cpus, options.worker_accessible_cpus)) {
// Only when configuring the core binding related settings, strict core binding will take effect.
options.worker_disallow_cpu_migration = conf.fiber_worker_disallow_cpu_migration;
// It can be restarted if it has not been started before or has already been destroyed
if (fiber_runtime_state == RuntimeState::kUnknown || fiber_runtime_state == RuntimeState::kDestroyed) {
trpc::fiber::FiberThreadModel::Options options;
options.group_id = ThreadModelManager::GetInstance()->GenGroupId();
options.group_name = "";
options.scheduling_name = "v1";

const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig();

// only support one fiber threadmodel instance
if (global_config.threadmodel_config.fiber_model.size() == 1) {
const auto& conf = global_config.threadmodel_config.fiber_model[0];
if (!conf.fiber_scheduling_name.empty()) {
options.scheduling_name = conf.fiber_scheduling_name;
}
options.group_name = conf.instance_name;
options.concurrency_hint = conf.concurrency_hint;
options.scheduling_group_size = conf.scheduling_group_size;
options.numa_aware = conf.numa_aware;

if (!conf.fiber_worker_accessible_cpus.empty() &&
ParseBindCoreConfig(conf.fiber_worker_accessible_cpus, options.worker_accessible_cpus)) {
// Only when configuring the core binding related settings, strict core binding will take effect.
options.worker_disallow_cpu_migration = conf.fiber_worker_disallow_cpu_migration;
}
options.work_stealing_ratio = conf.work_stealing_ratio;
options.cross_numa_work_stealing_ratio = conf.cross_numa_work_stealing_ratio;
options.run_queue_size = conf.fiber_run_queue_size;
options.stack_size = conf.fiber_stack_size;
options.pool_num_by_mmap = conf.fiber_pool_num_by_mmap;
options.stack_enable_guard_page = conf.fiber_stack_enable_guard_page;
options.disable_process_name = global_config.thread_disable_process_name;
options.enable_gdb_debug = conf.enable_gdb_debug;
} else {
options.group_name = "fiber_instance";
}
options.work_stealing_ratio = conf.work_stealing_ratio;
options.cross_numa_work_stealing_ratio = conf.cross_numa_work_stealing_ratio;
options.run_queue_size = conf.fiber_run_queue_size;
options.stack_size = conf.fiber_stack_size;
options.pool_num_by_mmap = conf.fiber_pool_num_by_mmap;
options.stack_enable_guard_page = conf.fiber_stack_enable_guard_page;
options.disable_process_name = global_config.thread_disable_process_name;
options.enable_gdb_debug = conf.enable_gdb_debug;
} else {
options.group_name = "fiber_instance";
}

std::shared_ptr<ThreadModel> fiber_model = std::make_shared<FiberThreadModel>(std::move(options));
std::shared_ptr<ThreadModel> fiber_model = std::make_shared<FiberThreadModel>(std::move(options));

fiber_threadmodel = static_cast<FiberThreadModel*>(fiber_model.get());

fiber_threadmodel = static_cast<FiberThreadModel*>(fiber_model.get());
TRPC_ASSERT(ThreadModelManager::GetInstance()->Register(fiber_model) == true);

TRPC_ASSERT(ThreadModelManager::GetInstance()->Register(fiber_model) == true);
fiber_threadmodel->Start();

fiber_threadmodel->Start();
fiber_runtime_state = RuntimeState::kStarted;
}
}

void TerminateRuntime() {
if (fiber_runtime_state != RuntimeState::kStarted) {
return;
}

fiber_threadmodel->Terminate();

ThreadModelManager::GetInstance()->Del(fiber_threadmodel->GroupName());

fiber_threadmodel = nullptr;

fiber_runtime_state = RuntimeState::kDestroyed;
}

ThreadModel* GetFiberThreadModel() { return fiber_threadmodel; }
Expand Down
Loading
Loading