diff --git a/src/block_object.cpp b/src/block_object.cpp
index bdc6acba..3dd4bb29 100644
--- a/src/block_object.cpp
+++ b/src/block_object.cpp
@@ -36,13 +36,47 @@ void BlockObject::CoBlockWait()
         --wakeup_;
         return ;
     }
+    lock.unlock();
 
     Task* tk = g_Scheduler.GetLocalInfo().current_task;
     tk->block_ = this;
     tk->state_ = TaskState::sys_block;
+    tk->block_timeout_ = std::chrono::nanoseconds::zero();
+    tk->is_block_timeout_ = false;
+    ++tk->block_sequence_;
+    DebugPrint(dbg_syncblock, "wait to switch. task(%s)", tk->DebugInfo());
+    g_Scheduler.CoYield();
+}
+
+bool BlockObject::CoBlockWaitTimed(std::chrono::nanoseconds timeo)
+{
+    auto begin = std::chrono::high_resolution_clock::now();
+    if (!g_Scheduler.IsCoroutine()) {
+        // Not in a coroutine: poll with a short sleep until a signal
+        // arrives or the timeout elapses.
+        while (std::chrono::duration_cast<std::chrono::nanoseconds>
+                (std::chrono::high_resolution_clock::now() - begin) < timeo)
+        {
+            if (TryBlockWait())
+                return true;
+            usleep(10 * 1000);
+        }
+        return false;
+    }
+
+    std::unique_lock<LFLock> lock(lock_);
+    if (wakeup_ > 0) {
+        DebugPrint(dbg_syncblock, "wait immediately done.");
+        --wakeup_;
+        return true;
+    }
     lock.unlock();
+
+    Task* tk = g_Scheduler.GetLocalInfo().current_task;
+    tk->block_ = this;
+    tk->state_ = TaskState::sys_block;
+    ++tk->block_sequence_;
+    tk->block_timeout_ = timeo;
+    tk->is_block_timeout_ = false;
     DebugPrint(dbg_syncblock, "wait to switch. task(%s)", tk->DebugInfo());
     g_Scheduler.CoYield();
+
+    return !tk->is_block_timeout_;
 }
 
 bool BlockObject::TryBlockWait()
@@ -71,10 +105,35 @@ bool BlockObject::Wakeup()
         return true;
     }
 
+    tk->block_ = nullptr;
     g_Scheduler.AddTaskRunnable(tk);
     DebugPrint(dbg_syncblock, "wakeup task(%s).", tk->DebugInfo());
     return true;
 }
 
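+// Invoked by the timer registered in AddWaitTask when a timed wait expires.
+// block_sequence_ guards the race against a regular Wakeup(): if the task
+// was already woken and has since begun a newer wait, the stale timer is a no-op.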
+void BlockObject::CancelWait(Task* tk, uint32_t block_sequence)
+{
+    std::unique_lock<LFLock> lock(lock_);
+    if (tk->block_ != this) {
+        DebugPrint(dbg_syncblock, "cancelwait task(%s) failed. tk->block_ is not this!", tk->DebugInfo());
+        return;
+    }
+
+    if (tk->block_sequence_ != block_sequence) {
+        DebugPrint(dbg_syncblock, "cancelwait task(%s) failed. tk->block_sequence_ = %u, block_sequence = %u.",
+                tk->DebugInfo(), tk->block_sequence_, block_sequence);
+        return;
+    }
+
+    if (!wait_queue_.erase(tk)) {
+        DebugPrint(dbg_syncblock, "cancelwait task(%s) erase failed.", tk->DebugInfo());
+        return;
+    }
+
+    tk->block_ = nullptr;
+    tk->is_block_timeout_ = true;
+    g_Scheduler.AddTaskRunnable(tk);
+    DebugPrint(dbg_syncblock, "cancelwait task(%s).", tk->DebugInfo());
+}
 
 bool BlockObject::IsWakeup()
 {
@@ -91,6 +150,20 @@ bool BlockObject::AddWaitTask(Task* tk)
     }
 
     wait_queue_.push(tk);
+
+    // A timed wait: register a timer that cancels the wait on expiry.
+    if (std::chrono::nanoseconds::zero() != tk->block_timeout_) {
+        uint32_t seq = tk->block_sequence_;
+        tk->IncrementRef();
+        lock.unlock(); // the sequence is recorded and the ref-count bumped; safe to unlock now.
+
+        g_Scheduler.ExpireAt(tk->block_timeout_, [=]{
+                if (tk->block_sequence_ == seq)
+                    this->CancelWait(tk, seq);
+                tk->DecrementRef();
+            });
+    }
+
     return true;
 }
diff --git a/src/block_object.h b/src/block_object.h
index 36071d02..a9578173 100644
--- a/src/block_object.h
+++ b/src/block_object.h
@@ -5,21 +5,28 @@
 namespace co
 {
 
+// Signal manager object
+// @thread-safe
 class BlockObject
 {
 protected:
     friend class Processer;
-    std::size_t wakeup_;
-    std::size_t max_wakeup_;
-    TSQueue<Task> wait_queue_;
+    std::size_t wakeup_;        // the number of signals currently pending
+    std::size_t max_wakeup_;    // the maximum number of signals that may accumulate
+    TSQueue<Task> wait_queue_;  // the queue of coroutines waiting for a signal
     LFLock lock_;
 
 public:
     explicit BlockObject(std::size_t init_wakeup = 0, std::size_t max_wakeup = -1);
     ~BlockObject();
 
+    // Block until a signal arrives.
     void CoBlockWait();
 
+    // Block until a signal arrives or the timeout expires.
+    // @returns: whether a signal was received in time
+    bool CoBlockWaitTimed(std::chrono::nanoseconds timeo);
+
     bool TryBlockWait();
 
     bool Wakeup();
@@ -27,6 +34,8 @@ class BlockObject
     bool IsWakeup();
 
 private:
+    void CancelWait(Task* tk, uint32_t block_sequence);
+
     bool AddWaitTask(Task* tk);
 };
diff --git a/src/channel.h b/src/channel.h
index 73e040ab..99b705f1 100644
--- a/src/channel.h
+++ b/src/channel.h
@@ -62,54 +62,21 @@ class Channel
     }
 
     template <typename U, typename Duration>
-    bool BlockTryPush(U && t, Duration const& timeout) const
+    bool TimedPush(U && t, Duration const& timeout) const
     {
-        int interval = 1;
-        auto begin = std::chrono::high_resolution_clock::now();
-        while (!TryPush(std::forward<U>(t))) {
-            auto now = std::chrono::high_resolution_clock::now();
-            if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
-                return false;
-
-            interval = std::min(32, interval << 1);
-            g_Scheduler.SleepSwitch(interval);
-        }
-
-        return true;
+        return impl_->TimedPush(std::forward<U>(t), timeout);
     }
 
     template <typename U, typename Duration>
-    bool BlockTryPop(U & t, Duration const& timeout) const
+    bool TimedPop(U & t, Duration const& timeout) const
     {
-        int interval = 1;
-        auto begin = std::chrono::high_resolution_clock::now();
-        while (!TryPop(t)) {
-            auto now = std::chrono::high_resolution_clock::now();
-            if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
-                return false;
-
-            interval = std::min(32, interval << 1);
-            g_Scheduler.SleepSwitch(interval);
-        }
-
-        return true;
+        return impl_->TimedPop(t, timeout);
     }
 
     template <typename Duration>
-    bool BlockTryPop(nullptr_t ignore, Duration const& timeout) const
+    bool TimedPop(nullptr_t ignore, Duration const& timeout) const
     {
-        int interval = 1;
-        auto begin = std::chrono::high_resolution_clock::now();
-        while (!TryPop(ignore)) {
-            auto now = std::chrono::high_resolution_clock::now();
-            if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
-                return false;
-
-            interval = std::min(32, interval << 1);
-            g_Scheduler.SleepSwitch(interval);
-        }
-
-        return true;
+        return impl_->TimedPop(ignore, timeout);
     }
 
     bool Unique() const
@@ -170,6 +137,72 @@ class Channel
         }
     }
 
+    // write
+    template <typename U, typename Duration>
+    bool TimedPush(U && t, Duration const& dur)
+    {
+        // write_block_ counts free capacity; a timed wait on it is a timed write.
+        if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
+            return false;
+
+        {
+            std::unique_lock<LFLock> lock(queue_lock_);
+            queue_.push(std::forward<U>(t));
+        }
+
+        read_block_.Wakeup();
+        return true;
+    }
+
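+    // A timed pop first signals write_block_ (granting a writer one slot of
+    // capacity), then waits for data. If the wait times out, that grant must
+    // be revoked: either take it back via write_block_.TryBlockWait(), or, if
+    // a writer has already consumed it, spin until the element it wrote
+    // arrives on read_block_.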
+    // read
+    template <typename U, typename Duration>
+    bool TimedPop(U & t, Duration const& dur)
+    {
+        write_block_.Wakeup();
+        if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
+        {
+            if (write_block_.TryBlockWait())
+                return false;
+            else
+            {
+                while (!read_block_.TryBlockWait())
+                    if (write_block_.TryBlockWait())
+                        return false;
+                    else
+                        g_Scheduler.CoYield();
+            }
+        }
+
+        std::unique_lock<LFLock> lock(queue_lock_);
+        t = std::move(queue_.front());
+        queue_.pop();
+        return true;
+    }
+
+    // read and ignore
+    template <typename Duration>
+    bool TimedPop(nullptr_t ignore, Duration const& dur)
+    {
+        write_block_.Wakeup();
+        if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
+        {
+            if (write_block_.TryBlockWait())
+                return false;
+            else
+            {
+                while (!read_block_.TryBlockWait())
+                    if (write_block_.TryBlockWait())
+                        return false;
+                    else
+                        g_Scheduler.CoYield();
+            }
+        }
+
+        std::unique_lock<LFLock> lock(queue_lock_);
+        queue_.pop();
+        return true;
+    }
+
     // try write
     template <typename U>
     bool TryPush(U && t)
@@ -266,37 +299,15 @@ class Channel
     }
 
     template <typename Duration>
-    bool BlockTryPush(nullptr_t ignore, Duration const& timeout) const
+    bool TimedPush(nullptr_t ignore, Duration const& timeout) const
     {
-        int interval = 1;
-        auto begin = std::chrono::high_resolution_clock::now();
-        while (!TryPush(ignore)) {
-            auto now = std::chrono::high_resolution_clock::now();
-            if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
-                return false;
-
-            interval = std::min(32, interval << 1);
-            g_Scheduler.SleepSwitch(interval);
-        }
-
-        return true;
+        return impl_->TimedPush(ignore, timeout);
     }
 
     template <typename Duration>
-    bool BlockTryPop(nullptr_t ignore, Duration const& timeout) const
+    bool TimedPop(nullptr_t ignore, Duration const& timeout) const
     {
-        int interval = 1;
-        auto begin = std::chrono::high_resolution_clock::now();
-        while (!TryPop(ignore)) {
-            auto now = std::chrono::high_resolution_clock::now();
-            if (std::chrono::duration_cast<Duration>(now - begin) >= timeout)
-                return false;
-
-            interval = std::min(32, interval << 1);
-            g_Scheduler.SleepSwitch(interval);
-        }
-
-        return true;
+        return impl_->TimedPop(ignore, timeout);
     }
 
     bool Unique() const
@@ -329,6 +340,39 @@ class Channel
         read_block_.CoBlockWait();
     }
 
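+    // The void specialization carries no payload, so the timed operations
+    // below are a pure semaphore handshake between write_block_ and read_block_.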
+    // write
+    template <typename Duration>
+    bool TimedPush(nullptr_t ignore, Duration const& dur)
+    {
+        if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
+            return false;
+
+        read_block_.Wakeup();
+        return true;
+    }
+
+    // read
+    template <typename Duration>
+    bool TimedPop(nullptr_t ignore, Duration const& dur)
+    {
+        write_block_.Wakeup();
+        if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
+        {
+            if (write_block_.TryBlockWait())
+                return false;
+            else
+            {
+                while (!read_block_.TryBlockWait())
+                    if (write_block_.TryBlockWait())
+                        return false;
+                    else
+                        g_Scheduler.CoYield();
+            }
+        }
+
+        return true;
+    }
+
     // try write
     bool TryPush(nullptr_t ignore)
     {
@@ -352,8 +396,6 @@ class Channel
         return true;
     }
 };
-
-};
 
 } //namespace co
diff --git a/src/linux/io_wait.cpp b/src/linux/io_wait.cpp
index ac75c72c..7e7428cd 100644
--- a/src/linux/io_wait.cpp
+++ b/src/linux/io_wait.cpp
@@ -29,13 +29,13 @@ void IoWait::CoSwitch(std::vector<FdStruct> && fdsts, int timeout_ms)
     Task* tk = g_Scheduler.GetCurrentTask();
     if (!tk) return ;
 
-    uint32_t id = ++tk->io_block_id_;
+    uint32_t id = ++tk->GetIoWaitData().io_block_id_;
     tk->state_ = TaskState::io_block;
-    tk->wait_successful_ = 0;
-    tk->io_block_timeout_ = timeout_ms;
-    tk->io_block_timer_.reset();
-    tk->wait_fds_.swap(fdsts);
-    for (auto &fdst : tk->wait_fds_) {
+    tk->GetIoWaitData().wait_successful_ = 0;
+    tk->GetIoWaitData().io_block_timeout_ = timeout_ms;
+    tk->GetIoWaitData().io_block_timer_.reset();
+    tk->GetIoWaitData().wait_fds_.swap(fdsts);
+    for (auto &fdst : tk->GetIoWaitData().wait_fds_) {
         fdst.epoll_ptr.tk = tk;
         fdst.epoll_ptr.io_block_id = id;
     }
@@ -48,18 +48,18 @@ void IoWait::CoSwitch(std::vector<FdStruct> && fdsts, int timeout_ms)
 void IoWait::SchedulerSwitch(Task* tk)
 {
     bool ok = false;
-    std::unique_lock<LFLock> lock(tk->io_block_lock_, std::defer_lock);
-    if (tk->wait_fds_.size() > 1)
+    std::unique_lock<LFLock> lock(tk->GetIoWaitData().io_block_lock_, std::defer_lock);
+    if (tk->GetIoWaitData().wait_fds_.size() > 1)
         lock.lock();
 
    // The id must be read out first: inside the for loop below, epoll_wait may
    // succeed on another thread and re-enter a syscall, changing the id.
-    uint32_t id = tk->io_block_id_;
+    uint32_t id = tk->GetIoWaitData().io_block_id_;
     RefGuard<> ref_guard(tk);
 
     wait_tasks_.push(tk);
     std::vector<std::pair<int, FdStruct*>> rollback_list;
-    for (auto &fdst : tk->wait_fds_)
+    for (auto &fdst : tk->GetIoWaitData().wait_fds_)
     {
         epoll_event ev = {fdst.event, {(void*)&fdst.epoll_ptr}};
         int epoll_fd_ = ChooseEpoll(fdst.event);
@@ -97,7 +97,7 @@ void IoWait::SchedulerSwitch(Task* tk)
     }
 
     DebugPrint(dbg_ioblock, "task(%s) SchedulerSwitch id=%d, nfds=%d, timeout=%d, ok=%s",
-            tk->DebugInfo(), id, (int)tk->wait_fds_.size(), tk->io_block_timeout_,
+            tk->DebugInfo(), id, (int)tk->GetIoWaitData().wait_fds_.size(), tk->GetIoWaitData().io_block_timeout_,
             ok ? "true" : "false");
 
     if (!ok) {
@@ -105,37 +105,37 @@ void IoWait::SchedulerSwitch(Task* tk)
         g_Scheduler.AddTaskRunnable(tk);
     }
-    else if (tk->io_block_timeout_ != -1) {
+    else if (tk->GetIoWaitData().io_block_timeout_ != -1) {
         // set timer.
         tk->IncrementRef();
         uint64_t task_id = tk->id_;
-        auto timer_id = timer_mgr_.ExpireAt(std::chrono::milliseconds(tk->io_block_timeout_),
+        auto timer_id = timer_mgr_.ExpireAt(std::chrono::milliseconds(tk->GetIoWaitData().io_block_timeout_),
                 [=]{
                     DebugPrint(dbg_ioblock, "task(%d) syscall timeout", (int)task_id);
                     this->Cancel(tk, id);
                     tk->DecrementRef();
                 });
-        tk->io_block_timer_ = timer_id;
+        tk->GetIoWaitData().io_block_timer_ = timer_id;
     }
 }
 
 void IoWait::Cancel(Task *tk, uint32_t id)
 {
-    DebugPrint(dbg_ioblock, "task(%s) Cancel id=%d, tk->io_block_id_=%d",
-            tk->DebugInfo(), id, (int)tk->io_block_id_);
+    DebugPrint(dbg_ioblock, "task(%s) Cancel id=%d, tk->GetIoWaitData().io_block_id_=%d",
+            tk->DebugInfo(), id, (int)tk->GetIoWaitData().io_block_id_);
 
-    if (tk->io_block_id_ != id)
+    if (tk->GetIoWaitData().io_block_id_ != id)
         return ;
 
     if (wait_tasks_.erase(tk)) { // sync between timer and epoll_wait.
         DebugPrint(dbg_ioblock, "task(%s) io_block wakeup. id=%d", tk->DebugInfo(), id);
 
-        std::unique_lock<LFLock> lock(tk->io_block_lock_, std::defer_lock);
-        if (tk->wait_fds_.size() > 1)
+        std::unique_lock<LFLock> lock(tk->GetIoWaitData().io_block_lock_, std::defer_lock);
+        if (tk->GetIoWaitData().wait_fds_.size() > 1)
             lock.lock();
 
         // Clean up every fd.
-        for (auto &fdst: tk->wait_fds_)
+        for (auto &fdst: tk->GetIoWaitData().wait_fds_)
         {
             int epoll_fd_ = ChooseEpoll(fdst.event);
             if (0 == epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fdst.fd, NULL)) { // sync 1
@@ -184,7 +184,7 @@ int IoWait::WaitLoop()
             EpollPtr* ep = (EpollPtr*)evs[i].data.ptr;
             ep->revent = evs[i].events;
             Task* tk = ep->tk;
-            ++tk->wait_successful_;
+            ++tk->GetIoWaitData().wait_successful_;
             // Stash tk and run Cancel last, so poll and select get correct counts,
             // lest the Task be executed by another thread right after being made runnable.
             epollwait_tasks_.insert(EpollWaitSt{tk, ep->io_block_id});
@@ -208,9 +208,11 @@ int IoWait::WaitLoop()
     // The epoll_wait results may still hold some uncounted Task pointers;
     // by epoll's nature those Tasks cannot be counted,
     // so this destruction must be done under the protection of epoll_lock.
-    Task::DeleteList delete_list;
-    Task::SwapDeleteList(delete_list);
-    for (auto &tk : delete_list) {
+    SList<Task> delete_list;
+    Task::PopDeleteList(delete_list);
+    for (auto it = delete_list.begin(); it != delete_list.end();)
+    {
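+        // Advance the iterator before deleting: the list is intrusive, so
+        // the links live inside the Task being destroyed.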
id=%d", tk->DebugInfo(), id); - std::unique_lock lock(tk->io_block_lock_, std::defer_lock); - if (tk->wait_fds_.size() > 1) + std::unique_lock lock(tk->GetIoWaitData().io_block_lock_, std::defer_lock); + if (tk->GetIoWaitData().wait_fds_.size() > 1) lock.lock(); // 清理所有fd - for (auto &fdst: tk->wait_fds_) + for (auto &fdst: tk->GetIoWaitData().wait_fds_) { int epoll_fd_ = ChooseEpoll(fdst.event); if (0 == epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fdst.fd, NULL)) { // sync 1 @@ -184,7 +184,7 @@ int IoWait::WaitLoop() EpollPtr* ep = (EpollPtr*)evs[i].data.ptr; ep->revent = evs[i].events; Task* tk = ep->tk; - ++tk->wait_successful_; + ++tk->GetIoWaitData().wait_successful_; // 将tk暂存, 最后再执行Cancel, 是为了poll和select可以得到正确的计数。 // 以防Task被加入runnable列表后,被其他线程执行 epollwait_tasks_.insert(EpollWaitSt{tk, ep->io_block_id}); @@ -208,9 +208,11 @@ int IoWait::WaitLoop() // 由于epoll_wait的结果中会残留一些未计数的Task*, // epoll的性质决定了这些Task无法计数, // 所以这个析构的操作一定要在epoll_lock的保护中做 - Task::DeleteList delete_list; - Task::SwapDeleteList(delete_list); - for (auto &tk : delete_list) { + SList delete_list; + Task::PopDeleteList(delete_list); + for (auto it = delete_list.begin(); it != delete_list.end();) + { + Task* tk = &*it++; DebugPrint(dbg_task, "task(%s) delete.", tk->DebugInfo()); delete tk; } diff --git a/src/linux/linux_glibc_hook.cpp b/src/linux/linux_glibc_hook.cpp index 4c998963..3a88af4a 100644 --- a/src/linux/linux_glibc_hook.cpp +++ b/src/linux/linux_glibc_hook.cpp @@ -54,15 +54,15 @@ static ssize_t read_write_mode(int fd, OriginF fn, const char* hook_fn_name, uin // add into epoll, and switch other context. g_Scheduler.IOBlockSwitch(fd, event, timeout_ms); bool is_timeout = false; - if (tk->io_block_timer_) { + if (tk->GetIoWaitData().io_block_timer_) { is_timeout = true; - if (g_Scheduler.BlockCancelTimer(tk->io_block_timer_)) { + if (g_Scheduler.BlockCancelTimer(tk->GetIoWaitData().io_block_timer_)) { is_timeout = false; tk->DecrementRef(); // timer use ref. } } - if (tk->wait_successful_ == 0) { + if (tk->GetIoWaitData().wait_successful_ == 0) { if (is_timeout) { fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); errno = EAGAIN; @@ -171,7 +171,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen) g_Scheduler.IOBlockSwitch(fd, EPOLLOUT, -1); } - if (tk->wait_successful_ == 0) { + if (tk->GetIoWaitData().wait_successful_ == 0) { // 添加到epoll中失败了 fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); errno = e; @@ -325,15 +325,15 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout) // add into epoll, and switch other context. g_Scheduler.IOBlockSwitch(std::move(fdsts), timeout); bool is_timeout = false; // 是否超时 - if (tk->io_block_timer_) { + if (tk->GetIoWaitData().io_block_timer_) { is_timeout = true; - if (g_Scheduler.BlockCancelTimer(tk->io_block_timer_)) { + if (g_Scheduler.BlockCancelTimer(tk->GetIoWaitData().io_block_timer_)) { tk->DecrementRef(); // timer use ref. 
-    if (tk->io_block_timer_) {
+    if (tk->GetIoWaitData().io_block_timer_) {
         is_timeout = true;
-        if (g_Scheduler.BlockCancelTimer(tk->io_block_timer_)) {
+        if (g_Scheduler.BlockCancelTimer(tk->GetIoWaitData().io_block_timer_)) {
             is_timeout = false;
             tk->DecrementRef(); // timer use ref.
         }
     }
 
-    if (tk->wait_successful_ == 0) {
+    if (tk->GetIoWaitData().wait_successful_ == 0) {
         if (is_timeout) {
             fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
             errno = EAGAIN;
@@ -171,7 +171,7 @@ int connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
         g_Scheduler.IOBlockSwitch(fd, EPOLLOUT, -1);
     }
 
-    if (tk->wait_successful_ == 0) {
+    if (tk->GetIoWaitData().wait_successful_ == 0) {
         // Adding the fd into epoll failed.
         fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
         errno = e;
@@ -325,15 +325,15 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout)
     // add into epoll, and switch other context.
     g_Scheduler.IOBlockSwitch(std::move(fdsts), timeout);
     bool is_timeout = false; // whether the wait timed out
-    if (tk->io_block_timer_) {
+    if (tk->GetIoWaitData().io_block_timer_) {
         is_timeout = true;
-        if (g_Scheduler.BlockCancelTimer(tk->io_block_timer_)) {
+        if (g_Scheduler.BlockCancelTimer(tk->GetIoWaitData().io_block_timer_)) {
             tk->DecrementRef(); // timer use ref.
             is_timeout = false;
         }
     }
 
-    if (tk->wait_successful_ == 0) {
+    if (tk->GetIoWaitData().wait_successful_ == 0) {
         if (is_timeout)
             return 0;
         else {
@@ -345,13 +345,13 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout)
     }
 
     int n = 0;
-    for (int i = 0; i < (int)tk->wait_fds_.size(); ++i)
+    for (int i = 0; i < (int)tk->GetIoWaitData().wait_fds_.size(); ++i)
     {
-        fds[i].revents = EpollEvent2Poll(tk->wait_fds_[i].epoll_ptr.revent);
+        fds[i].revents = EpollEvent2Poll(tk->GetIoWaitData().wait_fds_[i].epoll_ptr.revent);
         if (fds[i].revents) ++n;
     }
 
-    assert(n == (int)tk->wait_successful_);
+    assert(n == (int)tk->GetIoWaitData().wait_successful_);
     return n;
 }
@@ -414,15 +414,15 @@ int select(int nfds, fd_set *readfds, fd_set *writefds,
     g_Scheduler.IOBlockSwitch(std::move(fdsts), timeout_ms);
 
     bool is_timeout = false;
-    if (tk->io_block_timer_) {
+    if (tk->GetIoWaitData().io_block_timer_) {
         is_timeout = true;
-        if (g_Scheduler.BlockCancelTimer(tk->io_block_timer_)) {
+        if (g_Scheduler.BlockCancelTimer(tk->GetIoWaitData().io_block_timer_)) {
             is_timeout = false;
             tk->DecrementRef(); // timer use ref.
         }
     }
 
-    if (tk->wait_successful_ == 0) {
+    if (tk->GetIoWaitData().wait_successful_ == 0) {
         if (is_timeout) {
             if (readfds) FD_ZERO(readfds);
             if (writefds) FD_ZERO(writefds);
@@ -437,7 +437,7 @@ int select(int nfds, fd_set *readfds, fd_set *writefds,
     }
 
     int n = 0;
-    for (auto &fdst : tk->wait_fds_) {
+    for (auto &fdst : tk->GetIoWaitData().wait_fds_) {
         int fd = fdst.fd;
         for (int si = 0; si < 3; ++si) {
             if (!sets[si].first)
diff --git a/src/processer.cpp b/src/processer.cpp
index 8e99de54..c0e132e1 100644
--- a/src/processer.cpp
+++ b/src/processer.cpp
@@ -91,7 +91,6 @@ uint32_t Processer::Run(ThreadLocalInfo &info, uint32_t &done_count)
                 it = slist.erase(it);
                 if (!tk->block_->AddWaitTask(tk))
                     runnable_list_.push(tk);
-                tk->block_ = NULL;
             }
         }
         break;
diff --git a/src/task.cpp b/src/task.cpp
index 8cc19023..1a36ba6d 100644
--- a/src/task.cpp
+++ b/src/task.cpp
@@ -12,7 +12,6 @@ uint64_t Task::s_id = 0;
 std::atomic<uint64_t> Task::s_task_count{0};
 
 Task::DeleteList Task::s_delete_list;
-LFLock Task::s_delete_list_lock;
 
 static void C_func(Task* self)
 {
@@ -68,6 +67,10 @@ Task::Task(TaskF const& fn, std::size_t stack_size)
 Task::~Task()
 {
     --s_task_count;
+    if (io_wait_data_) {
+        delete io_wait_data_;
+        io_wait_data_ = nullptr;
+    }
 }
 
 void Task::AddIntoProcesser(Processer *proc, char* shared_stack, uint32_t shared_stack_cap)
@@ -98,6 +101,15 @@ void Task::SetDebugInfo(std::string const& info)
     debug_info_ = info + "(" + std::to_string(id_) + ")";
 }
 
+IoWaitData& Task::GetIoWaitData()
+{
+    // The first call can never be concurrent with another, so no lock is needed.
+    if (!io_wait_data_)
+        io_wait_data_ = new IoWaitData;
+
+    return *io_wait_data_;
+}
+
 const char* Task::DebugInfo()
 {
     if (debug_info_.empty())
@@ -111,15 +123,13 @@ uint64_t Task::GetTaskCount()
     return s_task_count;
 }
 
-void Task::SwapDeleteList(DeleteList &output)
+void Task::PopDeleteList(SList<Task> &output)
 {
-    std::unique_lock<LFLock> lock(s_delete_list_lock);
-    s_delete_list.swap(output);
+    output = s_delete_list.pop_all();
 }
 
 std::size_t Task::GetDeletedTaskCount()
 {
-    std::unique_lock<LFLock> lock(s_delete_list_lock);
     return s_delete_list.size();
 }
 
@@ -135,11 +145,10 @@ void Task::DecrementRef()
     DebugPrint(dbg_task, "task(%s) DecrementRef ref=%d", DebugInfo(), (int)ref_count_);
     if (--ref_count_ == 0) {
-        std::unique_lock<LFLock> lock(s_delete_list_lock);
         assert(!this->prev);
         assert(!this->next);
         assert(!this->check_);
-        s_delete_list.push_back(this);
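+        // s_delete_list is now a TSQueue with its own internal synchronization,
+        // so the external lock is gone.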
+        s_delete_list.push(this);
     }
 }
 
diff --git a/src/task.h b/src/task.h
index 6556096e..49e16f4b 100644
--- a/src/task.h
+++ b/src/task.h
@@ -48,6 +48,18 @@ struct FdStruct
 class BlockObject;
 class Processer;
 
+// Data needed only while a Task blocks on network IO
+struct IoWaitData
+{
+    std::atomic<uint32_t> io_block_id_{0};  // an ID allocated for each io_block request
+    std::vector<FdStruct> wait_fds_;        // the fds this io_block waits on
+    uint32_t wait_successful_ = 0;          // how many fds became ready (used by poll and select)
+    LFLock io_block_lock_;                  // when waiting on more than one fd, syncs the epoll add/remove operations, so no fd is left behind in epoll keeping the Task alive.
+    int io_block_timeout_ = 0;
+    CoTimerPtr io_block_timer_;
+};
+
 struct Task
     : public TSQueueHook
 {
@@ -61,14 +73,12 @@ struct Task
     std::exception_ptr eptr_;           // holds any captured exception
     std::atomic<uint32_t> ref_count_{1};// reference count
 
-    std::atomic<uint32_t> io_block_id_; // an ID allocated for each io_block request
-    std::vector<FdStruct> wait_fds_;    // the fds this io_block waits on
-    uint32_t wait_successful_ = 0;      // how many fds became ready (used by poll and select)
-    LFLock io_block_lock_;              // when waiting on more than one fd, syncs the epoll add/remove operations
-    int io_block_timeout_ = 0;
-    CoTimerPtr io_block_timer_;
+    IoWaitData *io_wait_data_ = nullptr;// data for network IO blocking, allocated lazily
 
-    BlockObject* block_ = NULL;         // the BlockObject this sys_block wait parks on
+    BlockObject* block_ = nullptr;      // the BlockObject this sys_block wait parks on
+    uint32_t block_sequence_ = 0;       // sys_block wait sequence number (validates timeout wakeups)
+    std::chrono::nanoseconds block_timeout_{0}; // sys_block timeout
+    bool is_block_timeout_ = false;     // whether the sys_block wait timed out
 
     int sleep_ms_ = 0;                  // sleep duration in milliseconds
 
@@ -83,6 +93,8 @@ struct Task
     void SetDebugInfo(std::string const& info);
     const char* DebugInfo();
 
+    IoWaitData& GetIoWaitData();
+
     static uint64_t s_id;
     static std::atomic<uint64_t> s_task_count;
 
@@ -91,11 +103,10 @@ struct Task
     static uint64_t GetTaskCount();
 
     // Don't free a Task the moment its ref-count hits zero, lest epoll_wait return leftover data pointing at a dangling Task*.
-    typedef std::list<Task*> DeleteList;
+    typedef TSQueue<Task> DeleteList;
 
     static DeleteList s_delete_list;
-    static LFLock s_delete_list_lock;
 
-    static void SwapDeleteList(DeleteList &output);
+    static void PopDeleteList(SList<Task> &output);
     static std::size_t GetDeletedTaskCount();
 };
diff --git a/src/windows/io_wait.cpp b/src/windows/io_wait.cpp
index 4f4cc666..ba1b51ac 100644
--- a/src/windows/io_wait.cpp
+++ b/src/windows/io_wait.cpp
@@ -20,10 +20,13 @@ void IoWait::SchedulerSwitch(Task* tk)
 
 int IoWait::WaitLoop()
 {
-    Task::DeleteList delete_list;
-    Task::SwapDeleteList(delete_list);
-    for (auto tk : delete_list)
-        delete tk;
+    SList<Task> delete_list;
+    Task::PopDeleteList(delete_list);
+    for (auto it = delete_list.begin(); it != delete_list.end();)
+    {
+        Task* tk = &*it++;
+        delete tk;
+    }
     return 0;
 }
diff --git a/test/gtest_unit/bm.cpp b/test/gtest_unit/bm.cpp
index ec0d07d1..1bc2310b 100644
--- a/test/gtest_unit/bm.cpp
+++ b/test/gtest_unit/bm.cpp
@@ -61,6 +61,7 @@ TEST_P(Times, testBm)
     {
         stdtimer st(tc_, "Switch and Delete coroutine");
         g_Scheduler.RunUntilNoTask();
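+        // Task deletion is deferred via the delete list; run a second pass so
+        // the deferred deletions are drained inside the timed scope.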
+        g_Scheduler.RunUntilNoTask();
     }
 
 //    {
@@ -74,19 +75,17 @@ TEST_P(Times, testBm)
 
     {
         for (int i = 0; i < tc_; ++i) {
-            go []{};
-        }
-
-        stdtimer st1;
-        g_Scheduler.RunUntilNoTask();
-        auto s = st1.expired();
-
-        for (int i = 0; i < tc_; ++i) {
-            go []{ co_yield; };
+            go []{ co_yield; co_yield; };
         }
 
+        g_Scheduler.Run();
         stdtimer st(tc_, "Switch coroutine");
-        st.except(s);
+        g_Scheduler.Run();
+    }
+
+    {
+        stdtimer st(tc_, "Switch and Delete coroutine");
+        g_Scheduler.RunUntilNoTask();
         g_Scheduler.RunUntilNoTask();
     }
diff --git a/test/gtest_unit/channel.cpp b/test/gtest_unit/channel.cpp
index 235773cd..6b835ed1 100644
--- a/test/gtest_unit/channel.cpp
+++ b/test/gtest_unit/channel.cpp
@@ -336,7 +336,7 @@ TEST(Channel, capacity1Try)
     }
 }
 
-TEST(Channel, capacity0BlockTry)
+TEST(Channel, capacity0Timed)
 {
     {
         co_chan<int> ch;
@@ -344,7 +344,7 @@ TEST(Channel, capacity0BlockTry)
         // block try
         go [=] {
             auto s = system_clock::now();
-            bool ok = ch.BlockTryPush(1, milliseconds(32));
+            bool ok = ch.TimedPush(1, milliseconds(32));
             auto e = system_clock::now();
             auto c = duration_cast<milliseconds>(e - s).count();
             EXPECT_FALSE(ok);
@@ -356,7 +356,7 @@ TEST(Channel, capacity0BlockTry)
         // block try
         go [=] {
             auto s = system_clock::now();
-            bool ok = ch.BlockTryPush(1, milliseconds(100));
+            bool ok = ch.TimedPush(1, milliseconds(100));
             auto e = system_clock::now();
             auto c = duration_cast<milliseconds>(e - s).count();
             EXPECT_FALSE(ok);
@@ -368,7 +368,7 @@ TEST(Channel, capacity0BlockTry)
         go [=] {
             auto s = system_clock::now();
             int i;
-            bool ok = ch.BlockTryPop(i, milliseconds(100));
+            bool ok = ch.TimedPop(i, milliseconds(100));
             auto e = system_clock::now();
             auto c = duration_cast<milliseconds>(e - s).count();
             EXPECT_FALSE(ok);
@@ -379,7 +379,7 @@ TEST(Channel, capacity0BlockTry)
 
         go [=] {
             auto s = system_clock::now();
-            bool ok = ch.BlockTryPop(nullptr, milliseconds(100));
+            bool ok = ch.TimedPop(nullptr, milliseconds(100));
             auto e = system_clock::now();
             auto c = duration_cast<milliseconds>(e - s).count();
             EXPECT_FALSE(ok);
@@ -394,7 +394,7 @@ TEST(Channel, capacity0BlockTry)
 
         go [=] {
             auto s = system_clock::now();
-            bool ok = ch.BlockTryPush(nullptr, milliseconds(100));
+            bool ok = ch.TimedPush(nullptr, milliseconds(100));
             auto e = system_clock::now();
             auto c = duration_cast<milliseconds>(e - s).count();
             EXPECT_FALSE(ok);
@@ -405,7 +405,7 @@ TEST(Channel, capacity0BlockTry)
 
         go [=] {
             auto s = system_clock::now();
-            bool ok = ch.BlockTryPop(nullptr, milliseconds(100));
+            bool ok = ch.TimedPop(nullptr, milliseconds(100));
             auto e = system_clock::now();
             auto c = duration_cast<milliseconds>(e - s).count();
             EXPECT_FALSE(ok);
@@ -414,4 +414,59 @@ TEST(Channel, capacity0BlockTry)
         };
         g_Scheduler.RunUntilNoTask();
     }
+
+    {
+        co_chan<void> ch;
+
+        for (int i = 0; i < 1000; ++i)
+            go [=] {
+                auto s = system_clock::now();
+                bool ok = ch.TimedPush(nullptr, milliseconds(500));
+                auto e = system_clock::now();
+                auto c = duration_cast<milliseconds>(e - s).count();
+                EXPECT_FALSE(ok);
+                EXPECT_GT(c, 499);
+                EXPECT_LT(c, 533);
+            };
+        g_Scheduler.RunUntilNoTask();
+    }
+
+    {
+        co_chan<void> ch;
+
+        for (int i = 0; i < 1000; ++i)
+            go [=] {
+                auto s = system_clock::now();
+                bool ok = ch.TimedPop(nullptr, milliseconds(500));
+                auto e = system_clock::now();
+                auto c = duration_cast<milliseconds>(e - s).count();
+                EXPECT_FALSE(ok);
+                EXPECT_GT(c, 499);
+                EXPECT_LT(c, 533);
+            };
+        g_Scheduler.RunUntilNoTask();
+    }
+
+    {
+        co_chan<int> ch;
+
+        for (int i = 0; i < 1000; ++i)
+            go [=] {
+                int v;
+                auto s = system_clock::now();
+                bool ok = ch.TimedPop(v, milliseconds(500));
+                auto e = system_clock::now();
+                auto c = duration_cast<milliseconds>(e - s).count();
+                EXPECT_TRUE(ok);
+                EXPECT_EQ(i, v);
+                EXPECT_LT(c, 50);
+            };
+
+        for (int i = 0; i < 1000; ++i)
+            go [=] {
+                ch << i;
+            };
+
+        g_Scheduler.RunUntilNoTask();
+    }
 }