Skip to content

Commit

Permalink
增加初始化监听,优化listener代码
Browse files Browse the repository at this point in the history
  • Loading branch information
mlkt committed Apr 29, 2019
1 parent b9f8137 commit 109958f
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 65 deletions.
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ else()
message (" enable_debugger: no")
endif()

option(ENABLE_LISTENER "enable listener" ${ENABLE_DEBUGGER})
if (ENABLE_LISTENER)
set(ENABLE_LISTENER 1)
message (" enable_listener: yes")
else()
set(ENABLE_LISTENER 0)
message (" enable_listener: no")
endif()

option(DISABLE_HOOK "disable hook" OFF)
if (DISABLE_HOOK)
set(ENABLE_HOOK 0)
Expand Down
2 changes: 2 additions & 0 deletions libgo/common/cmake_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

#define ENABLE_DEBUGGER ${ENABLE_DEBUGGER}

#define ENABLE_LISTENER ${ENABLE_LISTENER}

#define ENABLE_HOOK ${ENABLE_HOOK}
45 changes: 37 additions & 8 deletions libgo/debug/listener.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <exception>
#include "../task/task.h"

namespace co
{
Expand All @@ -13,6 +15,20 @@ class Listener
*/
class TaskListener {
public:
/**
* 协程准备初始化、即将被创建的时候被调用,可以进行对协程的任务进行封装或者拦截
* (注意此时并未运行在协程中)
*
* @prarm task_id 协程ID
* @prarm fn 协程任务,可以赋值修改此参数对协程任务进行二次封装
* @param opt 协程创建的参数,可以赋值修改此参数值
*
* @return 返回true,正常创建该任务;返回false,放弃此任务
*/
virtual bool onInit(uint64_t task_id, co::TaskF& fn, co::TaskOpt& opt) noexcept {
return true;
}

/**
* 协程被创建时被调用
* (注意此时并未运行在协程中)
Expand Down Expand Up @@ -88,17 +104,17 @@ class Listener
// s: Scheduler,表示该方法运行在调度器上下文中
// c: Coroutine,表示该方法运行在协程上下文中
//
// -->[c]onCompleted->
// | |
// [s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
// -->[c]onCompleted->
// | |
// [s]onInit-->[s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
};

public:
#if ENABLE_DEBUGGER
#if ENABLE_LISTENER
ALWAYS_INLINE static TaskListener*& GetTaskListener() {
static TaskListener* task_listener = nullptr;
return task_listener;
Expand All @@ -108,6 +124,19 @@ class Listener
GetTaskListener() = listener;
}
#endif

#if ENABLE_LISTENER
#define SAFE_CALL_LISTENER(listener, method, ...) \
do { \
auto* __listener = (listener); \
if (__listener) { \
__listener->method(__VA_ARGS__); \
} \
} while(0)

#else
#define SAFE_CALL_LISTENER(...) do {} while(0)
#endif
};

} // namespace co
3 changes: 1 addition & 2 deletions libgo/scheduler/processer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ void Processer::Process()

#if ENABLE_DEBUGGER
DebugPrint(dbg_switch, "enter task(%s)", runningTask_->DebugInfo());
if (Listener::GetTaskListener())
Listener::GetTaskListener()->onSwapIn(runningTask_->id_);
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onSwapIn, runningTask_->id_);

++switchCount_;

Expand Down
7 changes: 2 additions & 5 deletions libgo/scheduler/processer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
#include "../common/clock.h"
#include "../task/task.h"
#include "../common/ts_queue.h"

#if ENABLE_DEBUGGER
#include "../debug/listener.h"
#endif
#include <condition_variable>
#include <mutex>
#include <atomic>
Expand Down Expand Up @@ -179,10 +176,10 @@ ALWAYS_INLINE void Processer::CoYield()

#if ENABLE_DEBUGGER
DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_));
if (Listener::GetTaskListener())
Listener::GetTaskListener()->onSwapOut(tk->id_);
#endif

SAFE_CALL_LISTENER(Listener::GetTaskListener(), onSwapOut, tk->id_);

tk->SwapOut();
}

Expand Down
29 changes: 21 additions & 8 deletions libgo/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "scheduler.h"
#include "../common/error.h"
#include "../common/clock.h"
#include "../debug/listener.h"
#include <stdio.h>
#include <system_error>
#include <unistd.h>
Expand All @@ -11,7 +12,7 @@
namespace co
{

inline atomic_t<unsigned long long> & GetTaskIdFactory()
static inline atomic_t<unsigned long long> & GetTaskIdFactory()
{
static atomic_t<unsigned long long> factory;
return factory;
Expand Down Expand Up @@ -71,22 +72,34 @@ Scheduler::~Scheduler()
Stop();
}

void Scheduler::CreateTask(TaskF const& fn, TaskOpt const& opt)
void Scheduler::CreateTask(TaskF const& _fn, TaskOpt const& _opt)
{
uint64_t id = ++GetTaskIdFactory();

#if ENABLE_LISTENER
TaskF fn = _fn;
TaskOpt opt = _opt;
auto* listener = Listener::GetTaskListener();
if (listener && !listener->onInit(id, fn, opt)) {
return;
}
#else
auto& fn = _fn;
auto& opt = _opt;
#endif

Task* tk = new Task(fn, opt.stack_size_ ? opt.stack_size_ : CoroutineOptions::getInstance().stack_size);

// printf("new tk = %p impl = %p\n", tk, tk->impl_);
tk->SetDeleter(Deleter(&Scheduler::DeleteTask, this));
tk->id_ = ++GetTaskIdFactory();
tk->id_ = id;
TaskRefAffinity(tk) = opt.affinity_;
TaskRefLocation(tk).Init(opt.file_, opt.lineno_);
++taskCount_;

DebugPrint(dbg_task, "task(%s) created in scheduler(%p).", TaskDebugInfo(tk), (void*)this);
#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onCreated(tk->id_);
}
#endif

SAFE_CALL_LISTENER(Listener::GetTaskListener(), onCreated, tk->id_);

AddTask(tk);
}
Expand Down
9 changes: 0 additions & 9 deletions libgo/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,11 @@
#include "../common/spinlock.h"
#include "../common/timer.h"
#include "../task/task.h"
#include "../debug/listener.h"
#include "processer.h"
#include <mutex>

namespace co {

struct TaskOpt
{
bool affinity_ = false;
int lineno_ = 0;
std::size_t stack_size_ = 0;
const char* file_ = nullptr;
};

// 协程调度器
// 负责管理1到N个调度线程, 调度从属协程.
// 可以调用Create接口创建更多额外的调度器
Expand Down
26 changes: 5 additions & 21 deletions libgo/task/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,12 @@ const char* GetTaskStateName(TaskState state)
void Task::Run()
{
auto call_fn = [this]() {
#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onStart(this->id_);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onStart, this->id_);

this->fn_();
this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行

#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onCompleted(this->id_);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onCompleted, this->id_);
};

if (CoroutineOptions::getInstance().exception_handle == eCoExHandle::immedaitely_throw) {
Expand All @@ -52,22 +44,14 @@ void Task::Run()
} catch (...) {
this->fn_ = TaskF();

std::exception_ptr eptr = std::current_exception();
this->eptr_ = std::current_exception();
DebugPrint(dbg_exception, "task(%s) catched exception.", DebugInfo());

#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onException(this->id_, eptr);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onException, this->id_, this->eptr_);
}
}

#if ENABLE_DEBUGGER
if (Listener::GetTaskListener()) {
Listener::GetTaskListener()->onFinished(this->id_);
}
#endif
SAFE_CALL_LISTENER(Listener::GetTaskListener(), onFinished, this->id_);

state_ = TaskState::done;
Processer::StaticCoYield();
Expand Down
8 changes: 8 additions & 0 deletions libgo/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ const char* GetTaskStateName(TaskState state);

typedef std::function<void()> TaskF;

struct TaskOpt
{
bool affinity_ = false;
int lineno_ = 0;
std::size_t stack_size_ = 0;
const char* file_ = nullptr;
};

struct TaskGroupKey {};
typedef Anys<TaskGroupKey> TaskAnys;

Expand Down
41 changes: 31 additions & 10 deletions tutorial/sample12_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,40 @@ using namespace std;
//协程监听器的调用过程:
// s: Scheduler,表示该方法运行在调度器上下文中
// c: Coroutine,表示该方法运行在协程上下文中
// (正常运行完成)
// -->[c]onCompleted->
// | |
// [s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
// (运行时抛出未捕获的异常)
// (正常运行完成)
// -->[c]onCompleted->
// | |
// [s]onInit-->[s]onCreated-->[s]onSwapIn-->[c]onStart->*--->[c]onSwapOut-- -->[c]onFinished-->[c]onSwapOut
// |\ | |
// | \<-[s]onSwapIn--V |
// | |
// -->[c]onException->
// (运行时抛出未捕获的异常)
//
//!!注意协程监听器回调方法均不能抛出异常,如果可能有异常抛出,请在回调方法内自行 try...catch 消化掉

//覆盖 co::co_listener 的虚函数实现回调方法
class CoListenerSample: public co::Listener::TaskListener {
public:
/**
* 协程准备初始化、即将被创建的时候被调用,可以进行对协程的任务进行封装或者拦截
* (注意此时并未运行在协程中)
*
* @prarm task_id 协程ID
* @prarm fn 协程任务,可以赋值修改此参数对协程任务进行二次封装
* @param opt 协程创建的参数,可以赋值修改此参数值
*
* @return 返回true,正常创建该任务;返回false,放弃此任务
*/
virtual bool onInit(uint64_t task_id, co::TaskF& fn, co::TaskOpt& opt) noexcept {
cout << "onInit task_id=" << task_id << endl;
fn = [fn]() {
cout << "haha, I'm coming. " << endl;
fn();
};
return true;
}

/**
* 协程被创建时被调用
* (注意此时并未运行在协程中)
Expand Down Expand Up @@ -99,6 +118,8 @@ class CoListenerSample: public co::Listener::TaskListener {
} catch (...) {
cout << "unknow exception." << endl;
}

eptr = nullptr;
}

/**
Expand All @@ -125,7 +146,7 @@ class CoListenerSample: public co::Listener::TaskListener {
};

int main(int argc, char** argv) {
#if ENABLE_DEBUGGER
#if ENABLE_LISTENER
CoListenerSample listener;

//设置协程监听器,如果设置为NULL则为取消监听
Expand Down
3 changes: 1 addition & 2 deletions tutorial/sample13_cls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
* 块作用域中推荐使用第一种写法, 免掉了一次隐士转换, 更便于使用
* 第一种写法一定注意不要忘记引用符&
*
* 全局作用域\类成员变量只能使用第二种写法, 并且会有编译warning,
* 请勿开启-Werror选项!
* 全局作用域\类成员变量只能使用第二种写法
*
* co_cls_ref(int)定义了一个可以隐式转换成int&的模板类,
* 如果此处不是int而是自定义类, 要访问类的成员或函数,
Expand Down

0 comments on commit 109958f

Please sign in to comment.