Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
test committed Jan 27, 2016
2 parents 44b1d9d + 09ca44f commit b362d1e
Show file tree
Hide file tree
Showing 11 changed files with 39 additions and 288 deletions.
13 changes: 13 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@ script:
- make -j4
- sudo make install
- make test_small
- make samples
- make run_test
# - sudo rm * -rf
# - cmake .. -DENABLE_BOOST_COROUTINE=ON
# - make -j4
# - sudo make install
# - make test_small
# - make run_test
# - sudo rm * -rf
# - cmake .. -DENABLE_SHARED_STACK=ON
# - make -j4
# - sudo make install
# - make test_small
# - make run_test
- popd;

after_success:
Expand Down
4 changes: 0 additions & 4 deletions src/coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ struct __async_wait<void>
#define go_stack(size) ::co::__go_stack(size)-
#define co_yield do { g_Scheduler.CoYield(); } while (0)

// (uint32_t type, uint64_t id)
#define co_wait(type, id) do { g_Scheduler.UserBlockWait(type, id); } while (0)
#define co_wakeup(type, id) do { g_Scheduler.UserBlockWakeup(type, id); } while (0)

// coroutine sleep, never blocks current thread.
#define co_sleep(milliseconds) do { g_Scheduler.SleepSwitch(milliseconds); } while (0)

Expand Down
2 changes: 1 addition & 1 deletion src/ctx_boost_coroutine/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace co
#if defined(ENABLE_SHARED_STACK)
, boost::coroutines::attributes(shared_stack_cap), shared_stack_allocator(shared_stack, shared_stack_cap)
#else
, boost::coroutines::attributes(stack_size_)
, boost::coroutines::attributes(std::max<std::size_t>(stack_size_, boost::coroutines::stack_traits::minimum_size()))
#endif
);
if (!c) return false;
Expand Down
6 changes: 4 additions & 2 deletions src/ctx_win_fiber/context.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <context.h>
#include <scheduler.h>
#include <Windows.h>
#include <algorithm>

namespace co
{
Expand Down Expand Up @@ -48,8 +49,9 @@ namespace co
bool Context::Init(std::function<void()> const& fn, char* shared_stack, uint32_t shared_stack_cap)
{
impl_->fn_ = fn;
impl_->native_ = CreateFiberEx(g_Scheduler.GetOptions().init_commit_stack_size,
stack_size_, FIBER_FLAG_FLOAT_SWITCH,
SIZE_T commit_size = g_Scheduler.GetOptions().init_commit_stack_size;
impl_->native_ = CreateFiberEx(commit_size,
(std::max)(stack_size_, commit_size), FIBER_FLAG_FLOAT_SWITCH,
(LPFIBER_START_ROUTINE)FiberFunc, &impl_->fn_);
return !!impl_->native_;
}
Expand Down
17 changes: 1 addition & 16 deletions src/processer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,13 @@ uint32_t Processer::Run(ThreadLocalInfo &info, uint32_t &done_count)
break;

case TaskState::sys_block:
case TaskState::user_block:
{
assert(tk->block_);
if (tk->block_) {
it = slist.erase(it);
if (!tk->block_->AddWaitTask(tk))
runnable_list_.push(tk);
tk->block_ = NULL;
} else {
std::unique_lock<LFLock> lock(g_Scheduler.user_wait_lock_);
auto &zone = g_Scheduler.user_wait_tasks_[tk->user_wait_type_];
auto &wait_pair = zone[tk->user_wait_id_];
auto &task_queue = wait_pair.second;
if (wait_pair.first) {
--wait_pair.first;
tk->state_ = TaskState::runnable;
++it;
} else {
it = slist.erase(it);
task_queue.push(tk);
}
g_Scheduler.ClearWaitPairWithoutLock(tk->user_wait_type_,
tk->user_wait_id_, zone, wait_pair);
}
}
break;
Expand Down
89 changes: 0 additions & 89 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,6 @@ void Scheduler::SleepSwitch(int timeout_ms)
sleep_wait_.CoSwitch(timeout_ms);
}

bool Scheduler::UserBlockWait(uint32_t type, uint64_t wait_id)
{
return BlockWait((int64_t)type, wait_id);
}

bool Scheduler::TryUserBlockWait(uint32_t type, uint64_t wait_id)
{
return TryBlockWait((int64_t)type, wait_id);
}

uint32_t Scheduler::UserBlockWakeup(uint32_t type, uint64_t wait_id, uint32_t wakeup_count)
{
return BlockWakeup((int64_t)type, wait_id, wakeup_count);
}

TimerId Scheduler::ExpireAt(CoTimerMgr::TimePoint const& time_point,
CoTimer::fn_t const& fn)
{
Expand Down Expand Up @@ -299,78 +284,4 @@ ThreadPool& Scheduler::GetThreadPool()
return *thread_pool_;
}

bool Scheduler::BlockWait(int64_t type, uint64_t wait_id)
{
if (!IsCoroutine()) return false;
Task* tk = GetLocalInfo().current_task;
tk->user_wait_type_ = type;
tk->user_wait_id_ = wait_id;
tk->state_ = type < 0 ? TaskState::sys_block : TaskState::user_block;
DebugPrint(dbg_wait, "task(%s) %s. wait_type=%lld, wait_id=%llu",
tk->DebugInfo(), type < 0 ? "sys_block" : "user_block",
(long long int)tk->user_wait_type_, (long long unsigned)tk->user_wait_id_);
CoYield();
return true;
}

bool Scheduler::TryBlockWait(int64_t type, uint64_t wait_id)
{
std::unique_lock<LFLock> locker(user_wait_lock_);
auto it = user_wait_tasks_.find(type);
if (user_wait_tasks_.end() == it) return false;

auto &zone = it->second;
auto it2 = zone.find(wait_id);
if (zone.end() == it2) return false;

auto &wait_pair = it2->second;
if (wait_pair.first > 0) {
--wait_pair.first;
ClearWaitPairWithoutLock(type, wait_id, zone, wait_pair);
return true;
}

return false;
}

uint32_t Scheduler::BlockWakeup(int64_t type, uint64_t wait_id, uint32_t wakeup_count)
{
std::unique_lock<LFLock> locker(user_wait_lock_);
auto &zone = user_wait_tasks_[type];
auto &wait_pair = zone[wait_id];
auto &task_queue = wait_pair.second;
SList<Task> tasks = task_queue.pop(wakeup_count);
std::size_t c = tasks.size();
if (c < wakeup_count) // 允许提前设置唤醒标志, 以便多线程同步。
wait_pair.first += wakeup_count - c;
ClearWaitPairWithoutLock(type, wait_id, zone, wait_pair);
uint32_t domain_wakeup = wait_pair.first;
locker.unlock();

for (auto &task: tasks)
{
++c;
Task *tk = &task;
DebugPrint(dbg_wait, "%s wakeup task(%s). wait_type=%lld, wait_id=%llu",
type < 0 ? "sys_block" : "user_block", tk->DebugInfo(), (long long int)type, (long long unsigned)wait_id);
AddTaskRunnable(tk);
}

DebugPrint(dbg_wait, "%s wakeup %u tasks, domain wakeup=%u. wait_type=%lld, wait_id=%llu",
type < 0 ? "sys_block" : "user_block", (unsigned)c, domain_wakeup, (long long int)type, (long long unsigned)wait_id);
return c;
}

void Scheduler::ClearWaitPairWithoutLock(int64_t type,
uint64_t wait_id, WaitZone& zone, WaitPair& wait_pair)
{
if (wait_pair.second.empty() && wait_pair.first == 0) {
if (zone.size() > 1) {
zone.erase(wait_id);
} else {
user_wait_tasks_.erase(type);
}
}
}

} //namespace co
48 changes: 6 additions & 42 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,6 @@ class Scheduler
// \timeout_ms min value is 0.
void SleepSwitch(int timeout_ms);

/// ------------------------------------------------------------------------
// @{ 以计数的方式模拟实现的协程同步方式.
// 初始计数为0, Wait减少计数, Wakeup增加计数.
// UserBlockWait将阻塞式(yield)地等待计数大于0, 等待成功后将计数减一,
// 并将协程切换回可执行状态. 如果不在协程中调用, 则返回false, 且不做任何事.
// TryBlockWait检查当前计数, 如果计数等于0, 则返回false; 否则计数减一并返回true.
// UserBlockWakeup检查当前等待队列, 将等待队列中的前面最多wakeup_count个
// 协程唤醒(设置为可执行状态), 累加剩余计数(wakeup_count减去唤醒的协程数量)
//
// 用户自定义的阻塞切换, type范围限定为: [0, 0xffffffff]
bool UserBlockWait(uint32_t type, uint64_t wait_id);
bool TryUserBlockWait(uint32_t type, uint64_t wait_id);
uint32_t UserBlockWakeup(uint32_t type, uint64_t wait_id, uint32_t wakeup_count = 1);
// }@
/// ------------------------------------------------------------------------

/// ------------------------------------------------------------------------
// @{ 定时器
TimerId ExpireAt(CoTimerMgr::TimePoint const& time_point, CoTimer::fn_t const& fn);
Expand Down Expand Up @@ -216,23 +200,6 @@ class Scheduler
// 将一个协程加入可执行队列中
void AddTaskRunnable(Task* tk);

/// ------------------------------------------------------------------------
// 协程框架定义的阻塞切换, type范围不可与用户自定义范围重叠, 指定为:[-xxxxx, -1]
// 如果不在协程中调用, 则返回false, 且不做任何事.
bool BlockWait(int64_t type, uint64_t wait_id);

// 尝试等待某个事件发生, 功能等同于try_lock, 可在协程外调用.
bool TryBlockWait(int64_t type, uint64_t wait_id);

// 唤醒对某个时间等待的协程.
uint32_t BlockWakeup(int64_t type, uint64_t wait_id, uint32_t wakeup_count = 1);
// @
/// ------------------------------------------------------------------------

// 清理没有等待也没有被等待的WaitPair.
void ClearWaitPairWithoutLock(int64_t type, uint64_t wait_id, WaitZone& zone, WaitPair& wait_pair);

private:
// Run函数的一部分, 处理runnable状态的协程
uint32_t DoRunnable();

Expand Down Expand Up @@ -263,10 +230,6 @@ class Scheduler
// sleep block waiter.
SleepWait sleep_wait_;

// User define wait tasks table.
WaitTable user_wait_tasks_;
LFLock user_wait_lock_;

// Timer manager.
CoTimerMgr timer_mgr_;

Expand All @@ -276,11 +239,12 @@ class Scheduler
std::atomic<uint8_t> sleep_ms_{0};
std::atomic<uint32_t> thread_id_{0};

friend class CoMutex;
friend class BlockObject;
friend class IoWait;
friend class SleepWait;
friend class Processer;
private:
friend class CoMutex;
friend class BlockObject;
friend class IoWait;
friend class SleepWait;
friend class Processer;
};

} //namespace co
Expand Down
5 changes: 1 addition & 4 deletions src/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ enum class TaskState
runnable,
io_block, // write, writev, read, select, poll, ...
sys_block, // co_mutex, ...
user_block, // user switch it.
sleep, // sleep nanosleep poll(NULL, 0, timeout)
sleep, // sleep, nanosleep, poll(NULL, 0, timeout)
done,
fatal,
};
Expand Down Expand Up @@ -69,8 +68,6 @@ struct Task
int io_block_timeout_ = 0;
CoTimerPtr io_block_timer_;

int64_t user_wait_type_ = 0; // user_block等待的类型
uint64_t user_wait_id_ = 0; // user_block等待的id
BlockObject* block_ = NULL; // sys_block等待的block对象

int sleep_ms_ = 0; // 睡眠时间
Expand Down
11 changes: 11 additions & 0 deletions test/gtest_unit/pinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
#ifndef _WIN32
#include <unistd.h>
#include <boost/algorithm/string.hpp>
#else
#include <Windows.h>
#include <Psapi.h>
#pragma comment(lib,"Psapi.lib")
#endif

#include <fstream>
#include <string>
#include <vector>
Expand Down Expand Up @@ -51,6 +56,12 @@ struct pinfo
sscanf(s.c_str(), "VmSwap: %llu KB", (long long unsigned int*)&swap);
}
}
#else
PROCESS_MEMORY_COUNTERS meminfo;
GetProcessMemoryInfo(GetCurrentProcess(), &meminfo, sizeof(meminfo));
rss_high = meminfo.PeakWorkingSetSize / 1024;
rss = meminfo.WorkingSetSize / 1024;
virt_high = virt = 0;
#endif
}

Expand Down
3 changes: 2 additions & 1 deletion test/gtest_unit/run_test
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ runs=`ls test/*.t | wc -l`
pass=`grep "^\[\s*PASSED" result | wc -l`
if [ "${pass}" -eq "${runs}" ];
then
echo "all test were PASSED"
exit 0
else
exit 1
echo "some test were FAIL"
fi
Loading

0 comments on commit b362d1e

Please sign in to comment.