Skip to content

Commit

Permalink
#798 run single threaded systems on main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
SanderMertens committed Nov 30, 2022
1 parent c4e74d9 commit 2c7e313
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 94 deletions.
139 changes: 93 additions & 46 deletions flecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -15727,6 +15727,7 @@ typedef struct ecs_pipeline_state_t {
ecs_pipeline_op_t *cur_op; /* Current pipeline op */
int32_t cur_i; /* Index in current result */
int32_t ran_since_merge; /* Index in current op */
bool no_readonly; /* Is pipeline in readonly mode */
} ecs_pipeline_state_t;

typedef struct EcsPipeline {
Expand Down Expand Up @@ -15764,10 +15765,12 @@ void flecs_worker_begin(

bool flecs_worker_sync(
ecs_world_t *world,
ecs_stage_t *stage,
ecs_pipeline_state_t *pq);

void flecs_worker_end(
ecs_world_t *world);
ecs_world_t *world,
ecs_stage_t *stage);

void flecs_workers_progress(
ecs_world_t *world,
Expand Down Expand Up @@ -15837,8 +15840,8 @@ void flecs_start_workers(
ecs_assert(ecs_get_stage_count(world) == threads, ECS_INTERNAL_ERROR, NULL);

int32_t i;
for (i = 0; i < threads; i ++) {
ecs_stage_t *stage = (ecs_stage_t*)ecs_get_stage(world, i);
for (i = 0; i < threads - 1; i ++) {
ecs_stage_t *stage = (ecs_stage_t*)ecs_get_stage(world, i + 1);
ecs_assert(stage != NULL, ECS_INTERNAL_ERROR, NULL);
ecs_poly_assert(stage, ecs_stage_t);

Expand Down Expand Up @@ -15867,55 +15870,61 @@ void flecs_wait_for_workers(

do {
ecs_os_mutex_lock(world->sync_mutex);
if (world->workers_running == stage_count) {
if (world->workers_running == (stage_count - 1)) {
wait = false;
}
ecs_os_mutex_unlock(world->sync_mutex);
} while (wait);
}

/* Synchronize workers */
/* Wait until all threads are waiting on sync point */
static
void flecs_sync_worker(
void flecs_wait_for_sync(
ecs_world_t *world)
{
int32_t stage_count = ecs_get_stage_count(world);

/* Signal that thread is waiting */
ecs_dbg_3("#[bold]pipeline: waiting for worker sync");

ecs_os_mutex_lock(world->sync_mutex);
if (++ world->workers_waiting == stage_count) {
/* Only signal main thread when all threads are waiting */
ecs_os_cond_signal(world->sync_cond);
if (world->workers_waiting != (stage_count - 1)) {
ecs_os_cond_wait(world->sync_cond, world->sync_mutex);
}

/* Wait until main thread signals that thread can continue */
ecs_os_cond_wait(world->worker_cond, world->sync_mutex);
/* We should have been signalled unless all workers are waiting on sync */
ecs_assert(world->workers_waiting == (stage_count - 1),
ECS_INTERNAL_ERROR, NULL);

ecs_os_mutex_unlock(world->sync_mutex);

ecs_dbg_3("#[bold]pipeline: workers synced");
}

/* Wait until all threads are waiting on sync point */
/* Synchronize workers */
static
void flecs_wait_for_sync(
ecs_world_t *world)
void flecs_sync_worker(
ecs_world_t *world,
ecs_stage_t *stage)
{
int32_t stage_count = ecs_get_stage_count(world);

ecs_dbg_3("#[bold]pipeline: waiting for worker sync");
if (stage->id == 0) {
return;
}

/* Signal that thread is waiting */
ecs_os_mutex_lock(world->sync_mutex);
if (world->workers_waiting != stage_count) {
ecs_os_cond_wait(world->sync_cond, world->sync_mutex);
if (++ world->workers_waiting == (stage_count - 1)) {
/* Only signal main thread when all threads are waiting */
ecs_os_cond_signal(world->sync_cond);
}

/* We should have been signalled unless all workers are waiting on sync */
ecs_assert(world->workers_waiting == stage_count,
ECS_INTERNAL_ERROR, NULL);

/* Wait until main thread signals that thread can continue */
ecs_os_cond_wait(world->worker_cond, world->sync_mutex);
ecs_os_mutex_unlock(world->sync_mutex);

ecs_dbg_3("#[bold]pipeline: workers synced");
}


/* Signal workers that they can start/resume work */
static
void flecs_signal_workers(
Expand All @@ -15938,7 +15947,7 @@ bool ecs_stop_threads(
* a potential race if threads haven't spun up yet. */
ecs_stage_t *stages = world->stages;
int i, count = world->stage_count;
for (i = 0; i < count; i ++) {
for (i = 1; i < count; i ++) {
ecs_stage_t *stage = &stages[i];
if (stage->thread) {
threads_active = true;
Expand All @@ -15960,7 +15969,7 @@ bool ecs_stop_threads(
flecs_signal_workers(world);

/* Join all threads with main */
for (i = 0; i < count; i ++) {
for (i = 1; i < count; i ++) {
ecs_os_thread_join(stages[i].thread);
stages[i].thread = 0;
}
Expand All @@ -15982,14 +15991,15 @@ void flecs_worker_begin(
flecs_stage_from_world(&world);
int32_t stage_count = ecs_get_stage_count(world);
ecs_assert(stage_count != 0, ECS_INTERNAL_ERROR, NULL);

if (stage_count == 1) {
ecs_entity_t pipeline = world->pipeline;
const EcsPipeline *pq = ecs_get(world, pipeline, EcsPipeline);
ecs_assert(pq != NULL, ECS_INTERNAL_ERROR, NULL);

ecs_pipeline_op_t *op = ecs_vector_first(pq->state->ops,
ecs_pipeline_op_t);
pq->state->no_readonly = op->no_readonly;
if (!op || !op->no_readonly) {
ecs_readonly_begin(world);
}
Expand All @@ -15998,6 +16008,7 @@ void flecs_worker_begin(

bool flecs_worker_sync(
ecs_world_t *world,
ecs_stage_t *stage,
ecs_pipeline_state_t *pq)
{
ecs_assert(pq != NULL, ECS_INTERNAL_ERROR, NULL);
Expand All @@ -16008,9 +16019,16 @@ bool flecs_worker_sync(
ecs_assert(stage_count != 0, ECS_INTERNAL_ERROR, NULL);
int64_t build_count = world->info.pipeline_build_count_total;

bool main_thread = stage->id == 0;
bool threaded = stage_count > 1;

/* If there are no threads, merge in place */
if (stage_count == 1) {
if (!pq->cur_op->no_readonly) {
if (main_thread) {
if (threaded) {
flecs_wait_for_sync(world);
}

if (!pq->no_readonly) {
ecs_readonly_end(world);
}

Expand All @@ -16019,24 +16037,34 @@ bool flecs_worker_sync(
/* Synchronize all workers. The last worker to reach the sync point will
* signal the main thread, which will perform the merge. */
} else {
flecs_sync_worker(world);
flecs_sync_worker(world, stage);
}

if (build_count != world->info.pipeline_build_count_total) {
rebuild = true;
}

if (stage_count == 1) {
if (!pq->cur_op->no_readonly) {
if (main_thread) {
ecs_pipeline_op_t *cur_op = pq->cur_op;
pq->no_readonly = cur_op->no_readonly;
if (!cur_op->no_readonly) {
ecs_readonly_begin(world);
}

if (threaded) {
ECS_BIT_COND(world->flags, EcsWorldMultiThreaded,
cur_op->multi_threaded);
world->workers_waiting = 0;
flecs_signal_workers(world);
}
}

return rebuild;
}

void flecs_worker_end(
ecs_world_t *world)
ecs_world_t *world,
ecs_stage_t *stage)
{
flecs_stage_from_world(&world);

Expand All @@ -16052,7 +16080,7 @@ void flecs_worker_end(
/* Synchronize all workers. The last worker to reach the sync point will
* signal the main thread, which will perform the merge. */
} else {
flecs_sync_worker(world);
flecs_sync_worker(world, stage);
}
}

Expand Down Expand Up @@ -16091,28 +16119,32 @@ void flecs_workers_progress(

/* Synchronize n times for each op in the pipeline */
for (; op <= op_last; op ++) {
bool is_threaded = world->flags & EcsWorldMultiThreaded;
if (!op->no_readonly) {
bool no_readonly = op->no_readonly;
pq->no_readonly = no_readonly;

if (!no_readonly) {
ecs_readonly_begin(world);
}
if (!op->multi_threaded) {
world->flags &= ~EcsWorldMultiThreaded;
}

ECS_BIT_COND(world->flags, EcsWorldMultiThreaded,
op->multi_threaded);

/* Signal workers that they should start running systems */
world->workers_waiting = 0;
flecs_signal_workers(world);

ecs_world_t *stage = ecs_get_stage(world, 0);
ecs_entity_t old_scope = ecs_set_scope((ecs_world_t*)stage, 0);
flecs_run_pipeline(stage, pq, delta_time);
ecs_set_scope((ecs_world_t*)stage, old_scope);

/* Wait until all workers are waiting on sync point */
flecs_wait_for_sync(world);

/* Merge */
if (!op->no_readonly) {
if (!pq->no_readonly) {
ecs_readonly_end(world);
}
if (is_threaded) {
world->flags |= EcsWorldMultiThreaded;
}

if (flecs_pipeline_update(world, pq, false)) {
ecs_assert(!ecs_is_deferred(world), ECS_INVALID_OPERATION, NULL);
Expand Down Expand Up @@ -16809,7 +16841,7 @@ void flecs_run_pipeline(
* current position (system). If there are a lot of systems
* in the pipeline this can be an expensive operation, but
* should happen infrequently. */
bool rebuild = flecs_worker_sync(world, pq);
bool rebuild = flecs_worker_sync(world, stage, pq);
if (rebuild) {
i = pq->cur_i;
ran_since_merge = pq->ran_since_merge;
Expand All @@ -16833,7 +16865,7 @@ void flecs_run_pipeline(
world->info.system_time_total += (ecs_ftime_t)ecs_time_measure(&st);
}

flecs_worker_end(stage->thread_ctx);
flecs_worker_end(stage->thread_ctx, stage);
}

bool ecs_progress(
Expand Down Expand Up @@ -18132,6 +18164,12 @@ void* win_thread_join(
return NULL;
}

static
ecs_os_thread_id_t win_thread_self(void)
{
return (ecs_os_thread_id_t)GetCurrentThreadId();
}

static
int32_t win_ainc(
int32_t *count)
Expand Down Expand Up @@ -18339,6 +18377,7 @@ void ecs_set_os_api_impl(void) {

api.thread_new_ = win_thread_new;
api.thread_join_ = win_thread_join;
api.thread_self_ = win_thread_self;
api.ainc_ = win_ainc;
api.adec_ = win_adec;
api.lainc_ = win_lainc;
Expand Down Expand Up @@ -18401,6 +18440,12 @@ void* posix_thread_join(
return arg;
}

static
ecs_os_thread_id_t posix_thread_self(void)
{
return (ecs_os_thread_id_t)pthread_self();
}

static
int32_t posix_ainc(
int32_t *count)
Expand Down Expand Up @@ -18631,6 +18676,7 @@ void ecs_set_os_api_impl(void) {

api.thread_new_ = posix_thread_new;
api.thread_join_ = posix_thread_join;
api.thread_self_ = posix_thread_self;
api.ainc_ = posix_ainc;
api.adec_ = posix_adec;
api.lainc_ = posix_lainc;
Expand Down Expand Up @@ -46771,7 +46817,8 @@ bool ecs_os_has_threading(void) {
(ecs_os_api.cond_signal_ != NULL) &&
(ecs_os_api.cond_broadcast_ != NULL) &&
(ecs_os_api.thread_new_ != NULL) &&
(ecs_os_api.thread_join_ != NULL);
(ecs_os_api.thread_join_ != NULL) &&
(ecs_os_api.thread_self_ != NULL);
}

bool ecs_os_has_time(void) {
Expand Down
8 changes: 8 additions & 0 deletions flecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -2265,6 +2265,9 @@ typedef uintptr_t ecs_os_mutex_t;
typedef uintptr_t ecs_os_dl_t;
typedef uintptr_t ecs_os_sock_t;

/* 64 bit thread id */
typedef uint64_t ecs_os_thread_id_t;

/* Generic function pointer type */
typedef void (*ecs_os_proc_t)(void);

Expand Down Expand Up @@ -2312,6 +2315,9 @@ typedef
void* (*ecs_os_api_thread_join_t)(
ecs_os_thread_t thread);

typedef
ecs_os_thread_id_t (*ecs_os_api_thread_self_t)(void);

/* Atomic increment / decrement */
typedef
int32_t (*ecs_os_api_ainc_t)(
Expand Down Expand Up @@ -2427,6 +2433,7 @@ typedef struct ecs_os_api_t {
/* Threads */
ecs_os_api_thread_new_t thread_new_;
ecs_os_api_thread_join_t thread_join_;
ecs_os_api_thread_self_t thread_self_;

/* Atomic incremenet / decrement */
ecs_os_api_ainc_t ainc_;
Expand Down Expand Up @@ -2610,6 +2617,7 @@ void ecs_os_set_api_defaults(void);
/* Threads */
#define ecs_os_thread_new(callback, param) ecs_os_api.thread_new_(callback, param)
#define ecs_os_thread_join(thread) ecs_os_api.thread_join_(thread)
#define ecs_os_thread_self() ecs_os_api.thread_self_()

/* Atomic increment / decrement */
#define ecs_os_ainc(value) ecs_os_api.ainc_(value)
Expand Down
Loading

0 comments on commit 2c7e313

Please sign in to comment.