Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype for creating one timer coro per output thread #29

Open
wants to merge 12 commits into
base: s3-refactor-timer-coros-awspr-sched-exp
Choose a base branch
from
1 change: 0 additions & 1 deletion include/fluent-bit/flb_async_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#endif

#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_async_timer.h>
#include <fluent-bit/flb_thread_pool.h>
#include <fluent-bit/flb_output_thread.h>

Expand Down
5 changes: 4 additions & 1 deletion include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/tls/flb_tls.h>
#include <fluent-bit/flb_thread_pool.h>
#include <fluent-bit/flb_output_thread.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_upstream_ha.h>
Expand Down Expand Up @@ -77,6 +76,10 @@

struct flb_output_flush;

struct flb_out_thread_instance;
int flb_output_thread_pool_coros_size(struct flb_output_instance *ins);
struct flb_out_thread_instance *flb_output_thread_instance_get();

/*
* Tests callbacks
* ===============
Expand Down
79 changes: 67 additions & 12 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ static int cb_s3_init(struct flb_output_instance *ins,

pthread_mutex_init(&ctx->upload_queue_mutex, NULL);
pthread_mutex_init(&ctx->cb_flush_mutex, NULL);
pthread_mutex_init(&ctx->create_timer_mutex, NULL);

/* Export context */
flb_output_set_context(ins, ctx);
Expand All @@ -593,6 +594,19 @@ static int cb_s3_init(struct flb_output_instance *ins,
return -1;
}

// alloc here doesn't work somehow??
if (ctx->ins->is_threaded == FLB_TRUE && ctx->ins->tp_workers > 0) {
ctx->thread_instances = flb_calloc(1,
sizeof(struct flb_out_thread_instance *)
* ctx->ins->tp_workers); // check that its not zero
if (!ctx->thread_instances) {
flb_errno();
return -1;
}
} else {
ctx->thread_instances = NULL;
}

/* the check against -1 is works here because size_t is unsigned
* and (int) -1 == unsigned max value
* Fluent Bit uses -1 (which becomes max value) to indicate undefined
Expand Down Expand Up @@ -958,7 +972,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->provider->provider_vtable->sync(ctx->provider);
ctx->provider->provider_vtable->init(ctx->provider);

ctx->timer_created = FLB_FALSE;
ctx->timers_created = 0;
ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
if (ctx->timer_ms > UPLOAD_TIMER_MAX_WAIT) {
ctx->timer_ms = UPLOAD_TIMER_MAX_WAIT;
Expand Down Expand Up @@ -1984,25 +1998,66 @@ static void async_timer_cb(struct flb_config *config, void *data)
cb_s3_upload(config, ctx);
}

static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx)
static void create_timer_on_thread(struct flb_config *config, struct flb_s3 *ctx)
{
struct flb_sched *sched;
struct flb_out_thread_instance *th_ins;
struct flb_out_thread_instance *current_th_ins;
int i;
int ret;

pthread_mutex_lock(&ctx->create_timer_mutex);
sched = flb_sched_ctx_get();
ret = flb_sched_out_async_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_ms, ctx->ins,
S3_UPLOAD_JOB_NAME, async_timer_cb,
ctx, NULL);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to create upload timer");
pthread_mutex_unlock(&ctx->create_timer_mutex);
return;
}

ctx->timers_created++;
/* Save the worker thread pointer in the list */
if (ctx->ins->is_threaded == FLB_TRUE) {
current_th_ins = flb_output_thread_instance_get();
for (i = 0; i < ctx->ins->tp_workers; i++) {
th_ins = ctx->thread_instances[i];
if (th_ins == current_th_ins) {
return;
}
if (th_ins == NULL) {
ctx->thread_instances[i] = current_th_ins;
}
}
}
pthread_mutex_unlock(&ctx->create_timer_mutex);
}

static void s3_flush_init(struct flb_config *config, struct flb_s3 *ctx)
{
struct flb_out_thread_instance *current_th_ins;
struct flb_out_thread_instance *th_ins;
int i;

flush_startup_chunks(ctx);

if (ctx->timer_created == FLB_FALSE) {
sched = flb_sched_ctx_get();
if (ctx->timers_created == 0 ) {
create_timer_on_thread(config, ctx);
}
if (ctx->timers_created < ctx->ins->tp_workers && ctx->ins->is_threaded == FLB_TRUE) {
/* Check if current worker thread has a timer scheduled on its evl */
current_th_ins = flb_output_thread_instance_get();

for (i = 0; i < ctx->ins->tp_workers; i++) {
th_ins = ctx->thread_instances[i];

ret = flb_sched_out_async_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_ms, ctx->ins,
S3_UPLOAD_JOB_NAME, async_timer_cb,
ctx, NULL);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to create upload timer");
return;
if (th_ins != NULL && th_ins == current_th_ins) {
return;
}
}
ctx->timer_created = FLB_TRUE;
create_timer_on_thread(config, ctx);
}
}

Expand Down
11 changes: 10 additions & 1 deletion plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ struct flb_s3 {
size_t upload_chunk_size;
time_t upload_timeout;

int timer_created;
int timer_ms;
int key_fmt_has_uuid;

Expand All @@ -172,6 +171,16 @@ struct flb_s3 {
*/
pthread_mutex_t cb_flush_mutex;

/*
* Need to create a timer on each worker thread. Store a
* array of pointers to the thread instance with a mutex to
* protect the array and timers_created counter.
*/
pthread_mutex_t create_timer_mutex;
struct flb_out_thread_instance *thread_instances[5];
struct flb_out_thread_instance **thread_instances;
int timers_created;

struct flb_output_instance *ins;
};

Expand Down
3 changes: 1 addition & 2 deletions src/flb_async_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <fluent-bit/flb_coro.h>
#include <fluent-bit/flb_thread_pool.h>
#include <fluent-bit/flb_output_thread.h>
#include <fluent-bit/flb_async_timer.h>
#include <fluent-bit/flb_scheduler.h>

Expand Down Expand Up @@ -167,7 +166,7 @@ void flb_thread_pool_async_timers_print(struct flb_output_instance *ins)

th_ins = th->params.data;
pthread_mutex_lock(&th_ins->sched->async_timer_mutex);
flb_async_timers_print(&th_ins->sched->async_timer_list);
flb_async_timers_print(th_ins->sched);
pthread_mutex_unlock(&th_ins->sched->async_timer_mutex);
}
}
2 changes: 1 addition & 1 deletion src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ int flb_engine_start(struct flb_config *config)
flb_net_dns_lookup_context_cleanup(&dns_ctx);
flb_sched_timer_cleanup(config->sched);
flb_upstream_conn_pending_destroy_list(&config->upstreams);
flb_async_timer_cleanup(config->sched->async_timer_list_destroy);
flb_async_timer_cleanup(config->sched);

/*
* depend on main thread to clean up expired message
Expand Down
3 changes: 1 addition & 2 deletions src/flb_output_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ static void output_thread(void *data)
/* Destroy upstream connections from the 'pending destroy list' */
flb_upstream_conn_pending_destroy_list(&th_ins->upstreams);
flb_sched_timer_cleanup(sched);
flb_async_timer_cleanup(&th_ins->sched);

/* Check if we should stop the event loop */
if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0 && mk_list_size(&th_ins->sched->async_timer_list) == 0) {
Expand All @@ -360,7 +359,7 @@ static void output_thread(void *data)
upstream_thread_destroy(th_ins);
flb_upstream_conn_active_destroy_list(&th_ins->upstreams);
flb_upstream_conn_pending_destroy_list(&th_ins->upstreams);
flb_async_timer_cleanup(&th_ins->sched);
flb_async_timer_cleanup(th_ins->sched);

flb_sched_destroy(sched);
flush_params = FLB_TLS_GET(out_flush_params);
Expand Down
Loading