Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule the cycle detector with higher priority using the inject queue #3507

Merged
merged 1 commit into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions src/libponyrt/actor/actor.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ static bool well_formed_msg_chain(pony_msg_t* first, pony_msg_t* last)
}
#endif

static void send_unblock(pony_ctx_t* ctx, pony_actor_t* actor)
static void send_unblock(pony_actor_t* actor)
{
// Send unblock before continuing.
unset_flag(actor, FLAG_BLOCKED | FLAG_BLOCKED_SENT);
ponyint_cycle_unblock(ctx, actor);
ponyint_cycle_unblock(actor);
}

static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
Expand Down Expand Up @@ -120,7 +120,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
has_flag(actor, FLAG_BLOCKED_SENT))
{
// send unblock if we've sent a block
send_unblock(ctx, actor);
send_unblock(actor);
}

return false;
Expand All @@ -147,7 +147,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
has_flag(actor, FLAG_BLOCKED_SENT))
{
// send unblock if we've sent a block
send_unblock(ctx, actor);
send_unblock(actor);
}

return false;
Expand Down Expand Up @@ -178,7 +178,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
{
// We've sent a block message, send confirm.
pony_msgi_t* m = (pony_msgi_t*)msg;
ponyint_cycle_ack(ctx, m->i);
ponyint_cycle_ack(m->i);
}

return false;
Expand All @@ -196,7 +196,8 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
{
// We're blocked, send block message.
set_flag(actor, FLAG_BLOCKED_SENT);
ponyint_cycle_block(ctx, actor, &actor->gc);
pony_assert(ctx->current == actor);
ponyint_cycle_block(actor, &actor->gc);
}

return false;
Expand Down Expand Up @@ -275,7 +276,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
if(has_flag(actor, FLAG_BLOCKED_SENT))
{
// send unblock if we've sent a block
send_unblock(ctx, actor);
send_unblock(actor);
}

DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
Expand Down Expand Up @@ -595,19 +596,20 @@ PONY_API pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type)

// tell the cycle detector we exist if block messages are enabled
if(!actor_noblock)
ponyint_cycle_actor_created(ctx, actor);
ponyint_cycle_actor_created(actor);

DTRACE2(ACTOR_ALLOC, (uintptr_t)ctx->scheduler, (uintptr_t)actor);
return actor;
}

PONY_API void ponyint_destroy(pony_ctx_t* ctx, pony_actor_t* actor)
{
(void)ctx;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ponylang/core how do you folks feel about changing the signature of ponyint_destroy to remove the ctx argument? (asking because it's marked as PONY_API meaning it's a breaking change)

// This destroys an actor immediately.
// The finaliser is not called.

// Notify cycle detector of actor being destroyed
ponyint_cycle_actor_destroyed(ctx, actor);
ponyint_cycle_actor_destroyed(actor);

ponyint_actor_setpendingdestroy(actor);
ponyint_actor_destroy(actor);
Expand Down Expand Up @@ -869,11 +871,13 @@ PONY_API void pony_schedule(pony_ctx_t* ctx, pony_actor_t* actor)

PONY_API void pony_unschedule(pony_ctx_t* ctx, pony_actor_t* actor)
{
(void)ctx;

if(has_flag(actor, FLAG_BLOCKED_SENT))
{
// send unblock if we've sent a block
if(!actor_noblock)
send_unblock(ctx, actor);
send_unblock(actor);
}

set_flag(actor, FLAG_UNSCHEDULED);
Expand Down
30 changes: 19 additions & 11 deletions src/libponyrt/gc/cycle.c
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ void ponyint_cycle_create(pony_ctx_t* ctx, uint32_t detect_interval)
d->last_checked = HASHMAP_BEGIN;
}

bool ponyint_cycle_check_blocked(pony_ctx_t* ctx, uint64_t tsc, uint64_t tsc2)
bool ponyint_cycle_check_blocked(uint64_t tsc, uint64_t tsc2)
{
// if tsc > tsc2 then don't trigger cycle detector
// this is used to ensure scheduler queue is empty during
Expand All @@ -1077,34 +1077,39 @@ bool ponyint_cycle_check_blocked(pony_ctx_t* ctx, uint64_t tsc, uint64_t tsc2)
// if enough time has passed, trigger cycle detector
if((tsc2 - tsc) > d->detect_interval)
{
pony_ctx_t* ctx = ponyint_sched_get_inject_context();
pony_send(ctx, cycle_detector, ACTORMSG_CHECKBLOCKED);
return true;
}

return false;
}

void ponyint_cycle_actor_created(pony_ctx_t* ctx, pony_actor_t* actor)
void ponyint_cycle_actor_created(pony_actor_t* actor)
{
// this will only be false during the creation of the cycle detector
// and after the runtime has been shut down
if(cycle_detector)
if(cycle_detector) {
pony_ctx_t* ctx = ponyint_sched_get_inject_context();
pony_sendp(ctx, cycle_detector, ACTORMSG_CREATED, actor);
}
}

void ponyint_cycle_actor_destroyed(pony_ctx_t* ctx, pony_actor_t* actor)
void ponyint_cycle_actor_destroyed(pony_actor_t* actor)
{
// this will only be false during the creation of the cycle detector
// and after the runtime has been shut down or if the cycle detector
// is being destroyed
if(cycle_detector && !ponyint_is_cycle(actor))
if(cycle_detector && !ponyint_is_cycle(actor)) {
pony_ctx_t* ctx = ponyint_sched_get_inject_context();
pony_sendp(ctx, cycle_detector, ACTORMSG_DESTROYED, actor);
}
}

void ponyint_cycle_block(pony_ctx_t* ctx, pony_actor_t* actor, gc_t* gc)
void ponyint_cycle_block(pony_actor_t* actor, gc_t* gc)
{
pony_assert(ctx->current == actor);
pony_assert(&actor->gc == gc);
pony_ctx_t* ctx = ponyint_sched_get_inject_context();

block_msg_t* m = (block_msg_t*)pony_alloc_msg(
POOL_INDEX(sizeof(block_msg_t)), ACTORMSG_BLOCK);
Expand All @@ -1122,18 +1127,22 @@ void ponyint_cycle_block(pony_ctx_t* ctx, pony_actor_t* actor, gc_t* gc)
pony_sendv(ctx, cycle_detector, &m->msg, &m->msg, false);
}

void ponyint_cycle_unblock(pony_ctx_t* ctx, pony_actor_t* actor)
void ponyint_cycle_unblock(pony_actor_t* actor)
{
pony_ctx_t* ctx = ponyint_sched_get_inject_context();
pony_sendp(ctx, cycle_detector, ACTORMSG_UNBLOCK, actor);
}

void ponyint_cycle_ack(pony_ctx_t* ctx, size_t token)
void ponyint_cycle_ack(size_t token)
{
pony_ctx_t* ctx = ponyint_sched_get_inject_context();
pony_sendi(ctx, cycle_detector, ACTORMSG_ACK, token);
}

void ponyint_cycle_terminate(pony_ctx_t* ctx)
void ponyint_cycle_terminate()
{
pony_ctx_t* ctx = ponyint_sched_get_inject_context();

pony_become(ctx, cycle_detector);
final(ctx, cycle_detector);
ponyint_destroy(ctx, cycle_detector);
Expand All @@ -1145,7 +1154,6 @@ bool ponyint_is_cycle(pony_actor_t* actor)
return actor == cycle_detector;
}


#ifdef USE_MEMTRACK
size_t ponyint_cycle_mem_size()
{
Expand Down
14 changes: 7 additions & 7 deletions src/libponyrt/gc/cycle.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ PONY_EXTERN_C_BEGIN

void ponyint_cycle_create(pony_ctx_t* ctx, uint32_t detect_interval);

bool ponyint_cycle_check_blocked(pony_ctx_t* ctx, uint64_t tsc, uint64_t tsc2);
bool ponyint_cycle_check_blocked(uint64_t tsc, uint64_t tsc2);

void ponyint_cycle_actor_created(pony_ctx_t* ctx, pony_actor_t* actor);
void ponyint_cycle_actor_created(pony_actor_t* actor);

void ponyint_cycle_actor_destroyed(pony_ctx_t* ctx, pony_actor_t* actor);
void ponyint_cycle_actor_destroyed(pony_actor_t* actor);

void ponyint_cycle_block(pony_ctx_t* ctx, pony_actor_t* actor, gc_t* gc);
void ponyint_cycle_block(pony_actor_t* actor, gc_t* gc);

void ponyint_cycle_unblock(pony_ctx_t* ctx, pony_actor_t* actor);
void ponyint_cycle_unblock(pony_actor_t* actor);

void ponyint_cycle_ack(pony_ctx_t* ctx, size_t token);
void ponyint_cycle_ack(size_t token);

void ponyint_cycle_terminate(pony_ctx_t* ctx);
void ponyint_cycle_terminate();

bool ponyint_is_cycle(pony_actor_t* actor);

Expand Down
17 changes: 13 additions & 4 deletions src/libponyrt/sched/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ static uint64_t scheduler_suspend_threshold;
static PONY_ATOMIC(uint32_t) active_scheduler_count;
static PONY_ATOMIC(uint32_t) active_scheduler_count_check;
static scheduler_t* scheduler;
static pony_ctx_t* inject_context;
static PONY_ATOMIC(bool) detect_quiescence;
static bool use_yield;
static mpmcq_t inject;
Expand Down Expand Up @@ -847,7 +848,7 @@ static pony_actor_t* steal(scheduler_t* sched)
{
// trigger cycle detector by sending it a message if it is time
uint64_t current_tsc = ponyint_cpu_tick();
if(ponyint_cycle_check_blocked(&sched->ctx, last_cd_tsc, current_tsc))
if(ponyint_cycle_check_blocked(last_cd_tsc, current_tsc))
{
last_cd_tsc = current_tsc;

Expand Down Expand Up @@ -880,6 +881,7 @@ static void run(scheduler_t* sched)
last_cd_tsc = 0;

pony_actor_t* actor = pop_global(sched);

if (DTRACE_ENABLED(ACTOR_SCHEDULED) && actor != NULL) {
DTRACE2(ACTOR_SCHEDULED, (uintptr_t)sched, (uintptr_t)actor);
}
Expand All @@ -894,7 +896,7 @@ static void run(scheduler_t* sched)
{
// trigger cycle detector by sending it a message if it is time
uint64_t current_tsc = ponyint_cpu_tick();
if(ponyint_cycle_check_blocked(&sched->ctx, last_cd_tsc, current_tsc))
if(ponyint_cycle_check_blocked(last_cd_tsc, current_tsc))
{
last_cd_tsc = current_tsc;

Expand Down Expand Up @@ -1018,7 +1020,7 @@ static void ponyint_sched_shutdown()
ponyint_thread_join(scheduler[i].tid);

DTRACE0(RT_END);
ponyint_cycle_terminate(&scheduler[0].ctx);
ponyint_cycle_terminate();

for(uint32_t i = 0; i < scheduler_count; i++)
{
Expand Down Expand Up @@ -1053,6 +1055,7 @@ static void ponyint_sched_shutdown()
* sizeof(scheduler_t)));
#endif
scheduler = NULL;
inject_context = NULL;
scheduler_count = 0;
atomic_store_explicit(&active_scheduler_count, 0, memory_order_relaxed);

Expand Down Expand Up @@ -1143,7 +1146,9 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool pin,
ponyint_mpmcq_init(&inject);
ponyint_asio_init(asio_cpu);

return pony_ctx();
inject_context = pony_ctx();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add code to ponyint_sched_shutdown to set inject_context = NULL to clean things up.


return inject_context;
}

bool ponyint_sched_start(bool library)
Expand Down Expand Up @@ -1279,6 +1284,10 @@ void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id)
}
}

pony_ctx_t* ponyint_sched_get_inject_context() {
return inject_context;
}

// Maybe wake up a scheduler thread if possible
void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id)
{
Expand Down
4 changes: 4 additions & 0 deletions src/libponyrt/sched/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id);
// threads are asleep
void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id);

// Retrieves the global main thread context for scheduling
// special actors on the inject queue.
pony_ctx_t* ponyint_sched_get_inject_context();

#ifdef USE_MEMTRACK
/** Get the static memory used by the scheduler subsystem.
*/
Expand Down