@@ -2014,11 +2014,10 @@ struct ggml_threadpool {
     // these are atomic as an annotation for thread-sanitizer
     atomic_bool stop; // Used for stopping the threadpool altogether
     atomic_bool pause; // Used for pausing the threadpool or individual threads
-    atomic_bool abort; // Used for aborting processing of a graph
 
     struct ggml_compute_state * workers; // per thread state
     int n_threads_max; // number of threads in the pool
-    atomic_int n_threads_cur; // number of threads used in the current graph
+    int n_threads_cur; // number of threads used in the current graph
 
     int32_t prio; // Scheduling priority
     uint32_t poll; // Polling level (0 - no polling)
@@ -3182,36 +3181,41 @@ inline static void ggml_critical_section_start(void) {
     }
 }
 
-static void ggml_barrier(struct ggml_threadpool * tp) {
-    int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
-    if (n_threads == 1) {
+#ifdef GGML_USE_OPENMP
+static void ggml_barrier(struct ggml_threadpool * threadpool) {
+    if (threadpool->n_threads_cur == 1) {
         return;
     }
 
-#ifdef GGML_USE_OPENMP
     #pragma omp barrier
+}
 #else
-    int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
+static void ggml_barrier(struct ggml_threadpool * threadpool) {
+    if (threadpool->n_threads_cur == 1) {
+        return;
+    }
+
+    atomic_int * n_barrier = &threadpool->n_barrier;
+    atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
 
-    // enter barrier (full seq-cst fence)
-    int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst);
+    int n_threads = threadpool->n_threads_cur;
+    int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
 
-    int last = 0;
-    if (n_barrier == (n_threads - 1)) {
+    if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
         // last thread
-        atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
-        last = 1;
+        atomic_store(n_barrier, 0);
+        atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed);
     } else {
         // wait for other threads
-        while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
+        while (true) {
+            if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
+                return;
+            }
             ggml_thread_cpu_relax();
         }
     }
-
-    // exit barrier (full seq-cst fence)
-    atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
-#endif
 }
+#endif
 
 // TODO: make this somehow automatically executed
 // some sort of "sentry" mechanism
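
Both sides of the ggml_barrier hunk above implement the same counter-plus-generation spin barrier; they differ mainly in which memory orderings they request (explicit seq-cst enter/exit read-modify-writes versus mostly relaxed operations). Below is a minimal, self-contained sketch of the pattern, with illustrative names rather than ggml's API, written with default seq-cst atomics for clarity:

// Sketch of a counter-based spin barrier in the style of ggml_barrier above.
// n_arrived counts threads currently inside the barrier; n_passed counts
// completed barrier "generations" so waiters can detect release.
#include <stdatomic.h>

typedef struct {
    atomic_int n_arrived; // threads that have entered the current barrier
    atomic_int n_passed;  // barrier generation counter
    int        n_threads; // number of participating threads
} spin_barrier;

static void spin_barrier_wait(spin_barrier * b) {
    if (b->n_threads == 1) {
        return;
    }

    int passed_old = atomic_load(&b->n_passed);

    if (atomic_fetch_add(&b->n_arrived, 1) == b->n_threads - 1) {
        // last thread to arrive: reset the arrival counter and release the others
        atomic_store(&b->n_arrived, 0);
        atomic_fetch_add(&b->n_passed, 1);
    } else {
        // wait until the generation counter changes
        while (atomic_load(&b->n_passed) == passed_old) {
            // spin; a real implementation issues a cpu-relax/pause hint here
        }
    }
}

Each waiter records the generation counter before arriving; the last thread resets the arrival count and bumps the generation, which releases everyone spinning on it.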
@@ -20181,84 +20185,64 @@ struct ggml_cplan ggml_graph_plan(
 
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
-    struct ggml_threadpool * tp = state->threadpool;
 
-    const struct ggml_cgraph * cgraph = tp->cgraph;
-    const struct ggml_cplan * cplan = tp->cplan;
+    const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
+    const struct ggml_cplan * cplan = state->threadpool->cplan;
 
     set_numa_thread_affinity(state->ith);
 
     struct ggml_compute_params params = {
         /*.ith =*/ state->ith,
-        /*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
+        /*.nth =*/ state->threadpool->n_threads_cur,
         /*.wsize =*/ cplan->work_size,
         /*.wdata =*/ cplan->work_data,
-        /*.threadpool=*/ tp,
+        /*.threadpool=*/ state->threadpool,
     };
 
-    for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort; node_n++) {
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
         struct ggml_tensor * node = cgraph->nodes[node_n];
 
         ggml_compute_forward(&params, node);
 
-        if (state->ith == 0 && cplan->abort_callback &&
-            cplan->abort_callback(cplan->abort_callback_data)) {
-            tp->abort = true;
-            tp->ec = GGML_STATUS_ABORTED;
+        if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
+            state->threadpool->ec = GGML_STATUS_ABORTED;
         }
 
         ggml_barrier(state->threadpool);
+
+        if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
+            break;
+        }
     }
 
     return 0;
 }
 
 #ifndef GGML_USE_OPENMP
 
-// check if thread is active
-static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
-    struct ggml_threadpool * threadpool = state->threadpool;
-    int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
-    return (state->ith < n_threads);
-}
-
-// check if thread is ready to proceed (exit from polling or sleeping)
-static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
+static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
     // check for new graph/work
     int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
     if (new_graph != state->last_graph) {
-        state->pending = ggml_graph_compute_thread_active(state);
+        state->pending = (state->ith < threadpool->n_threads_cur);
         state->last_graph = new_graph;
     }
 
     return state->pending;
 }
 
-// sync thread state after polling
-static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
-    struct ggml_threadpool * threadpool = state->threadpool;
-    // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
-    // so instead we just use a dummy read-modify-write
-    atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
-}
-
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
-    // Skip polling for unused threads
-    if (!ggml_graph_compute_thread_active(state)) {
-        return state->pending;
-    }
-
     // This seems to make 0 ... 100 a decent range for polling level across modern processors.
     // Perhaps, we can adjust it dynamically based on load and things.
     const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
 
-    for (uint64_t i=0; !ggml_graph_compute_thread_ready(state) && i < n_rounds; i++) {
+    for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
         // No new work. Keep polling.
         ggml_thread_cpu_relax();
    }
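
The removed ggml_graph_compute_thread_sync helper above documents a ThreadSanitizer workaround: a dummy read-modify-write stands in for a raw seq-cst fence. A stripped-down sketch of that idiom (the variable name here is hypothetical, not ggml's):

// A no-op fetch_add acts as a full sequentially consistent fence while staying
// visible to ThreadSanitizer, which does not model raw atomic_thread_fence well.
#include <stdatomic.h>

static atomic_int sync_word;

static inline void full_fence_tsan_friendly(void) {
    // semantically close to atomic_thread_fence(memory_order_seq_cst),
    // but expressed as an RMW on a shared atomic so TSan can track it
    atomic_fetch_add_explicit(&sync_word, 0, memory_order_seq_cst);
}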
@@ -20270,14 +20254,13 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (ggml_graph_compute_poll_for_work(state)) {
-        ggml_graph_compute_thread_sync(state);
         return state->pending;
     }
 
     ggml_mutex_lock_shared(&threadpool->mutex);
-    while (!ggml_graph_compute_thread_ready(state)) {
+    while (!ggml_graph_compute_ready(state)) {
         // No new work. Wait for the signal.
-        GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping)\n", state->ith);
+        GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
         ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
     }
     ggml_mutex_unlock_shared(&threadpool->mutex);
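
ggml_graph_compute_check_for_work combines bounded polling with a condition-variable sleep. A minimal sketch of that hybrid wait, using illustrative names and plain pthreads rather than ggml's wrappers:

// Spin for a bounded number of rounds (cheap when work arrives quickly),
// then fall back to sleeping on a condition variable.
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct {
    pthread_mutex_t mutex;   // initialize with PTHREAD_MUTEX_INITIALIZER or pthread_mutex_init
    pthread_cond_t  cond;    // initialize with PTHREAD_COND_INITIALIZER or pthread_cond_init
    atomic_int      n_graph; // bumped by the producer when new work is posted
} work_queue;

static bool work_available(work_queue * q, int last_graph) {
    return atomic_load_explicit(&q->n_graph, memory_order_relaxed) != last_graph;
}

static void wait_for_work(work_queue * q, int last_graph, uint64_t poll_rounds) {
    // phase 1: poll without blocking
    for (uint64_t i = 0; i < poll_rounds; i++) {
        if (work_available(q, last_graph)) {
            return;
        }
        // a real implementation issues a cpu-relax/pause hint here
    }

    // phase 2: sleep until the producer signals
    pthread_mutex_lock(&q->mutex);
    while (!work_available(q, last_graph)) {
        pthread_cond_wait(&q->cond, &q->mutex);
    }
    pthread_mutex_unlock(&q->mutex);
}

The producer must take the same mutex before signalling, as ggml_graph_compute_kickoff does in the next hunk; otherwise a waiter could check the flag, miss the update, and then sleep through the wakeup.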
@@ -20324,20 +20307,13 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
 }
 
 // Start processing new graph
-static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads)
+static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
 {
-    // Always take the mutex here because the worker threads are doing hybrid poll/wait
+    // always take the mutex here because the worker threads are doing hybrid poll/wait
 
     ggml_mutex_lock(&threadpool->mutex);
 
-    GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
-
-    // Update the number of active threads
-    atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
-
-    // Indicate the graph is ready to be processed
-    // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
-    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
+    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
 
     if (threadpool->pause) {
         // Update main thread prio and affinity to match the threadpool settings
@@ -20396,7 +20372,6 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
     threadpool->current_chunk = 0;
     threadpool->stop = false;
     threadpool->pause = tpp->paused;
-    threadpool->abort = false;
    threadpool->workers = NULL;
     threadpool->n_threads_max = tpp->n_threads;
     threadpool->n_threads_cur = tpp->n_threads;
@@ -20472,11 +20447,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         // No worker threads should be accessing the parameters below at this stage
         threadpool->cgraph = cgraph;
         threadpool->cplan = cplan;
+        threadpool->n_threads_cur = n_threads;
         threadpool->current_chunk = 0;
-        threadpool->abort = false;
         threadpool->ec = GGML_STATUS_SUCCESS;
     }
 
+    if (n_threads > threadpool->n_threads_max) {
+        GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
+    }
+
 #ifdef GGML_USE_OPENMP
     if (n_threads > 1) {
         #pragma omp parallel num_threads(n_threads)
@@ -20485,7 +20464,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             {
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
-                atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+                threadpool->n_threads_cur = n_threads;
             }
 
             ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20495,13 +20474,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         ggml_graph_compute_thread(&threadpool->workers[0]);
     }
 #else
-    if (n_threads > threadpool->n_threads_max) {
-        GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
-        n_threads = threadpool->n_threads_max;
-    }
-
     // Kick all threads to start the new graph
-    ggml_graph_compute_kickoff(threadpool, n_threads);
+    ggml_graph_compute_kickoff(threadpool);
 
     // This is a work thread too
     ggml_graph_compute_thread(&threadpool->workers[0]);
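
The OpenMP path in ggml_graph_compute reads the granted thread count back from inside the parallel region, since the runtime may give fewer threads than requested. A standalone illustration of that pattern (an assumed example, not ggml code; compile with -fopenmp):

#include <omp.h>
#include <stdio.h>

int main(void) {
    int n_threads = 8; // requested thread count
    #pragma omp parallel num_threads(n_threads)
    {
        #pragma omp single
        {
            // the runtime may grant fewer threads than requested
            n_threads = omp_get_num_threads();
            printf("running with %d threads\n", n_threads);
        }
        // ... per-thread work, indexed by omp_get_thread_num() ...
    }
    return 0;
}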