Skip to content

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
  • Loading branch information
mehah committed Oct 11, 2023
1 parent ca37805 commit 3e162ca
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 97 deletions.
114 changes: 44 additions & 70 deletions src/game/scheduling/dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,93 +23,69 @@ void Dispatcher::init() {
threadPool.addLoad([this] {
std::unique_lock lock(mutex);
while (!threadPool.getIoContext().stopped()) {
signal.wait_until(lock, waitTime);
std::this_thread::sleep_for(std::chrono::milliseconds(10));

Task::TIME_NOW = std::chrono::system_clock::now();

busy = true;
{
std::scoped_lock l(tasks.mutexList);
for (const auto &task : tasks.list) {
if (!task.hasExpired()) {
++dispatcherCycle;
task.execute();
}
size_t sizeTaks = eventTasks.size();
for (uint_fast64_t i = 0; i < sizeTaks; ++i) {
auto &task = eventTasks[i];
if (task.execute()) {
++dispatcherCycle;
}
tasks.list.clear();
}
busy = false;

if (Task::TIME_NOW >= waitTime) {
std::scoped_lock l(scheduledtasks.mutex);
for (uint_fast64_t i = 0, max = scheduledtasks.list.size(); i < max && !scheduledtasks.list.empty(); ++i) {
const auto &task = scheduledtasks.list.top();
if (task->getTime() > Task::TIME_NOW) {
waitFor(task);
break;
}

task->execute();
if (sizeTaks == eventTasks.size()) {
eventTasks.clear();
} else {
eventTasks.erase(eventTasks.begin(), eventTasks.begin() + sizeTaks);
}

if (!task->isCanceled() && task->isCycle()) {
scheduledtasks.list.emplace(task);
} else {
scheduledtasks.map.erase(task->getEventId());
}
for (uint_fast64_t i = 0, max = scheduledtasks.size(); i < max && !scheduledtasks.empty(); ++i) {
const auto &task = scheduledtasks.top();
if (task->getTime() > Task::TIME_NOW) {
waitFor(task);
break;
}

task->execute();

scheduledtasks.list.pop();
if (!task->isCanceled() && task->isCycle()) {
scheduledtasks.emplace(task);
} else {
scheduledtasksRef.erase(task->getEventId());
}

scheduledtasks.pop();
}

{
std::scoped_lock l(tasks.mutexList, tasks.mutexWaitingList);
if (!tasks.waitingList.empty()) {
// Transfer Waiting List data to List
tasks.list.insert(tasks.list.end(), make_move_iterator(tasks.waitingList.begin()), make_move_iterator(tasks.waitingList.end()));
tasks.waitingList.clear();
for (auto &thread : threads) {
if (!thread.tasks.empty()) {
eventTasks.insert(eventTasks.end(), make_move_iterator(thread.tasks.begin()), make_move_iterator(thread.tasks.end()));
thread.tasks.clear();
}

if (!thread.scheduledtasks.empty()) {
for (auto &task : thread.scheduledtasks) {
scheduledtasks.emplace(task);
scheduledtasksRef.emplace(task->getEventId(), task);
}

signal.notify_one();
eventTasks.insert(eventTasks.end(), make_move_iterator(thread.tasks.begin()), make_move_iterator(thread.tasks.end()));
thread.scheduledtasks.clear();
}
}
}
});
}

void Dispatcher::addEvent(std::function<void(void)> &&f, std::string &&context, uint32_t expiresAfterMs) {
if (busy) {
std::scoped_lock l(tasks.mutexWaitingList);
tasks.waitingList.emplace_back(expiresAfterMs, f, context);
return;
}

std::scoped_lock l(tasks.mutexList);

const bool notify = tasks.list.empty();

tasks.list.emplace_back(expiresAfterMs, f, context);

if (notify) {
signal.notify_one();
}
threads[getThreadId()].tasks.emplace_back(expiresAfterMs, f, context);
}

uint64_t Dispatcher::scheduleEvent(const std::shared_ptr<Task> &task) {
std::scoped_lock l(scheduledtasks.mutex);

if (task->getEventId() == 0) {
if (++lastEventId == 0) {
lastEventId = 1;
}

task->setEventId(lastEventId);
}

scheduledtasks.list.emplace(task);
scheduledtasks.map.emplace(task->getEventId(), task);

waitFor(scheduledtasks.list.top());

return task->getEventId();
threads[getThreadId()].scheduledtasks.emplace_back(task);
return scheduledtasksRef.emplace(task->generateId(), task).first->first;
}

uint64_t Dispatcher::scheduleEvent(uint32_t delay, std::function<void(void)> &&f, std::string &&context, bool cycle) {
Expand All @@ -118,13 +94,11 @@ uint64_t Dispatcher::scheduleEvent(uint32_t delay, std::function<void(void)> &&f
}

void Dispatcher::stopEvent(uint64_t eventId) {
std::scoped_lock l(scheduledtasks.mutex);

auto it = scheduledtasks.map.find(eventId);
if (it == scheduledtasks.map.end()) {
auto it = scheduledtasksRef.find(eventId);
if (it == scheduledtasksRef.end()) {
return;
}

it->second->cancel();
scheduledtasks.map.erase(it);
scheduledtasksRef.erase(it);
}
36 changes: 22 additions & 14 deletions src/game/scheduling/dispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ class Dispatcher {
public:
explicit Dispatcher(ThreadPool &threadPool) :
threadPool(threadPool) {
tasks.list.reserve(2000);
tasks.waitingList.reserve(2000);
threads.resize(std::thread::hardware_concurrency());
};

// Ensures that we don't accidentally copy it
Expand Down Expand Up @@ -60,6 +59,12 @@ class Dispatcher {
void stopEvent(uint64_t eventId);

private:
static int16_t getThreadId() {
static std::atomic_int16_t last_id = -1;
thread_local static int16_t id = -1;
return id > -1 ? id : (id = ++last_id);
};

uint64_t scheduleEvent(uint32_t delay, std::function<void(void)> &&f, std::string &&context, bool cycle);
void waitFor(const std::shared_ptr<Task> &task) {
waitTime = task->getTime();
Expand All @@ -75,18 +80,21 @@ class Dispatcher {
uint_fast64_t dispatcherCycle = 0;
uint_fast64_t lastEventId = 0;

struct {
std::mutex mutexList;
std::mutex mutexWaitingList;
std::vector<Task> list;
std::vector<Task> waitingList;
} tasks;

struct {
std::recursive_mutex mutex;
std::priority_queue<std::shared_ptr<Task>, std::deque<std::shared_ptr<Task>>, Task::Compare> list;
phmap::flat_hash_map<uint64_t, std::shared_ptr<Task>> map;
} scheduledtasks;
std::vector<Task> eventTasks;
std::priority_queue<std::shared_ptr<Task>, std::deque<std::shared_ptr<Task>>, Task::Compare> scheduledtasks;
phmap::parallel_flat_hash_map_m<uint64_t, std::shared_ptr<Task>> scheduledtasksRef;

struct ThreadTask {
ThreadTask() {
tasks.reserve(2000);
scheduledtasks.reserve(2000);
}

std::vector<Task> tasks;
std::vector<std::shared_ptr<Task>> scheduledtasks;
};

std::vector<ThreadTask> threads;
};

constexpr auto g_dispatcher = Dispatcher::getInstance;
26 changes: 18 additions & 8 deletions src/game/scheduling/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@
#include "lib/logging/log_with_spd_log.hpp"

std::chrono::system_clock::time_point Task::TIME_NOW = SYSTEM_TIME_ZERO;
std::atomic_uint_fast64_t Task::LAST_EVENT_ID = 0;

void Task::execute() const {
if (func) {
if (hasTraceableContext()) {
g_logger().trace("Executing task {}.", getContext());
} else {
g_logger().debug("Executing task {}.", getContext());
}
func();
bool Task::execute() {
if (!func || hasExpired()) {
return false;
}

if (hasTraceableContext()) {
g_logger().trace("Executing task {}.", getContext());
} else {
g_logger().debug("Executing task {}.", getContext());
}

func();

if (cycle) {
utime = TIME_NOW + std::chrono::milliseconds(delay);
}

return true;
}
16 changes: 11 additions & 5 deletions src/game/scheduling/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ class Task {
func = nullptr;
}

void execute() const;
bool execute();

void execute() {
func();
uint64_t generateId() {
if (eventId == 0) {
if (++LAST_EVENT_ID == 0) {
LAST_EVENT_ID = 1;
}

if (cycle && !canceled) {
utime = TIME_NOW + std::chrono::milliseconds(delay);
eventId = LAST_EVENT_ID;
}

return eventId;
}

struct Compare {
Expand All @@ -83,6 +87,8 @@ class Task {
};

private:
static std::atomic_uint_fast64_t LAST_EVENT_ID;

bool hasTraceableContext() const {
const static auto tasksContext = phmap::flat_hash_set<std::string>({
"Creature::checkCreatureWalk",
Expand Down

0 comments on commit 3e162ca

Please sign in to comment.