Skip to content

Commit

Permalink
threadpool: futher api cleanup and prep for future refactoring
Browse files Browse the repository at this point in the history
All threadpool related functions and structs use ggml_threadpool prefix.
  • Loading branch information
max-krasnyansky committed Aug 28, 2024
1 parent e3c2202 commit c6328bc
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 56 deletions.
4 changes: 2 additions & 2 deletions examples/llama-bench/llama-bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,7 @@ int main(int argc, char ** argv) {
tpp.poll = t.poll;
tpp.prio = params.prio;

struct ggml_compute_threadpool* threadpool = ggml_create_threadpool(&tpp);
struct ggml_threadpool* threadpool = ggml_threadpool_create(&tpp);
if (!threadpool) {
LOG_TEE("%s: threadpool create failed : n_threads %d\n", __func__, tpp.n_threads);
exit(1);
Expand Down Expand Up @@ -1578,7 +1578,7 @@ int main(int argc, char ** argv) {

llama_free(ctx);

ggml_release_threadpool(threadpool);
ggml_threadpool_release(threadpool);
}

llama_free_model(lmodel);
Expand Down
10 changes: 5 additions & 5 deletions examples/main/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ int main(int argc, char ** argv) {

set_process_priority(params.cpuparams.priority);

struct ggml_compute_threadpool * threadpool_batch = NULL;
struct ggml_threadpool * threadpool_batch = NULL;
if (!ggml_threadpool_params_match(&tpp, &tpp_batch)) {
threadpool_batch = ggml_create_threadpool(&tpp_batch);
threadpool_batch = ggml_threadpool_create(&tpp_batch);
if (!threadpool_batch) {
LOG_TEE("%s: batch threadpool create failed : n_threads %d\n", __func__, tpp_batch.n_threads);
exit(1);
Expand All @@ -244,7 +244,7 @@ int main(int argc, char ** argv) {
tpp.paused = true;
}

struct ggml_compute_threadpool * threadpool = ggml_create_threadpool(&tpp);
struct ggml_threadpool * threadpool = ggml_threadpool_create(&tpp);
if (!threadpool) {
LOG_TEE("%s: threadpool create failed : n_threads %d\n", __func__, tpp.n_threads);
exit(1);
Expand Down Expand Up @@ -1023,8 +1023,8 @@ int main(int argc, char ** argv) {
llama_sampling_free(ctx_sampling);
llama_backend_free();

ggml_release_threadpool(threadpool);
ggml_release_threadpool(threadpool_batch);
ggml_threadpool_release(threadpool);
ggml_threadpool_release(threadpool_batch);

#ifndef LOG_DISABLE_LOGS
LOG_TEE("Log end\n");
Expand Down
2 changes: 1 addition & 1 deletion ggml/include/ggml-backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ extern "C" {

GGML_API GGML_CALL bool ggml_backend_is_cpu (ggml_backend_t backend);
GGML_API void ggml_backend_cpu_set_n_threads (ggml_backend_t backend_cpu, int n_threads);
GGML_API void ggml_backend_cpu_set_threadpool (ggml_backend_t backend_cpu, ggml_compute_threadpool_t threadpool);
GGML_API void ggml_backend_cpu_set_threadpool (ggml_backend_t backend_cpu, ggml_threadpool_t threadpool);
GGML_API void ggml_backend_cpu_set_abort_callback(ggml_backend_t backend_cpu, ggml_abort_callback abort_callback, void * abort_callback_data);

// Create a backend buffer from an existing pointer
Expand Down
18 changes: 9 additions & 9 deletions ggml/include/ggml.h
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,9 @@ extern "C" {
bool paused; // start in paused state
};

struct ggml_compute_threadpool; // forward declaration, see ggml.c
struct ggml_threadpool; // forward declaration, see ggml.c

typedef struct ggml_compute_threadpool * ggml_compute_threadpool_t;
typedef struct ggml_threadpool * ggml_threadpool_t;

// the compute plan that needs to be prepared for ggml_graph_compute()
// since https://github.com/ggerganov/ggml/issues/287
Expand All @@ -656,7 +656,7 @@ extern "C" {
uint8_t * work_data; // work buffer, to be allocated by caller before calling to `ggml_graph_compute()`

int n_threads;
struct ggml_compute_threadpool * threadpool;
struct ggml_threadpool * threadpool;

// abort ggml_graph_compute when true
ggml_abort_callback abort_callback;
Expand Down Expand Up @@ -2039,18 +2039,18 @@ extern "C" {
GGML_API struct ggml_threadpool_params ggml_threadpool_params_default(int n_threads);
GGML_API void ggml_threadpool_params_init(struct ggml_threadpool_params *p, int n_threads);
GGML_API bool ggml_threadpool_params_match (const struct ggml_threadpool_params *p0, const struct ggml_threadpool_params *p1);
GGML_API struct ggml_compute_threadpool* ggml_create_threadpool (struct ggml_threadpool_params * params);
GGML_API void ggml_release_threadpool (struct ggml_compute_threadpool * threadpool);
GGML_API int ggml_threadpool_get_n_threads(struct ggml_compute_threadpool * threadpool);
GGML_API void ggml_pause_threadpool (struct ggml_compute_threadpool * threadpool);
GGML_API void ggml_resume_threadpool (struct ggml_compute_threadpool * threadpool);
GGML_API struct ggml_threadpool* ggml_threadpool_create (struct ggml_threadpool_params * params);
GGML_API void ggml_threadpool_release (struct ggml_threadpool * threadpool);
GGML_API int ggml_threadpool_get_n_threads(struct ggml_threadpool * threadpool);
GGML_API void ggml_threadpool_pause (struct ggml_threadpool * threadpool);
GGML_API void ggml_threadpool_resume (struct ggml_threadpool * threadpool);

// ggml_graph_plan() has to be called before ggml_graph_compute()
// when plan.work_size > 0, caller must allocate memory for plan.work_data
GGML_API struct ggml_cplan ggml_graph_plan(
const struct ggml_cgraph * cgraph,
int n_threads, /* = GGML_DEFAULT_N_THREADS */
struct ggml_compute_threadpool * threadpool /* = NULL */ );
struct ggml_threadpool * threadpool /* = NULL */ );
GGML_API enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan);

// same as ggml_graph_compute() but the work data is allocated as a part of the context
Expand Down
6 changes: 3 additions & 3 deletions ggml/src/ggml-backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ ggml_backend_buffer_type_t ggml_backend_cpu_hbm_buffer_type(void) {

struct ggml_backend_cpu_context {
int n_threads;
ggml_compute_threadpool_t threadpool;
ggml_threadpool_t threadpool;

void * work_data;
size_t work_size;
Expand Down Expand Up @@ -906,14 +906,14 @@ void ggml_backend_cpu_set_n_threads(ggml_backend_t backend_cpu, int n_threads) {
ctx->n_threads = n_threads;
}

void ggml_backend_cpu_set_threadpool(ggml_backend_t backend_cpu, ggml_compute_threadpool_t threadpool) {
void ggml_backend_cpu_set_threadpool(ggml_backend_t backend_cpu, ggml_threadpool_t threadpool) {
GGML_ASSERT(ggml_backend_is_cpu(backend_cpu));

struct ggml_backend_cpu_context * ctx = (struct ggml_backend_cpu_context *)backend_cpu->context;

if (ctx->threadpool && ctx->threadpool != threadpool) {
// already had a different threadpool, pause/suspend it before switching
ggml_pause_threadpool(ctx->threadpool);
ggml_threadpool_pause(ctx->threadpool);
}
ctx->threadpool = threadpool;
}
Expand Down
54 changes: 27 additions & 27 deletions ggml/src/ggml.c
Original file line number Diff line number Diff line change
Expand Up @@ -1955,7 +1955,7 @@ typedef pthread_mutex_t ggml_mutex_t;
#endif

// Threadpool def
struct ggml_compute_threadpool {
struct ggml_threadpool {
ggml_mutex_t mutex; // mutex for cond.var
ggml_cond_t cond; // cond.var for waiting for new work

Expand Down Expand Up @@ -1990,7 +1990,7 @@ struct ggml_compute_state {
int last_graph;
bool pending;
#endif
struct ggml_compute_threadpool * threadpool;
struct ggml_threadpool * threadpool;
int ith;
};

Expand All @@ -2002,7 +2002,7 @@ struct ggml_compute_params {
size_t wsize;
void * wdata;

struct ggml_compute_threadpool * threadpool;
struct ggml_threadpool * threadpool;
};

//
Expand Down Expand Up @@ -3110,15 +3110,15 @@ inline static void ggml_critical_section_start(void) {
}

#ifdef GGML_USE_OPENMP
static void ggml_barrier(struct ggml_compute_threadpool * threadpool) {
static void ggml_barrier(struct ggml_threadpool * threadpool) {
if (threadpool->n_threads_cur == 1) {
return;
}

#pragma omp barrier
}
#else
static void ggml_barrier(struct ggml_compute_threadpool * threadpool) {
static void ggml_barrier(struct ggml_threadpool * threadpool) {
if (threadpool->n_threads_cur == 1) {
return;
}
Expand Down Expand Up @@ -18837,7 +18837,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask
}
}

void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
void ggml_threadpool_release(struct ggml_threadpool* threadpool) {
if (!threadpool) return;

#ifndef GGML_USE_OPENMP
Expand Down Expand Up @@ -18868,36 +18868,36 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {

#ifndef GGML_USE_OPENMP
// pause/resume must be called under mutex
static void ggml_pause_threadpool_locked(struct ggml_compute_threadpool * threadpool) {
static void ggml_threadpool_pause_locked(struct ggml_threadpool * threadpool) {
GGML_PRINT_DEBUG("Pausing threadpool\n");
threadpool->pause = true;
ggml_cond_broadcast(&threadpool->cond);
}

static void ggml_resume_threadpool_locked(struct ggml_compute_threadpool * threadpool) {
static void ggml_threadpool_resume_locked(struct ggml_threadpool * threadpool) {
GGML_PRINT_DEBUG("Resuming threadpool\n");
threadpool->pause = false;
ggml_cond_broadcast(&threadpool->cond);
}
#endif

void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
void ggml_threadpool_pause(struct ggml_threadpool * threadpool) {
#ifndef GGML_USE_OPENMP
ggml_mutex_lock(&threadpool->mutex);
if (!threadpool->pause) {
ggml_pause_threadpool_locked(threadpool);
ggml_threadpool_pause_locked(threadpool);
}
ggml_mutex_unlock(&threadpool->mutex);
#else
UNUSED(threadpool);
#endif
}

void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
void ggml_threadpool_resume(struct ggml_threadpool * threadpool) {
#ifndef GGML_USE_OPENMP
ggml_mutex_lock(&threadpool->mutex);
if (threadpool->pause) {
ggml_resume_threadpool_locked(threadpool);
ggml_threadpool_resume_locked(threadpool);
}
ggml_mutex_unlock(&threadpool->mutex);
#else
Expand All @@ -18908,7 +18908,7 @@ void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
struct ggml_cplan ggml_graph_plan(
const struct ggml_cgraph * cgraph,
int n_threads,
struct ggml_compute_threadpool * threadpool) {
struct ggml_threadpool * threadpool) {

if (threadpool == NULL) {
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
Expand Down Expand Up @@ -19119,7 +19119,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
#ifndef GGML_USE_OPENMP

static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
struct ggml_compute_threadpool * threadpool = state->threadpool;
struct ggml_threadpool * threadpool = state->threadpool;

if (state->pending || threadpool->stop || threadpool->pause) { return true; }

Expand All @@ -19134,7 +19134,7 @@ static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
}

static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
struct ggml_compute_threadpool * threadpool = state->threadpool;
struct ggml_threadpool * threadpool = state->threadpool;

// This seems to make 0 ... 100 a decent range for polling level across modern processors.
// Perhaps, we can adjust it dynamically based on load and things.
Expand All @@ -19149,7 +19149,7 @@ static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state *
}

static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) {
struct ggml_compute_threadpool * threadpool = state->threadpool;
struct ggml_threadpool * threadpool = state->threadpool;

if (ggml_graph_compute_poll_for_work(state)) {
return state->pending;
Expand All @@ -19168,7 +19168,7 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *

static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
struct ggml_compute_threadpool * threadpool = state->threadpool;
struct ggml_threadpool * threadpool = state->threadpool;

ggml_thread_apply_priority(threadpool->prio);
if (ggml_thread_cpumask_is_valid(state->cpumask)) {
Expand Down Expand Up @@ -19205,7 +19205,7 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
}

// Start processing new graph
static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpool)
static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
{
// always take the mutex here because the worker threads are doing hybrid poll/wait

Expand All @@ -19221,7 +19221,7 @@ static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpo
}

// resume does cond broadcast
ggml_resume_threadpool_locked(threadpool);
ggml_threadpool_resume_locked(threadpool);
} else {
ggml_cond_broadcast(&threadpool->cond);
}
Expand Down Expand Up @@ -19254,13 +19254,13 @@ bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, cons
return memcmp(p0->cpumask, p1->cpumask, GGML_MAX_N_THREADS) == 0;
}

static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
static struct ggml_threadpool * ggml_threadpool_create_impl(
struct ggml_threadpool_params * tpp,
struct ggml_cgraph * cgraph,
struct ggml_cplan * cplan) {

struct ggml_compute_threadpool * threadpool =
GGML_ALIGNED_MALLOC(sizeof(struct ggml_compute_threadpool));
struct ggml_threadpool * threadpool =
GGML_ALIGNED_MALLOC(sizeof(struct ggml_threadpool));
{
threadpool->cgraph = cgraph;
threadpool->cplan = cplan;
Expand Down Expand Up @@ -19320,8 +19320,8 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
return threadpool;
}

struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
return ggml_create_threadpool_impl(tpp, NULL, NULL);
struct ggml_threadpool * ggml_threadpool_create(struct ggml_threadpool_params * tpp) {
return ggml_threadpool_create_impl(tpp, NULL, NULL);
}

enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
Expand All @@ -19330,7 +19330,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
GGML_ASSERT(cplan->work_size == 0 || cplan->work_data != NULL);

int n_threads = cplan->n_threads;
struct ggml_compute_threadpool * threadpool = cplan->threadpool;
struct ggml_threadpool * threadpool = cplan->threadpool;

bool disposable_threadpool = false;

Expand All @@ -19339,7 +19339,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
disposable_threadpool = true;

struct ggml_threadpool_params ttp = ggml_threadpool_params_default(n_threads);
threadpool = ggml_create_threadpool_impl(&ttp, cgraph, cplan);
threadpool = ggml_threadpool_create_impl(&ttp, cgraph, cplan);
} else {
// Reset some of the parameters that need resetting
// No worker threads should be accessing the parameters below at this stage
Expand Down Expand Up @@ -19384,7 +19384,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
enum ggml_status ret = threadpool->ec;

if (disposable_threadpool) {
ggml_release_threadpool(threadpool);
ggml_threadpool_release(threadpool);
}

return ret;
Expand Down
4 changes: 2 additions & 2 deletions include/llama.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ extern "C" {
// Optional: an auto threadpool gets created in ggml if not passed explicitly
LLAMA_API void llama_attach_threadpool(
struct llama_context * ctx,
ggml_compute_threadpool_t threadpool,
ggml_compute_threadpool_t threadpool_batch);
ggml_threadpool_t threadpool,
ggml_threadpool_t threadpool_batch);
LLAMA_API void llama_detach_threadpool(struct llama_context * ctx);

// Call once at the end of the program - currently only used for MPI
Expand Down
Loading

0 comments on commit c6328bc

Please sign in to comment.