Skip to content

Commit

Permalink
v2.1 Merge branch 'dev': Channel的超时从忙轮询改为定时器
Browse files Browse the repository at this point in the history
  • Loading branch information
test committed Jan 27, 2016
2 parents b362d1e + 2e9e59d commit 0cea895
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 147 deletions.
73 changes: 73 additions & 0 deletions src/block_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,47 @@ void BlockObject::CoBlockWait()
--wakeup_;
return ;
}
lock.unlock();

Task* tk = g_Scheduler.GetLocalInfo().current_task;
tk->block_ = this;
tk->state_ = TaskState::sys_block;
tk->block_timeout_ = std::chrono::nanoseconds::zero();
tk->is_block_timeout_ = false;
++ tk->block_sequence_;
DebugPrint(dbg_syncblock, "wait to switch. task(%s)", tk->DebugInfo());
g_Scheduler.CoYield();
}

bool BlockObject::CoBlockWaitTimed(std::chrono::nanoseconds timeo)
{
auto begin = std::chrono::high_resolution_clock::now();
if (!g_Scheduler.IsCoroutine()) {
while (!TryBlockWait() &&
std::chrono::duration_cast<std::chrono::nanoseconds>
(std::chrono::high_resolution_clock::now() - begin) < timeo)
usleep(10 * 1000);
return false;
}

std::unique_lock<LFLock> lock(lock_);
if (wakeup_ > 0) {
DebugPrint(dbg_syncblock, "wait immedaitely done.");
--wakeup_;
return true;
}
lock.unlock();

Task* tk = g_Scheduler.GetLocalInfo().current_task;
tk->block_ = this;
tk->state_ = TaskState::sys_block;
++tk->block_sequence_;
tk->block_timeout_ = timeo;
tk->is_block_timeout_ = false;
DebugPrint(dbg_syncblock, "wait to switch. task(%s)", tk->DebugInfo());
g_Scheduler.CoYield();

return !tk->is_block_timeout_;
}

bool BlockObject::TryBlockWait()
Expand Down Expand Up @@ -71,10 +105,35 @@ bool BlockObject::Wakeup()
return true;
}

tk->block_ = nullptr;
g_Scheduler.AddTaskRunnable(tk);
DebugPrint(dbg_syncblock, "wakeup task(%s).", tk->DebugInfo());
return true;
}
void BlockObject::CancelWait(Task* tk, uint32_t block_sequence)
{
std::unique_lock<LFLock> lock(lock_);
if (tk->block_ != this) {
DebugPrint(dbg_syncblock, "cancelwait task(%s) failed. tk->block_ is not this!", tk->DebugInfo());
return;
}

if (tk->block_sequence_ != block_sequence) {
DebugPrint(dbg_syncblock, "cancelwait task(%s) failed. tk->block_sequence_ = %u, block_sequence = %u.",
tk->DebugInfo(), tk->block_sequence_, block_sequence);
return;
}

if (!wait_queue_.erase(tk)) {
DebugPrint(dbg_syncblock, "cancelwait task(%s) erase failed.", tk->DebugInfo());
return;
}

tk->block_ = nullptr;
tk->is_block_timeout_ = true;
g_Scheduler.AddTaskRunnable(tk);
DebugPrint(dbg_syncblock, "cancelwait task(%s).", tk->DebugInfo());
}

bool BlockObject::IsWakeup()
{
Expand All @@ -91,6 +150,20 @@ bool BlockObject::AddWaitTask(Task* tk)
}

wait_queue_.push(tk);

// 带超时的, 增加定时器
if (std::chrono::nanoseconds::zero() != tk->block_timeout_) {
uint32_t seq = tk->block_sequence_;
tk->IncrementRef();
lock.unlock(); // sequence记录完成, task引用计数增加, 可以解锁了

g_Scheduler.ExpireAt(tk->block_timeout_, [=]{
if (tk->block_sequence_ == seq)
this->CancelWait(tk, seq);
tk->DecrementRef();
});
}

return true;
}

Expand Down
15 changes: 12 additions & 3 deletions src/block_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,37 @@
namespace co
{

// 信号管理对象
// @线程安全
class BlockObject
{
protected:
friend class Processer;
std::size_t wakeup_;
std::size_t max_wakeup_;
TSQueue<Task, false> wait_queue_;
std::size_t wakeup_; // 当前信号数量
std::size_t max_wakeup_; // 可以积累的信号数量上限
TSQueue<Task, false> wait_queue_; // 等待信号的协程队列
LFLock lock_;

public:
explicit BlockObject(std::size_t init_wakeup = 0, std::size_t max_wakeup = -1);
~BlockObject();

// 阻塞式等待信号
void CoBlockWait();

// 带超时的阻塞式等待信号
// @returns: 是否成功等到信号
bool CoBlockWaitTimed(std::chrono::nanoseconds timeo);

bool TryBlockWait();

bool Wakeup();

bool IsWakeup();

private:
void CancelWait(Task* tk, uint32_t block_sequence);

bool AddWaitTask(Task* tk);
};

Expand Down
176 changes: 109 additions & 67 deletions src/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,54 +62,21 @@ class Channel
}

template <typename U, typename Duration>
bool BlockTryPush(U && t, Duration const& timeout) const
bool TimedPush(U && t, Duration const& timeout) const
{
int interval = 1;
auto begin = std::chrono::high_resolution_clock::now();
while (!TryPush(std::forward<U>(t))) {
auto now = std::chrono::high_resolution_clock::now();
if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
return false;

interval = std::min(32, interval << 1);
g_Scheduler.SleepSwitch(interval);
}

return true;
return impl_->TimedPush(std::forward<U>(t), timeout);
}

template <typename U, typename Duration>
bool BlockTryPop(U & t, Duration const& timeout) const
bool TimedPop(U & t, Duration const& timeout) const
{
int interval = 1;
auto begin = std::chrono::high_resolution_clock::now();
while (!TryPop(t)) {
auto now = std::chrono::high_resolution_clock::now();
if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
return false;

interval = std::min(32, interval << 1);
g_Scheduler.SleepSwitch(interval);
}

return true;
return impl_->TimedPop(t, timeout);
}

template <typename Duration>
bool BlockTryPop(nullptr_t ignore, Duration const& timeout) const
bool TimedPop(nullptr_t ignore, Duration const& timeout) const
{
int interval = 1;
auto begin = std::chrono::high_resolution_clock::now();
while (!TryPop(ignore)) {
auto now = std::chrono::high_resolution_clock::now();
if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
return false;

interval = std::min(32, interval << 1);
g_Scheduler.SleepSwitch(interval);
}

return true;
return impl_->TimedPop(ignore, timeout);
}

bool Unique() const
Expand Down Expand Up @@ -170,6 +137,72 @@ class Channel
}
}

// write
template <typename U, typename Duration>
bool TimedPush(U && t, Duration const& dur)
{
if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
return false;

{
std::unique_lock<CoMutex> lock(queue_lock_);
queue_.push(std::forward<U>(t));
}

read_block_.Wakeup();
return true;
}

// read
template <typename U, typename Duration>
bool TimedPop(U & t, Duration const& dur)
{
write_block_.Wakeup();
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
{
if (write_block_.TryBlockWait())
return false;
else
{
while (!read_block_.TryBlockWait())
if (write_block_.TryBlockWait())
return false;
else
g_Scheduler.CoYield();
}
}

std::unique_lock<CoMutex> lock(queue_lock_);
t = std::move(queue_.front());
queue_.pop();
return true;
}

// read and ignore
template <typename Duration>
bool TimedPop(nullptr_t ignore, Duration const& dur)
{
write_block_.Wakeup();
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
{
if (write_block_.TryBlockWait())
return false;
else
{
while (!read_block_.TryBlockWait())
if (write_block_.TryBlockWait())
return false;
else
g_Scheduler.CoYield();
}
}

std::unique_lock<CoMutex> lock(queue_lock_);
queue_.pop();
return true;
}


// try write
template <typename U>
bool TryPush(U && t)
Expand Down Expand Up @@ -266,37 +299,15 @@ class Channel<void>
}

template <typename Duration>
bool BlockTryPush(nullptr_t ignore, Duration const& timeout) const
bool TimedPush(nullptr_t ignore, Duration const& timeout) const
{
int interval = 1;
auto begin = std::chrono::high_resolution_clock::now();
while (!TryPush(ignore)) {
auto now = std::chrono::high_resolution_clock::now();
if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
return false;

interval = std::min(32, interval << 1);
g_Scheduler.SleepSwitch(interval);
}

return true;
return impl_->TimedPush(ignore, timeout);
}

template <typename Duration>
bool BlockTryPop(nullptr_t ignore, Duration const& timeout) const
bool TimedPop(nullptr_t ignore, Duration const& timeout) const
{
int interval = 1;
auto begin = std::chrono::high_resolution_clock::now();
while (!TryPop(ignore)) {
auto now = std::chrono::high_resolution_clock::now();
if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
return false;

interval = std::min(32, interval << 1);
g_Scheduler.SleepSwitch(interval);
}

return true;
return impl_->TimedPop(ignore, timeout);
}

bool Unique() const
Expand Down Expand Up @@ -329,6 +340,39 @@ class Channel<void>
read_block_.CoBlockWait();
}

// write
template <typename Duration>
bool TimedPush(nullptr_t ignore, Duration const& dur)
{
if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
return false;

read_block_.Wakeup();
return true;
}

// read
template <typename Duration>
bool TimedPop(nullptr_t ignore, Duration const& dur)
{
write_block_.Wakeup();
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
{
if (write_block_.TryBlockWait())
return false;
else
{
while (!read_block_.TryBlockWait())
if (write_block_.TryBlockWait())
return false;
else
g_Scheduler.CoYield();
}
}

return true;
}

// try write
bool TryPush(nullptr_t ignore)
{
Expand All @@ -352,8 +396,6 @@ class Channel<void>
return true;
}
};


};

} //namespace co
Loading

0 comments on commit 0cea895

Please sign in to comment.