Skip to content

Commit

Permalink
Add reference counting to threadequeue_job_t
Browse files Browse the repository at this point in the history
Both the thread queue and the encoder states hold pointers to the thread
queue jobs. It is possible that a job is removed from the thread queue
and freed while the encoder state is still using it. This commit adds
reference counting to threadqueue_job_t in order to fix the problem.

Fixes #161.
  • Loading branch information
aryla committed Apr 12, 2017
1 parent b41f0fa commit 2991962
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 55 deletions.
20 changes: 16 additions & 4 deletions src/encoder_state-ctors_dtors.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,17 @@ static void encoder_state_config_tile_finalize(encoder_state_t * const state) {
if (state->tile == NULL) return;

if (state->tile->hor_buf_before_sao) kvz_yuv_t_free(state->tile->hor_buf_before_sao);

kvz_yuv_t_free(state->tile->hor_buf_search);
kvz_yuv_t_free(state->tile->ver_buf_search);


if (state->encoder_control->cfg.wpp) {
int num_jobs = state->tile->frame->width_in_lcu * state->tile->frame->height_in_lcu;
for (int i = 0; i < num_jobs; ++i) {
kvz_threadqueue_free_job(&state->tile->wf_jobs[i]);
}
}

kvz_videoframe_free(state->tile->frame);
state->tile->frame = NULL;
if (state->encoder_control->cfg.crypto_features && state->tile->dbs_g) {
Expand All @@ -145,8 +152,10 @@ static void encoder_state_config_tile_finalize(encoder_state_t * const state) {
FREE_POINTER(state->tile->wf_jobs);
}

static int encoder_state_config_slice_init(encoder_state_t * const state,
const int start_address_in_ts, const int end_address_in_ts) {
static int encoder_state_config_slice_init(encoder_state_t * const state,
const int start_address_in_ts,
const int end_address_in_ts)
{
state->slice->id = -1;
for (int i = 0; i < state->encoder_control->slice_count; ++i) {
if (state->encoder_control->slice_addresses_in_ts[i] == start_address_in_ts) {
Expand Down Expand Up @@ -706,4 +715,7 @@ void kvz_encoder_state_finalize(encoder_state_t * const state) {
}

kvz_bitstream_finalize(&state->stream);

kvz_threadqueue_free_job(&state->tqj_recon_done);
kvz_threadqueue_free_job(&state->tqj_bitstream_written);
}
34 changes: 19 additions & 15 deletions src/encoderstate.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "sao.h"
#include "search.h"
#include "tables.h"
#include "threadqueue.h"


int kvz_encoder_state_match_children_of_previous_frame(encoder_state_t * const state) {
Expand Down Expand Up @@ -479,6 +480,7 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) {
#else
char* job_description = NULL;
#endif
kvz_threadqueue_free_job(&state->tile->wf_jobs[lcu->id]);
state->tile->wf_jobs[lcu->id] = kvz_threadqueue_submit(state->encoder_control->threadqueue, encoder_state_worker_encode_lcu, (void*)lcu, 1, job_description);

// If job object was returned, add dependancies and allow it to run.
Expand Down Expand Up @@ -515,13 +517,14 @@ static void encoder_state_encode_leaf(encoder_state_t * const state) {
}

kvz_threadqueue_job_unwait_job(state->encoder_control->threadqueue, state->tile->wf_jobs[lcu->id]);
}

// In the case where SAO is not enabled, the wavefront row is
// done when the last LCU in the row is done.
if (!state->encoder_control->cfg.sao_enable && i + 1 == state->lcu_order_count) {
assert(!state->tqj_recon_done);
state->tqj_recon_done = state->tile->wf_jobs[lcu->id];
// In the case where SAO is not enabled, the wavefront row is
// done when the last LCU in the row is done.
if (!state->encoder_control->cfg.sao_enable && i + 1 == state->lcu_order_count) {
assert(!state->tqj_recon_done);
state->tqj_recon_done =
kvz_threadqueue_copy_ref(state->tile->wf_jobs[lcu->id]);
}
}
}
}
Expand All @@ -541,11 +544,11 @@ static void encoder_state_worker_encode_children(void * opaque)
int wpp_row = sub_state->wfrow->lcu_offset_y;
int tile_width = sub_state->tile->frame->width_in_lcu;
int end_of_row = (wpp_row + 1) * tile_width - 1;
threadqueue_job_t *job = sub_state->tile->wf_jobs[end_of_row];

assert(!sub_state->tqj_bitstream_written);
sub_state->tqj_bitstream_written = job;
return;
if (sub_state->tile->wf_jobs[end_of_row]) {
sub_state->tqj_bitstream_written =
kvz_threadqueue_copy_ref(sub_state->tile->wf_jobs[end_of_row]);
}
}
}

Expand Down Expand Up @@ -674,6 +677,7 @@ static void encoder_state_encode(encoder_state_t * const main_state) {
#else
char* job_description = NULL;
#endif
kvz_threadqueue_free_job(&main_state->children[i].tqj_recon_done);
main_state->children[i].tqj_recon_done = kvz_threadqueue_submit(main_state->encoder_control->threadqueue, encoder_state_worker_encode_children, &(main_state->children[i]), 1, job_description);
if (main_state->children[i].previous_encoder_state != &main_state->children[i] && main_state->children[i].previous_encoder_state->tqj_recon_done && !main_state->children[i].frame->is_idr_frame) {
#if 0
Expand Down Expand Up @@ -703,11 +707,10 @@ static void encoder_state_encode(encoder_state_t * const main_state) {
if (main_state->encoder_control->cfg.sao_enable &&
main_state->children[0].type == ENCODER_STATE_TYPE_WAVEFRONT_ROW)
{
int y;
videoframe_t * const frame = main_state->tile->frame;
threadqueue_job_t *previous_job = NULL;

for (y = 0; y < frame->height_in_lcu; ++y) {
for (int y = 0; y < frame->height_in_lcu; ++y) {
// Queue a single job performing SAO reconstruction for the whole wavefront row.

worker_sao_reconstruct_lcu_data *data = MALLOC(worker_sao_reconstruct_lcu_data, 1);
Expand Down Expand Up @@ -743,13 +746,14 @@ static void encoder_state_encode(encoder_state_t * const main_state) {

// The wavefront row is finished, when the SAO-reconstruction is
// finished.
kvz_threadqueue_free_job(&main_state->children[y].tqj_recon_done);
main_state->children[y].tqj_recon_done = job;

if (y == frame->height_in_lcu - 1) {
// This tile is finished, when the reconstruction of the last
// WPP-row is finished.
assert(!main_state->tqj_recon_done);
main_state->tqj_recon_done = job;
main_state->tqj_recon_done = kvz_threadqueue_copy_ref(job);
}
}
}
Expand Down Expand Up @@ -949,8 +953,8 @@ static void encoder_state_init_children(encoder_state_t * const state) {
}

//Clear the jobs
state->tqj_bitstream_written = NULL;
state->tqj_recon_done = NULL;
kvz_threadqueue_free_job(&state->tqj_bitstream_written);
kvz_threadqueue_free_job(&state->tqj_recon_done);

for (int i = 0; state->children[i].encoder_control; ++i) {
encoder_state_init_children(&state->children[i]);
Expand Down
2 changes: 1 addition & 1 deletion src/kvazaar.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ static int kvazaar_encode(kvz_encoder *enc,
kvz_threadqueue_waitfor(enc->control->threadqueue, output_state->tqj_bitstream_written);
// The job pointer must be set to NULL here since it won't be usable after
// the next frame is done.
output_state->tqj_bitstream_written = NULL;
kvz_threadqueue_free_job(&output_state->tqj_bitstream_written);

// Get stream length before taking chunks since that clears the stream.
if (len_out) *len_out = kvz_bitstream_tell(&output_state->stream) / 8;
Expand Down
77 changes: 43 additions & 34 deletions src/threadqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,52 +324,59 @@ int kvz_threadqueue_init(threadqueue_queue_t * const threadqueue, int thread_cou
}

/**
* \brief Free a single job from the threadqueue index i, destroying it.
* \brief Get a new pointer to a job.
*
* Increment reference count and return the job.
*/
static void threadqueue_free_job(threadqueue_queue_t * const threadqueue, int i)
threadqueue_job_t *kvz_threadqueue_copy_ref(threadqueue_job_t *job)
{
#ifdef KVZ_DEBUG
#if KVZ_DEBUG & KVZ_PERF_JOB
int j;
KVZ_GET_TIME(&threadqueue->queue[i]->debug_clock_dequeue);
fprintf(threadqueue->debug_log, "%p\t%d\t%lf\t+%lf\t+%lf\t+%lf\t%s\n", threadqueue->queue[i], threadqueue->queue[i]->debug_worker_id, KVZ_CLOCK_T_AS_DOUBLE(threadqueue->queue[i]->debug_clock_enqueue), KVZ_CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_enqueue, threadqueue->queue[i]->debug_clock_start), KVZ_CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_start, threadqueue->queue[i]->debug_clock_stop), KVZ_CLOCK_T_DIFF(threadqueue->queue[i]->debug_clock_stop, threadqueue->queue[i]->debug_clock_dequeue), threadqueue->queue[i]->debug_description);
// The caller should have had another reference.
assert(job->refcount > 0);
KVZ_ATOMIC_INC(&job->refcount);
return job;
}


for (j = 0; j < threadqueue->queue[i]->rdepends_count; ++j) {
fprintf(threadqueue->debug_log, "%p->%p\n", threadqueue->queue[i], threadqueue->queue[i]->rdepends[j]);
/**
* \brief Free a job.
*
* Decrement reference count of the job. If no references exist any more,
* deallocate associated memory and destroy mutexes.
*
* Sets the job pointer to NULL.
*/
void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr)
{
threadqueue_job_t *job = *job_ptr;
if (job == NULL) return;
*job_ptr = NULL;

int new_refcount = KVZ_ATOMIC_DEC(&job->refcount);
if (new_refcount > 0) {
// There are still references so we don't free the data yet.
return;
}

FREE_POINTER(threadqueue->queue[i]->debug_description);
#endif
assert(new_refcount >= 0);

#ifdef KVZ_DEBUG
FREE_POINTER(job->debug_description);
#endif
FREE_POINTER(threadqueue->queue[i]->rdepends);

pthread_mutex_destroy(&threadqueue->queue[i]->lock);

FREE_POINTER(threadqueue->queue[i]);
FREE_POINTER(job->rdepends);
pthread_mutex_destroy(&job->lock);
FREE_POINTER(job);
}

static void threadqueue_free_jobs(threadqueue_queue_t * const threadqueue) {
int i;
for (i=0; i < threadqueue->queue_count; ++i) {
threadqueue_free_job(threadqueue, i);
for (int i = 0; i < threadqueue->queue_count; ++i) {
kvz_threadqueue_free_job(&threadqueue->queue[i]);
}
threadqueue->queue_count = 0;
threadqueue->queue_start = 0;
#ifdef KVZ_DEBUG
#if KVZ_DEBUG & KVZ_PERF_JOB
{
KVZ_CLOCK_T time;
KVZ_GET_TIME(&time);

fprintf(threadqueue->debug_log, "\t\t-\t-\t%lf\t-\tFLUSH\n", KVZ_CLOCK_T_AS_DOUBLE(time));
}
#endif
#endif
}

int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) {
int i;

//Flush the queue
if (!kvz_threadqueue_flush(threadqueue)) {
fprintf(stderr, "Unable to flush threadqueue!\n");
Expand Down Expand Up @@ -406,7 +413,7 @@ int kvz_threadqueue_finalize(threadqueue_queue_t * const threadqueue) {
PTHREAD_UNLOCK(&threadqueue->lock);

//Join threads
for(i = 0; i < threadqueue->threads_count; i++) {
for(int i = 0; i < threadqueue->threads_count; i++) {
if(pthread_join(threadqueue->threads[i], NULL) != 0) {
fprintf(stderr, "pthread_join failed!\n");
return 0;
Expand Down Expand Up @@ -493,10 +500,10 @@ int kvz_threadqueue_waitfor(threadqueue_queue_t * const threadqueue, threadqueue
} while (!job_done);

// Free jobs submitted before this job.
int i;
for (i = 0; i < threadqueue->queue_count; ++i) {
int i = 0;
for (; i < threadqueue->queue_count; ++i) {
if (threadqueue->queue[i] == job) break;
threadqueue_free_job(threadqueue, i);
kvz_threadqueue_free_job(&threadqueue->queue[i]);
}
// Move remaining jobs to the beginning of the array.
if (i > 0) {
Expand Down Expand Up @@ -525,6 +532,7 @@ threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadque
assert(wait == 0 || wait == 1);

job = MALLOC(threadqueue_job_t, 1);
job->refcount = 1;

#ifdef KVZ_DEBUG
if (debug_description) {
Expand Down Expand Up @@ -580,6 +588,7 @@ threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * const threadque
threadqueue->queue_size += THREADQUEUE_LIST_REALLOC_SIZE;
}
threadqueue->queue[threadqueue->queue_count++] = job;
job->refcount++;

if (job->ndepends == 0) {
++threadqueue->queue_waiting_execution;
Expand Down
9 changes: 8 additions & 1 deletion src/threadqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ typedef struct threadqueue_job_t {
struct threadqueue_job_t **rdepends; //array of pointer to jobs that depend on this one. They have to exist when the thread finishes, because they cannot be run before.
unsigned int rdepends_count; //number of rdepends
unsigned int rdepends_size; //allocated size of rdepends


// Reference count
int refcount;

//Job function and state to use
void (*fptr)(void *arg);
void *arg;
Expand Down Expand Up @@ -107,6 +110,10 @@ int kvz_threadqueue_init(threadqueue_queue_t * threadqueue, int thread_count, in
//Add a job to the queue, and returs a threadqueue_job handle. If wait == 1, one has to run kvz_threadqueue_job_unwait_job in order to have it run
threadqueue_job_t * kvz_threadqueue_submit(threadqueue_queue_t * threadqueue, void (*fptr)(void *arg), void *arg, int wait, const char* debug_description);

void kvz_threadqueue_free_job(threadqueue_job_t **job_ptr);

threadqueue_job_t *kvz_threadqueue_copy_ref(threadqueue_job_t *job);

int kvz_threadqueue_job_unwait_job(threadqueue_queue_t * threadqueue, threadqueue_job_t *job);

//Add a dependency between two jobs.
Expand Down

0 comments on commit 2991962

Please sign in to comment.