Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Apr 4, 2024
1 parent 1ed3979 commit 0d974a5
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 52 deletions.
76 changes: 41 additions & 35 deletions src/ustreamer/workers.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,21 @@

#include "workers.h"

#include <stdatomic.h>

#include <pthread.h>

#include "../libs/types.h"
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"


static void *_worker_thread(void *v_worker);


us_workers_pool_s *us_workers_pool_init(
const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval,
us_workers_pool_job_init_f job_init, void *job_init_arg,
us_workers_pool_job_destroy_f job_destroy,
us_workers_pool_run_job_f run_job) {
Expand All @@ -49,23 +58,21 @@ us_workers_pool_s *us_workers_pool_init(
US_MUTEX_INIT(pool->free_workers_mutex);
US_COND_INIT(pool->free_workers_cond);

for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];

WR(number) = number;
US_ASPRINTF(WR(name), "%s-%u", wr_prefix, number);
wr->number = index;
US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index);

US_MUTEX_INIT(WR(has_job_mutex));
atomic_init(&WR(has_job), false);
US_COND_INIT(WR(has_job_cond));
US_MUTEX_INIT(wr->has_job_mutex);
atomic_init(&wr->has_job, false);
US_COND_INIT(wr->has_job_cond);

WR(pool) = pool;
WR(job) = job_init(job_init_arg);
wr->pool = pool;
wr->job = job_init(job_init_arg);

US_THREAD_CREATE(WR(tid), _worker_thread, (void*)&(pool->workers[number]));
US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr);
pool->free_workers += 1;

# undef WR
}
return pool;
}
Expand All @@ -74,23 +81,21 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
US_LOG_INFO("Destroying workers pool %s ...", pool->name);

atomic_store(&pool->stop, true);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];

US_MUTEX_LOCK(WR(has_job_mutex));
atomic_store(&WR(has_job), true); // Final job: die
US_MUTEX_UNLOCK(WR(has_job_mutex));
US_COND_SIGNAL(WR(has_job_cond));

US_THREAD_JOIN(WR(tid));
US_MUTEX_DESTROY(WR(has_job_mutex));
US_COND_DESTROY(WR(has_job_cond));
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true); // Final job: die
US_MUTEX_UNLOCK(wr->has_job_mutex);
US_COND_SIGNAL(wr->has_job_cond);

free(WR(name));
US_THREAD_JOIN(wr->tid);
US_MUTEX_DESTROY(wr->has_job_mutex);
US_COND_DESTROY(wr->has_job_cond);

pool->job_destroy(WR(job));
free(wr->name);

# undef WR
pool->job_destroy(wr->job);
}

US_MUTEX_DESTROY(pool->free_workers_mutex);
Expand All @@ -112,14 +117,15 @@ us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) {
ready_wr->job_timely = true;
pool->oldest_wr = pool->oldest_wr->next_wr;
} else {
for (unsigned number = 0; number < pool->n_workers; ++number) {
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
if (
!atomic_load(&pool->workers[number].has_job) && (
!atomic_load(&wr->has_job) && (
ready_wr == NULL
|| ready_wr->job_start_ts < pool->workers[number].job_start_ts
|| ready_wr->job_start_ts < wr->job_start_ts
)
) {
ready_wr = &pool->workers[number];
ready_wr = wr;
break;
}
}
Expand Down Expand Up @@ -157,15 +163,15 @@ void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, vo
US_MUTEX_UNLOCK(pool->free_workers_mutex);
}

long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
const ldf approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;

US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);

pool->approx_job_time = approx_job_time;

const long double min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
const ldf min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров

if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) {
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
Expand All @@ -176,7 +182,7 @@ long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_
}

static void *_worker_thread(void *v_worker) {
us_worker_s *wr = v_worker;
us_worker_s *const wr = v_worker;

US_THREAD_SETTLE("%s", wr->name);
US_LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name);
Expand All @@ -189,7 +195,7 @@ static void *_worker_thread(void *v_worker) {
US_MUTEX_UNLOCK(wr->has_job_mutex);

if (!atomic_load(&wr->pool->stop)) {
const long double job_start_ts = us_get_now_monotonic();
const ldf job_start_ts = us_get_now_monotonic();
wr->job_failed = !wr->pool->run_job(wr);
if (!wr->job_failed) {
wr->job_start_ts = job_start_ts;
Expand Down
29 changes: 12 additions & 17 deletions src/ustreamer/workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,26 @@

#pragma once

#include <stdbool.h>
#include <stdatomic.h>

#include <sys/types.h>

#include <pthread.h>

#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/types.h"


typedef struct us_worker_sx {
pthread_t tid;
unsigned number;
char *name;
pthread_t tid;
uint number;
char *name;

long double last_job_time;
ldf last_job_time;

pthread_mutex_t has_job_mutex;
void *job;
atomic_bool has_job;
bool job_timely;
bool job_failed;
long double job_start_ts;
ldf job_start_ts;
pthread_cond_t has_job_cond;

struct us_worker_sx *prev_wr;
Expand All @@ -61,28 +56,28 @@ typedef bool (*us_workers_pool_run_job_f)(us_worker_s *wr);

typedef struct us_workers_pool_sx {
const char *name;
long double desired_interval;
ldf desired_interval;

us_workers_pool_job_destroy_f job_destroy;
us_workers_pool_run_job_f run_job;

unsigned n_workers;
uint n_workers;
us_worker_s *workers;
us_worker_s *oldest_wr;
us_worker_s *latest_wr;

long double approx_job_time;
ldf approx_job_time;

pthread_mutex_t free_workers_mutex;
unsigned free_workers;
uint free_workers;
pthread_cond_t free_workers_cond;

atomic_bool stop;
} us_workers_pool_s;


us_workers_pool_s *us_workers_pool_init(
const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval,
us_workers_pool_job_init_f job_init, void *job_init_arg,
us_workers_pool_job_destroy_f job_destroy,
us_workers_pool_run_job_f run_job);
Expand All @@ -92,4 +87,4 @@ void us_workers_pool_destroy(us_workers_pool_s *pool);
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/);

long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);

0 comments on commit 0d974a5

Please sign in to comment.