Skip to content

Commit

Permalink
Merge pull request #97 from yqm-307/yqm-307/issue95
Browse files Browse the repository at this point in the history
Yqm 307/issue95
  • Loading branch information
yqm-307 authored Sep 6, 2024
2 parents f217872 + 0771aeb commit bad164e
Show file tree
Hide file tree
Showing 35 changed files with 172 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
"-DNEED_BENCHMARK=ON",
"-DNEED_EXAMPLE=ON",
"-DNEED_TEST=ON",
"-DNEED_DEBUG=OFF",
"-DNEED_DEBUG=ON",
"-DPROFILE=OFF",
"-DRELEASE=ON",
"-DDEBUG_INFO=OFF",
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void SleepHook()

int main()
{
g_scheduler->Start(true);
g_scheduler->Start();

Normal();

Expand Down Expand Up @@ -160,7 +160,7 @@ void CloseNotify()

int main()
{
g_scheduler->Start(true);
g_scheduler->Start();
printf("============================ Example 1 ============================\n");
ReadOnce();
printf("============================ Example 2 ============================\n");
Expand Down
5 changes: 5 additions & 0 deletions bbt/coroutine/detail/CoPoller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ int64_t CoPoller::GetTime()
return m_event_loop->GetTime();
}

std::shared_ptr<bbt::pollevent::EventLoop> CoPoller::GetEventLoop() const
{
return m_event_loop;
}

}
3 changes: 3 additions & 0 deletions bbt/coroutine/detail/CoPoller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class CoPoller
void NotifyCustomEvent(std::shared_ptr<CoPollEvent> event);
/* 获取CoPoller缓存的UTC时间戳 */
int64_t GetTime();

std::shared_ptr<bbt::pollevent::EventLoop>
GetEventLoop() const;
protected:
private:
std::shared_ptr<bbt::pollevent::EventLoop> m_event_loop{nullptr};
Expand Down
22 changes: 19 additions & 3 deletions bbt/coroutine/detail/Define.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,27 @@

#define g_bbt_dbgmgr (bbt::coroutine::detail::DebugMgr::GetInstance())

namespace bbt::coroutine::sync
namespace bbt::coroutine
{


enum SchedulerStartOpt
{
SCHE_START_OPT_SCHE_THREAD = 1, // 线程模式,几乎不阻塞,开启后台调度线程
SCHE_START_OPT_SCHE_LOOP = 2, // 循环模式,阻塞并在当前线程调度
SCHE_START_OPT_SCHE_NO_LOOP = 3, // 非循环模式,需要用户手动LoopOnce来驱动调度
};



namespace sync
{

template<class TItem, int Max> class Chan;
class CoWaiter;



/*
@startuml
[*] --> CHAN_DEFAUTL
Expand Down Expand Up @@ -92,7 +106,7 @@ enum CoRWMutexStatus

}

namespace bbt::coroutine::detail
namespace detail
{

/**
Expand Down Expand Up @@ -227,4 +241,6 @@ typedef std::function<void()> DeferCallback; // defer 执行函数
*/
typedef std::function<void(std::shared_ptr<CoPollEvent>, int /*events*/, int)> CoPollEventCallback; // Poller监听事件完成回调

} // namespace bbt::coroutine::detail
} // namespace bbt::coroutine::detail

} // namespace bbt::coroutine
13 changes: 10 additions & 3 deletions bbt/coroutine/detail/Processer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,18 @@ void Processer::Start(bool background_thread)

void Processer::_Run()
{
/**
* Processer主循环逻辑
*
* 尝试从全局队列或者本地队列取协程对象,如果取不到就挂起P
* 如果取到就在本地线程处理协程执行;
*
* 处理完本地的协程后,尝试取窃取其他P的协程,如果窃取不到
* 就挂起当前线程。
*/
while (m_is_running)
{
m_run_status = ProcesserStatus::PROC_RUNNING;
// 执行本地任务
while (true)
{
std::vector<Coroutine::SPtr> pending_coroutines;
Expand All @@ -122,7 +130,7 @@ void Processer::_Run()
if (coroutine->GetStatus() == CO_RUNNING || coroutine->GetStatus() == CO_FINAL)
continue;

// 执行前设置当前协程
// 执行前设置当前协程缓存
m_running_coroutine = coroutine;
m_running_coroutine_begin.exchange( bbt::clock::gettime_mono<>());
AssertWithInfo(m_running_coroutine != nullptr, "maybe coroutine queue has bug!");
Expand All @@ -133,7 +141,6 @@ void Processer::_Run()
}
}

// 尝试窃取其他Processer任务,失败挂起
if (g_scheduler->TryWorkSteal(shared_from_this()) <= 0)
{
auto begin = bbt::clock::now<bbt::clock::microseconds>();
Expand Down
97 changes: 62 additions & 35 deletions bbt/coroutine/detail/Scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Scheduler::~Scheduler()

void Scheduler::_Init()
{
m_thread = nullptr;
m_sche_thread = nullptr;
m_is_running = true;
m_run_status = SCHE_DEFAULT;
m_regist_coroutine_count = 0;
Expand Down Expand Up @@ -95,75 +95,102 @@ void Scheduler::_FixTimingScan()
m_run_status = ScheudlerStatus::SCHE_SUSPEND;
}

void Scheduler::_OnUpdate()
{
static auto prev_profile_timepoint = bbt::clock::now<>();
bool actived = false;
do {

#ifdef BBT_COROUTINE_PROFILE
if (g_bbt_coroutine_config->m_cfg_profile_printf_ms > 0 &&
bbt::clock::is_expired<bbt::clock::ms>((prev_profile_timepoint + bbt::clock::ms(g_bbt_coroutine_config->m_cfg_profile_printf_ms))))
{
std::string info = "";
g_bbt_profiler->ProfileInfo(info);
bbt::log::DebugPrint(info.c_str());
prev_profile_timepoint = bbt::clock::now<>();
}
#endif

actived = g_bbt_poller->PollOnce();
_FixTimingScan();
g_bbt_stackpoll->OnUpdate();
} while(actived);

}

void Scheduler::_Run()
{
m_begin_timestamp = bbt::clock::now<>();
auto prev_scan_timepoint = bbt::clock::now<>();
auto prev_profile_timepoint = bbt::clock::now<>();

#ifdef BBT_COROUTINE_PROFILE
g_bbt_profiler->OnEvent_StartScheudler();
#endif
while(m_is_running)
{

bool actived = false;
do {
_OnUpdate();

#ifdef BBT_COROUTINE_PROFILE
if (g_bbt_coroutine_config->m_cfg_profile_printf_ms > 0 &&
bbt::clock::is_expired<bbt::clock::milliseconds>((prev_profile_timepoint + bbt::clock::milliseconds(g_bbt_coroutine_config->m_cfg_profile_printf_ms))))
{
std::string info = "";
g_bbt_profiler->ProfileInfo(info);
bbt::log::DebugPrint(info.c_str());
prev_profile_timepoint = bbt::clock::now<>();
}
#endif

actived = g_bbt_poller->PollOnce();
_FixTimingScan();
g_bbt_stackpoll->OnUpdate();
} while(actived);
prev_scan_timepoint = prev_scan_timepoint + bbt::clock::ms(g_bbt_coroutine_config->m_cfg_scan_interval_ms);
std::this_thread::sleep_until(prev_scan_timepoint);
}
}

void Scheduler::Start(bool background_thread)
void Scheduler::Start(SchedulerStartOpt opt)
{
_InitGlobalUniqInstance();
_Init();
bbt::thread::CountDownLatch latch{1};
if (background_thread)
{
Assert(m_thread == nullptr);
m_thread = new std::thread([this, &latch](){
bbt::thread::CountDownLatch wg{1};

switch (opt) {

case SCHE_START_OPT_SCHE_THREAD:
Assert(m_sche_thread == nullptr);
m_sche_thread = new std::thread([this, &wg](){
_CreateProcessers();
latch.Down();
wg.Down();
_Run();
});
wg.Wait();
break;

latch.Wait();
} else {
case SCHE_START_OPT_SCHE_NO_LOOP:
_CreateProcessers();
break;

case SCHE_START_OPT_SCHE_LOOP:
_CreateProcessers();
_Run();
}
break;

default:
break;
};

}

void Scheduler::LoopOnce()
{
// 使用单独的调度线程,就不可以调用LoopOnce来手动驱动了
AssertWithInfo(m_sche_thread == nullptr, "the sche-thread has been started!");

_OnUpdate();
}


void Scheduler::Stop()
{
m_is_running = false;

_DestoryProcessers();

if (m_thread != nullptr) {
if (m_thread->joinable())
m_thread->join();
delete m_thread;
if (m_sche_thread != nullptr) {
if (m_sche_thread->joinable())
m_sche_thread->join();
delete m_sche_thread;
}

m_thread = nullptr;
m_sche_thread = nullptr;
m_global_coroutine_spinlock.Lock();
m_global_coroutine_deque.Clear();
m_global_coroutine_spinlock.UnLock();
Expand Down
13 changes: 10 additions & 3 deletions bbt/coroutine/detail/Scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
namespace bbt::coroutine::detail
{

/**
* @brief 调度器
*
*/
class Scheduler
{
public:
Expand All @@ -16,8 +20,9 @@ class Scheduler
static UPtr& GetInstance();

/* 显示指定运行线程 */
void Start(bool background_thread = false);
void Stop();
void Start(SchedulerStartOpt opt = SCHE_START_OPT_SCHE_THREAD);
void Stop();
void LoopOnce();

CoroutineId RegistCoroutineTask(const CoroutineCallback& handle);
/* 协程被激活,重新加入全局队列 */
Expand Down Expand Up @@ -47,10 +52,12 @@ class Scheduler
bool _LoadBlance2Proc(Coroutine::SPtr co);
/* 初始化全局实例 */
void _InitGlobalUniqInstance();

void _OnUpdate();
private:
/* Scheduler */
bbt::clock::Timestamp<> m_begin_timestamp; // 调度器开启时间
std::thread* m_thread{nullptr};
std::thread* m_sche_thread{nullptr};
std::vector<std::thread*> m_proc_threads;

/* Processer 管理 */
Expand Down
2 changes: 1 addition & 1 deletion benchmark_test/benchmark_coroutine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ int main()
bbt::thread::CountDownLatch l{nsum_co};
auto begin = bbt::clock::gettime();

g_scheduler->Start(true);
g_scheduler->Start();

bbtco [&](){
for (int i = 0; i < nsum_co; ++i)
Expand Down
2 changes: 1 addition & 1 deletion benchmark_test/fatigue_chan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ int main()
std::atomic_uint64_t read_count{0};
std::atomic_uint64_t write_count{0};
std::atomic_uint64_t write_failed{0};
g_scheduler->Start(true);
g_scheduler->Start();

/* 开启100个单读多写协程 */

Expand Down
2 changes: 1 addition & 1 deletion benchmark_test/fatigue_comutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void fatigue_1()

int main()
{
g_scheduler->Start(true);
g_scheduler->Start();

fatigue_1();

Expand Down
2 changes: 1 addition & 1 deletion benchmark_test/fatigue_coroutine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
int main()
{
std::atomic_uint64_t ncount = 0;
g_scheduler->Start(true);
g_scheduler->Start();

while (true) {
for (int i = 0; i < 100000; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion benchmark_test/mem_check_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void ChanRW()

int main()
{
g_scheduler->Start(true);
g_scheduler->Start();
// 协程执行测试
CoTask();
// Chan执行测试
Expand Down
2 changes: 1 addition & 1 deletion debug/Debug_chan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void DebugWriteBlock()

int main()
{
g_scheduler->Start(true);
g_scheduler->Start();
ReadOnce();
ReadMulti();
DebugWriteBlock();
Expand Down
2 changes: 1 addition & 1 deletion debug/Debug_co_rwmutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void rwlock_test()

int main()
{
g_scheduler->Start(true);
g_scheduler->Start();

rwlock_test();

Expand Down
Loading

0 comments on commit bad164e

Please sign in to comment.