Skip to content

Commit

Permalink
Merge pull request #32114 from JuliaLang/jb/toomanytasks
Browse files Browse the repository at this point in the history
avoid error when Task multiq is full
  • Loading branch information
JeffBezanson authored May 29, 2019
2 parents 8f9d356 + 169550a commit 4b0c8e7
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
7 changes: 6 additions & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,12 @@ function enq_work(t::Task)
push!(Workqueues[tid], t)
else
tid = 0
ccall(:jl_enqueue_task, Cvoid, (Any,), t)
if ccall(:jl_enqueue_task, Cint, (Any,), t) != 0
# if multiq is full, give to a random thread (TODO fix)
tid = mod(time_ns() % Int, Threads.nthreads()) + 1
ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
push!(Workqueues[tid], t)
end
end
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
return t
Expand Down
27 changes: 17 additions & 10 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ typedef struct taskheap_tag {

/* multiqueue parameters */
static const int16_t heap_d = 8;
static const int heap_c = 4;
static const int heap_c = 16;

/* size of each heap */
static const int tasks_per_heap = 8192; // TODO: this should be smaller by default, but growable!
static const int tasks_per_heap = 16384; // TODO: this should be smaller by default, but growable!

/* the multiqueue's heaps */
static taskheap_t *heaps;
Expand Down Expand Up @@ -117,7 +117,7 @@ static inline int multiq_insert(jl_task_t *task, int16_t priority)

if (heaps[rn].ntasks >= tasks_per_heap) {
jl_mutex_unlock_nogc(&heaps[rn].lock);
jl_error("multiq insertion failed, increase #tasks per heap");
// multiq insertion failed, increase #tasks per heap
return -1;
}

Expand Down Expand Up @@ -287,9 +287,11 @@ void jl_threadfun(void *arg)


// enqueue the specified task for execution
JL_DLLEXPORT void jl_enqueue_task(jl_task_t *task)
JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task)
{
multiq_insert(task, task->prio);
if (multiq_insert(task, task->prio) == -1)
return 1;
return 0;
}


Expand Down Expand Up @@ -372,17 +374,22 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
}


JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid)
{
// Try to acquire the lock on this task.
// If this fails, we'll check for that error later (in jl_switchto).
if (jl_atomic_load_acquire(&task->tid) != tid) {
jl_atomic_compare_exchange(&task->tid, -1, tid);
}
}

// get the next runnable task from the multiq
static jl_task_t *get_next_task(jl_value_t *getsticky)
{
jl_task_t *task = (jl_task_t*)jl_apply(&getsticky, 1);
if (jl_typeis(task, jl_task_type)) {
int self = jl_get_ptls_states()->tid;
// try to acquire the lock on this task now
// we'll check this error later (in yieldto)
if (jl_atomic_load_acquire(&task->tid) != self) {
jl_atomic_compare_exchange(&task->tid, -1, self);
}
jl_set_task_tid(task, self);
return task;
}
#ifdef JULIA_ENABLE_THREADING
Expand Down
8 changes: 4 additions & 4 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ void JL_NORETURN jl_finish_task(jl_task_t *t, jl_value_t *resultval JL_MAYBE_UNR
{
jl_ptls_t ptls = jl_get_ptls_states();
JL_SIGATOMIC_BEGIN();
t->result = resultval;
jl_gc_wb(t, t->result);
if (t->exception != jl_nothing)
t->state = failed_sym;
jl_atomic_store_release(&t->state, failed_sym);
else
t->state = done_sym;
jl_atomic_store_release(&t->state, done_sym);
if (t->copy_stack) // early free of stkbuf
t->stkbuf = NULL;
t->result = resultval;
jl_gc_wb(t, t->result);
// ensure that state is cleared
ptls->in_finalizer = 0;
ptls->in_pure_callback = 0;
Expand Down

0 comments on commit 4b0c8e7

Please sign in to comment.