From 0bae5533205999ad0ab71d52b112e4dc9fe382c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?kiviyu=28=E4=BD=99=E5=86=9B=29?= Date: Mon, 29 Jan 2024 17:08:08 +0800 Subject: [PATCH] Feat:Runtime state management to avoid duplicate calls --- trpc/common/BUILD | 2 + trpc/common/runtime_manager.cc | 58 ++++++++++----- trpc/common/runtime_manager.h | 2 +- trpc/common/runtime_manager_fiber_test.cc | 51 +++++++++++++ trpc/runtime/BUILD | 8 ++ trpc/runtime/fiber_runtime.cc | 89 +++++++++++++---------- trpc/runtime/merge_runtime.cc | 21 +++++- trpc/runtime/runtime_state.h | 25 +++++++ trpc/runtime/separate_runtime.cc | 39 ++++++++-- 9 files changed, 230 insertions(+), 65 deletions(-) create mode 100644 trpc/runtime/runtime_state.h diff --git a/trpc/common/BUILD b/trpc/common/BUILD index cbca922f..ef8e9d6a 100644 --- a/trpc/common/BUILD +++ b/trpc/common/BUILD @@ -75,6 +75,7 @@ cc_library( "//trpc/naming:trpc_naming_registry", "//trpc/runtime", "//trpc/runtime:fiber_runtime", + "//trpc/runtime:runtime_state", "//trpc/runtime:separate_runtime", "//trpc/runtime/common:periphery_task_scheduler", "//trpc/serialization:trpc_serialization", @@ -178,6 +179,7 @@ cc_test( deps = [ ":runtime_manager", "//trpc/common/config:trpc_config", + "//trpc/util:latch", "@com_google_googletest//:gtest", "@com_google_googletest//:gtest_main", ], diff --git a/trpc/common/runtime_manager.cc b/trpc/common/runtime_manager.cc index 97294c0d..0c18661d 100644 --- a/trpc/common/runtime_manager.cc +++ b/trpc/common/runtime_manager.cc @@ -24,6 +24,7 @@ #include "trpc/runtime/common/periphery_task_scheduler.h" #include "trpc/runtime/fiber_runtime.h" #include "trpc/runtime/runtime.h" +#include "trpc/runtime/runtime_state.h" #include "trpc/runtime/separate_runtime.h" #include "trpc/serialization/trpc_serialization.h" #include "trpc/telemetry/telemetry_factory.h" @@ -40,6 +41,9 @@ namespace trpc { +// FrameworkRuntime state manager +RuntimeState framework_runtime_state{RuntimeState::kUnknown}; + int RunInTrpcRuntime(std::function&& func) { const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig(); const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config; @@ -121,37 +125,49 @@ int RunInThreadRuntime(std::function&& func) { } int InitFrameworkRuntime() { - const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig(); - const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config; - if (threadmodel_config.use_fiber_flag) { - runtime::SetRuntimeType(runtime::kFiberRuntime); - } else { - TRPC_FMT_ERROR("not running in fiber runtime, please check the framework config."); - return -1; - } + // It can be restarted if it has not been started before or has already been destroyed + if (framework_runtime_state == RuntimeState::kUnknown || framework_runtime_state == RuntimeState::kDestroyed) { + const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig(); + const ThreadModelConfig& threadmodel_config = global_config.threadmodel_config; + if (threadmodel_config.use_fiber_flag) { + runtime::SetRuntimeType(runtime::kFiberRuntime); + } else { + TRPC_FMT_ERROR("not running in fiber runtime, please check the framework config."); + return -1; + } - util::IgnorePipe(); + util::IgnorePipe(); - // set object pool option - const BufferPoolConfig& buffer_pool_config = global_config.buffer_pool_config; - memory_pool::SetMemBlockSize(buffer_pool_config.block_size); - memory_pool::SetMemPoolThreshold(buffer_pool_config.mem_pool_threshold); + // set object pool option + const BufferPoolConfig& buffer_pool_config = global_config.buffer_pool_config; + memory_pool::SetMemBlockSize(buffer_pool_config.block_size); + memory_pool::SetMemPoolThreshold(buffer_pool_config.mem_pool_threshold); - internal::TimeKeeper::Instance()->Start(); + internal::TimeKeeper::Instance()->Start(); - if (IsInFiberRuntime()) { - fiber::StartRuntime(); - runtime::InitFiberReactorConfig(); + if (IsInFiberRuntime()) { + fiber::StartRuntime(); + // need to StartAdminRuntime + separate::StartAdminRuntime(); + runtime::InitFiberReactorConfig(); + } + + framework_runtime_state = RuntimeState::kStarted; + return 0; } - return 0; + return -1; } bool IsInFiberRuntime() { return runtime::IsInFiberRuntime(); } -void DestroyFrameworkRuntime() { +int DestroyFrameworkRuntime() { + if (framework_runtime_state != RuntimeState::kStarted) { + return -1; + } + if (IsInFiberRuntime()) { separate::TerminateAdminRuntime(); @@ -162,6 +178,10 @@ void DestroyFrameworkRuntime() { internal::TimeKeeper::Instance()->Join(); TrpcPlugin::GetInstance()->DestroyResource(); + + framework_runtime_state = RuntimeState::kDestroyed; + + return 0; } } // namespace trpc diff --git a/trpc/common/runtime_manager.h b/trpc/common/runtime_manager.h index e06f8adb..b6cc1651 100644 --- a/trpc/common/runtime_manager.h +++ b/trpc/common/runtime_manager.h @@ -41,6 +41,6 @@ int InitFrameworkRuntime(); /// @brief Destroy the runtime environment of the framework. /// @note for compatible, please not use it. /// @private For internal use purpose only. -void DestroyFrameworkRuntime(); +int DestroyFrameworkRuntime(); } // namespace trpc diff --git a/trpc/common/runtime_manager_fiber_test.cc b/trpc/common/runtime_manager_fiber_test.cc index 093443a4..64e3fd79 100644 --- a/trpc/common/runtime_manager_fiber_test.cc +++ b/trpc/common/runtime_manager_fiber_test.cc @@ -17,6 +17,7 @@ #include "trpc/common/runtime_manager.h" #include "trpc/coroutine/fiber.h" #include "trpc/coroutine/fiber_latch.h" +#include "trpc/util/latch.h" namespace trpc::testing { @@ -34,6 +35,50 @@ TEST(RuntimeManager, Run) { ASSERT_TRUE(running_flag); } +TEST(RuntimeManager, RunInTrpcRuntimeInTrpcRuntime) { + // Test calling RunInTrpcRuntime again within RunInTrpcRuntime + int ret = ::trpc::RunInTrpcRuntime([]() { return 0; }); + assert(ret != 0); +} + +// Testing scenarios where InitFrameworkRuntime and DestroyFrameworkRuntime appear in pairs +int TestInitAndDestroyFrameworkRuntime() { + int ret = ::trpc::InitFrameworkRuntime(); + assert(ret == 0); + + ::trpc::Latch l(1); + int count = 0; + bool start_fiber = trpc::StartFiberDetached([&] { + count++; + l.count_down(); + }); + + if (start_fiber) { + l.wait(); + assert(count == 1); + } + + ret = ::trpc::DestroyFrameworkRuntime(); + assert(ret == 0); + + return 0; +} + +// Test calling twice before and after RunInTrpc +int TestRunInTrpcRuntimeDouble() { + bool running_flag{false}; + int ret = ::trpc::RunInTrpcRuntime([&running_flag]() { + running_flag = true; + + return 0; + }); + + assert(ret == 0); + assert(running_flag); + + return 0; +} + } // namespace trpc::testing int InitAndRunAllTests(int argc, char** argv) { @@ -42,6 +87,12 @@ int InitAndRunAllTests(int argc, char** argv) { int ret = ::trpc::TrpcConfig::GetInstance()->Init("./trpc/common/testing/fiber_testing.yaml"); assert(ret == 0); + ret = ::trpc::testing::TestInitAndDestroyFrameworkRuntime(); + assert(ret == 0); + + ret = ::trpc::testing::TestRunInTrpcRuntimeDouble(); + assert(ret == 0); + return ::trpc::RunInTrpcRuntime([]() { return ::RUN_ALL_TESTS(); }); } diff --git a/trpc/runtime/BUILD b/trpc/runtime/BUILD index 71a790c6..2c6ebdcc 100644 --- a/trpc/runtime/BUILD +++ b/trpc/runtime/BUILD @@ -2,11 +2,17 @@ licenses(["notice"]) package(default_visibility = ["//visibility:public"]) +cc_library( + name = "runtime_state", + hdrs = ["runtime_state.h"], +) + cc_library( name = "fiber_runtime", srcs = ["fiber_runtime.cc"], hdrs = ["fiber_runtime.h"], deps = [ + ":runtime_state", "//trpc/common/config:trpc_config", "//trpc/runtime/threadmodel:thread_model", "//trpc/runtime/threadmodel:thread_model_manager", @@ -22,6 +28,7 @@ cc_library( srcs = ["merge_runtime.cc"], hdrs = ["merge_runtime.h"], deps = [ + ":runtime_state", "//trpc/common/config:trpc_config", "//trpc/runtime/iomodel/reactor", "//trpc/runtime/threadmodel:thread_model", @@ -40,6 +47,7 @@ cc_library( srcs = ["separate_runtime.cc"], hdrs = ["separate_runtime.h"], deps = [ + ":runtime_state", "//trpc/common/config:trpc_config", "//trpc/runtime/iomodel/reactor", "//trpc/runtime/threadmodel:thread_model", diff --git a/trpc/runtime/fiber_runtime.cc b/trpc/runtime/fiber_runtime.cc index 00deccbd..f020fd40 100644 --- a/trpc/runtime/fiber_runtime.cc +++ b/trpc/runtime/fiber_runtime.cc @@ -14,6 +14,7 @@ #include "trpc/runtime/fiber_runtime.h" #include "trpc/common/config/trpc_config.h" +#include "trpc/runtime/runtime_state.h" #include "trpc/runtime/threadmodel/fiber/fiber_thread_model.h" #include "trpc/runtime/threadmodel/thread_model_manager.h" #include "trpc/util/likely.h" @@ -23,6 +24,9 @@ namespace trpc::fiber { +// fiber_threadmodel state manager +RuntimeState fiber_runtime_state{RuntimeState::kUnknown}; + // fiber_threadmodel has not owned threadmodel's ownership, it managered by ThreadModelManager static FiberThreadModel* fiber_threadmodel = nullptr; @@ -75,56 +79,67 @@ std::ptrdiff_t NearestSchedulingGroupIndex() { } // namespace detail void StartRuntime() { - trpc::fiber::FiberThreadModel::Options options; - options.group_id = ThreadModelManager::GetInstance()->GenGroupId(); - options.group_name = ""; - options.scheduling_name = "v1"; - - const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig(); - - // only support one fiber threadmodel instance - if (global_config.threadmodel_config.fiber_model.size() == 1) { - const auto& conf = global_config.threadmodel_config.fiber_model[0]; - if (!conf.fiber_scheduling_name.empty()) { - options.scheduling_name = conf.fiber_scheduling_name; - } - options.group_name = conf.instance_name; - options.concurrency_hint = conf.concurrency_hint; - options.scheduling_group_size = conf.scheduling_group_size; - options.numa_aware = conf.numa_aware; - - if (!conf.fiber_worker_accessible_cpus.empty() && - ParseBindCoreConfig(conf.fiber_worker_accessible_cpus, options.worker_accessible_cpus)) { - // Only when configuring the core binding related settings, strict core binding will take effect. - options.worker_disallow_cpu_migration = conf.fiber_worker_disallow_cpu_migration; + // It can be restarted if it has not been started before or has already been destroyed + if (fiber_runtime_state == RuntimeState::kUnknown || fiber_runtime_state == RuntimeState::kDestroyed) { + trpc::fiber::FiberThreadModel::Options options; + options.group_id = ThreadModelManager::GetInstance()->GenGroupId(); + options.group_name = ""; + options.scheduling_name = "v1"; + + const GlobalConfig& global_config = TrpcConfig::GetInstance()->GetGlobalConfig(); + + // only support one fiber threadmodel instance + if (global_config.threadmodel_config.fiber_model.size() == 1) { + const auto& conf = global_config.threadmodel_config.fiber_model[0]; + if (!conf.fiber_scheduling_name.empty()) { + options.scheduling_name = conf.fiber_scheduling_name; + } + options.group_name = conf.instance_name; + options.concurrency_hint = conf.concurrency_hint; + options.scheduling_group_size = conf.scheduling_group_size; + options.numa_aware = conf.numa_aware; + + if (!conf.fiber_worker_accessible_cpus.empty() && + ParseBindCoreConfig(conf.fiber_worker_accessible_cpus, options.worker_accessible_cpus)) { + // Only when configuring the core binding related settings, strict core binding will take effect. + options.worker_disallow_cpu_migration = conf.fiber_worker_disallow_cpu_migration; + } + options.work_stealing_ratio = conf.work_stealing_ratio; + options.cross_numa_work_stealing_ratio = conf.cross_numa_work_stealing_ratio; + options.run_queue_size = conf.fiber_run_queue_size; + options.stack_size = conf.fiber_stack_size; + options.pool_num_by_mmap = conf.fiber_pool_num_by_mmap; + options.stack_enable_guard_page = conf.fiber_stack_enable_guard_page; + options.disable_process_name = global_config.thread_disable_process_name; + options.enable_gdb_debug = conf.enable_gdb_debug; + } else { + options.group_name = "fiber_instance"; } - options.work_stealing_ratio = conf.work_stealing_ratio; - options.cross_numa_work_stealing_ratio = conf.cross_numa_work_stealing_ratio; - options.run_queue_size = conf.fiber_run_queue_size; - options.stack_size = conf.fiber_stack_size; - options.pool_num_by_mmap = conf.fiber_pool_num_by_mmap; - options.stack_enable_guard_page = conf.fiber_stack_enable_guard_page; - options.disable_process_name = global_config.thread_disable_process_name; - options.enable_gdb_debug = conf.enable_gdb_debug; - } else { - options.group_name = "fiber_instance"; - } - std::shared_ptr fiber_model = std::make_shared(std::move(options)); + std::shared_ptr fiber_model = std::make_shared(std::move(options)); + + fiber_threadmodel = static_cast(fiber_model.get()); - fiber_threadmodel = static_cast(fiber_model.get()); + TRPC_ASSERT(ThreadModelManager::GetInstance()->Register(fiber_model) == true); - TRPC_ASSERT(ThreadModelManager::GetInstance()->Register(fiber_model) == true); + fiber_threadmodel->Start(); - fiber_threadmodel->Start(); + fiber_runtime_state = RuntimeState::kStarted; + } } void TerminateRuntime() { + if (fiber_runtime_state != RuntimeState::kStarted) { + return; + } + fiber_threadmodel->Terminate(); ThreadModelManager::GetInstance()->Del(fiber_threadmodel->GroupName()); fiber_threadmodel = nullptr; + + fiber_runtime_state = RuntimeState::kDestroyed; } ThreadModel* GetFiberThreadModel() { return fiber_threadmodel; } diff --git a/trpc/runtime/merge_runtime.cc b/trpc/runtime/merge_runtime.cc index 7196e82e..e59bb1d6 100644 --- a/trpc/runtime/merge_runtime.cc +++ b/trpc/runtime/merge_runtime.cc @@ -15,6 +15,7 @@ #include "trpc/common/config/global_conf.h" #include "trpc/common/config/trpc_config.h" +#include "trpc/runtime/runtime_state.h" #include "trpc/runtime/threadmodel/merge/merge_thread_model.h" #include "trpc/runtime/threadmodel/thread_model_manager.h" #include "trpc/util/check.h" @@ -24,6 +25,9 @@ namespace trpc::merge { +// business_threadmodels state manager +RuntimeState business_runtime_state{RuntimeState::kUnknown}; + // business_threadmodels has not owned threadmodel's ownership, all managered by ThreadModelManager static std::vector business_threadmodels; @@ -78,14 +82,23 @@ void Init() { } // namespace void StartRuntime() { - Init(); + // It can be restarted if it has not been started before or has already been destroyed + if (business_runtime_state == RuntimeState::kUnknown || business_runtime_state == RuntimeState::kDestroyed) { + Init(); - for (auto&& it : business_threadmodels) { - it->Start(); + for (auto&& it : business_threadmodels) { + it->Start(); + } + + business_runtime_state = RuntimeState::kStarted; } } void TerminateRuntime() { + if (business_runtime_state != RuntimeState::kStarted) { + return; + } + for (auto&& it : business_threadmodels) { it->Terminate(); @@ -93,6 +106,8 @@ void TerminateRuntime() { } business_threadmodels.clear(); + + business_runtime_state = RuntimeState::kDestroyed; } ThreadModel* RandomGetMergeThreadModel() { diff --git a/trpc/runtime/runtime_state.h b/trpc/runtime/runtime_state.h new file mode 100644 index 00000000..6bb9c4c6 --- /dev/null +++ b/trpc/runtime/runtime_state.h @@ -0,0 +1,25 @@ +// +// +// Tencent is pleased to support the open source community by making tRPC available. +// +// Copyright (C) 2024 THL A29 Limited, a Tencent company. +// All rights reserved. +// +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. +// +// + +#pragma once + +namespace trpc { + +enum class RuntimeState { + kUnknown = 0, // Not yet initialized, cannot be used. Next phase is kStarted + kStarted = 1, // Successfully started, can be used normally. Next phase is kStopped + kStopped = 2, // Successfully stopped. Next phase is kDestroyed + kDestroyed = 3, // Successfully destroyed, entire lifecycle ended +}; + +} \ No newline at end of file diff --git a/trpc/runtime/separate_runtime.cc b/trpc/runtime/separate_runtime.cc index 02b4134a..ddd78b96 100644 --- a/trpc/runtime/separate_runtime.cc +++ b/trpc/runtime/separate_runtime.cc @@ -15,6 +15,7 @@ #include "trpc/common/config/global_conf.h" #include "trpc/common/config/trpc_config.h" +#include "trpc/runtime/runtime_state.h" #include "trpc/runtime/threadmodel/separate/non_steal/non_steal_scheduling.h" #include "trpc/runtime/threadmodel/separate/separate_thread_model.h" #include "trpc/runtime/threadmodel/separate/steal/steal_scheduling.h" @@ -26,6 +27,12 @@ namespace trpc::separate { +// admin runtime state manager +RuntimeState admin_runtime_state{RuntimeState::kUnknown}; + +// business_threadmodels state manager +RuntimeState business_runtime_state{RuntimeState::kUnknown}; + // admin_threadmodel and business_threadmodels has not owned threadmodel's ownership // all managered by ThreadModelManager static ThreadModel* admin_threadmodel{nullptr}; @@ -173,28 +180,48 @@ void InitAdminThreadModel() { } // namespace void StartAdminRuntime() { - InitAdminThreadModel(); + // It can be restarted if it has not been started before or has already been destroyed + if (admin_runtime_state == RuntimeState::kUnknown || admin_runtime_state == RuntimeState::kDestroyed) { + InitAdminThreadModel(); - admin_threadmodel->Start(); + admin_threadmodel->Start(); + + admin_runtime_state = RuntimeState::kStarted; + } } void TerminateAdminRuntime() { + if (admin_runtime_state != RuntimeState::kStarted) { + return; + } + admin_threadmodel->Terminate(); ThreadModelManager::GetInstance()->Del(admin_threadmodel->GroupName()); admin_threadmodel = nullptr; + + admin_runtime_state = RuntimeState::kDestroyed; } void StartRuntime() { - InitBusinessThreadModel(); + // It can be restarted if it has not been started before or has already been destroyed + if (business_runtime_state == RuntimeState::kUnknown || business_runtime_state == RuntimeState::kDestroyed) { + InitBusinessThreadModel(); - for (auto&& it : business_threadmodels) { - it->Start(); + for (auto&& it : business_threadmodels) { + it->Start(); + } + + business_runtime_state = RuntimeState::kStarted; } } void TerminateRuntime() { + if (business_runtime_state != RuntimeState::kStarted) { + return; + } + for (auto&& it : business_threadmodels) { it->Terminate(); @@ -202,6 +229,8 @@ void TerminateRuntime() { } business_threadmodels.clear(); + + business_runtime_state = RuntimeState::kDestroyed; } ThreadModel* RandomGetSeparateThreadModel() {