Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: minimal synchronous scheduler #6339

Merged
merged 2 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
#define FLB_OUTPUT_NET 32 /* output address may set host and port */
#define FLB_OUTPUT_PLUGIN_CORE 0
#define FLB_OUTPUT_PLUGIN_PROXY 1
#define FLB_OUTPUT_NO_MULTIPLEX 512
#define FLB_OUTPUT_NO_MULTIPLEX 512 /* run one task at a time, one task per flush */
#define FLB_OUTPUT_PRIVATE 1024
#define FLB_OUTPUT_SYNCHRONOUS 2048 /* run one task at a time, no flush cycle limit */


/* Event type handlers */
Expand Down Expand Up @@ -357,6 +358,7 @@ struct flb_output_instance {
* loaded (in backlog)
*/
size_t fs_backlog_chunks_size;

/*
* Buffer limit: optional limit set by configuration so this output instance
* cannot buffer more than total_limit_size (bytes unit).
Expand All @@ -367,6 +369,9 @@ struct flb_output_instance {
*/
size_t total_limit_size;

/* Queue for singleplexed tasks */
struct flb_task_queue *singleplex_queue;

/* Thread Pool: this is optional for the caller */
int tp_workers;
struct flb_tp *tp;
Expand Down Expand Up @@ -721,6 +726,12 @@ struct flb_output_instance *flb_output_get_instance(struct flb_config *config,
int out_id);
int flb_output_flush_finished(struct flb_config *config, int out_id);

int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
struct flb_task_retry *retry,
struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config);
int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue);
struct flb_output_instance *flb_output_new(struct flb_config *config,
const char *output, void *data,
int public_only);
Expand Down
30 changes: 30 additions & 0 deletions include/fluent-bit/flb_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,34 @@ struct flb_task {
struct flb_config *config; /* parent flb config */
};

/*
 * A queue of flb_task_enqueued tasks
 *
 * This structure is currently used to track pending flushes when FLB_OUTPUT_SYNCHRONOUS
 * is used.
 *
 * Each list links struct flb_task_enqueued entries through their _head member.
 */
struct flb_task_queue {
/* entries waiting to be flushed; new entries are added here on enqueue */
struct mk_list pending;
/* entries whose flush has been started; an entry is moved here from 'pending' */
struct mk_list in_progress;
};

/*
 * An enqueued task is a task that is not yet dispatched to a thread
 * or started on the engine.
 *
 * There may be multiple enqueued instances of the same task on different out instances.
 *
 * This structure is currently used to track pending flushes when FLB_OUTPUT_SYNCHRONOUS
 * is used.
 */
struct flb_task_enqueued {
/* task to hand over to flb_output_task_flush() */
struct flb_task *task;
/* retry context tied to this flush, or NULL; destroyed if the flush fails */
struct flb_task_retry *retry;
/* output instance the task will be flushed through */
struct flb_output_instance *out_instance;
/* configuration context passed along to the flush call */
struct flb_config *config;
/* link into the 'pending' or 'in_progress' list of a struct flb_task_queue */
struct mk_list _head;
};

int flb_task_running_count(struct flb_config *config);
int flb_task_running_print(struct flb_config *config);

Expand All @@ -104,6 +132,8 @@ void flb_task_add_coro(struct flb_task *task, struct flb_coro *coro);

void flb_task_destroy(struct flb_task *task, int del);

struct flb_task_queue* flb_task_queue_create();
void flb_task_queue_destroy(struct flb_task_queue *queue);
struct flb_task_retry *flb_task_retry_create(struct flb_task *task,
struct flb_output_instance *ins);

Expand Down
14 changes: 6 additions & 8 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
goto error;
}

/*
* Remove async flag from upstream
* CW output runs in sync mode; because the CW API currently requires
* PutLogEvents requests to a log stream to be made serially
*/
upstream->flags &= ~(FLB_IO_ASYNC);

ctx->cw_client->upstream = upstream;
flb_output_upstream_set(upstream, ctx->ins);
ctx->cw_client->host = ctx->endpoint;
Expand Down Expand Up @@ -666,7 +659,12 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
.cb_init = cb_cloudwatch_init,
.cb_flush = cb_cloudwatch_flush,
.cb_exit = cb_cloudwatch_exit,
.flags = 0,

/*
* Allow cloudwatch to use async network stack synchronously by opting into
* FLB_OUTPUT_SYNCHRONOUS synchronous task scheduler
*/
.flags = FLB_OUTPUT_SYNCHRONOUS,
.workers = 1,

/* Configuration */
Expand Down
7 changes: 7 additions & 0 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts,
}
name = (char *) flb_output_name(ins);

/* If we are in synchronous mode, flush the next waiting task */
if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
if (ret == FLB_OK || ret == FLB_RETRY || ret == FLB_ERROR) {
flb_output_task_singleplex_flush_next(ins->singleplex_queue);
}
}

/* A task has finished, delete it */
if (ret == FLB_OK) {
/* cmetrics */
Expand Down
38 changes: 31 additions & 7 deletions src/flb_engine_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,25 @@ int flb_engine_dispatch_retry(struct flb_task_retry *retry,
flb_event_chunk_update(task->event_chunk, buf_data, buf_size);

/* flush the task */
ret = flb_output_task_flush(task, retry->o_ins, config);
Copy link
Contributor Author

@matthewfala matthewfala Nov 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May need to discuss how to handle retry properly in the enqueued case. Can we hold onto the retry and delete it if the flush fails later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added retry clean up to the singleplex code. Automatically cleans up the associated retry if flush fails, whenever it is called.

if (ret == -1) {
flb_task_retry_destroy(retry);
return -1;
if (retry->o_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
/*
* If the plugin doesn't allow for multiplexing.
* singleplex_enqueue deletes retry context on flush or delayed flush failure
*/
ret = flb_output_task_singleplex_enqueue(retry->o_ins->singleplex_queue, retry,
task, retry->o_ins, config);
if (ret == -1) {
return -1;
}
}
else {
ret = flb_output_task_flush(task, retry->o_ins, config);
if (ret == -1) {
flb_task_retry_destroy(retry);
return -1;
}
}

return 0;
}

Expand Down Expand Up @@ -183,10 +197,20 @@ static int tasks_start(struct flb_input_instance *in,
hits++;

/*
* We have the Task and the Route, created a thread context for the
* data handling.
* If the plugin is in synchronous mode, enqueue the task and flush
* when appropriate.
*/
flb_output_task_flush(task, route->out, config);
if (out->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_output_task_singleplex_enqueue(route->out->singleplex_queue, NULL,
task, route->out, config);
}
else {
/*
* We have the Task and the Route, created a thread context for the
* data handling.
*/
flb_output_task_flush(task, route->out, config);
}

/*
th = flb_output_thread(task,
Expand Down
157 changes: 157 additions & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,125 @@ void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro)
mk_list_add(&out_flush->_head, &ins->flush_list);
}

/*
* Queue a task to be flushed at a later time
* Deletes retry context if enqueue fails
*/
static int flb_output_task_queue_enqueue(struct flb_task_queue *queue,
struct flb_task_retry *retry,
struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config)
{
struct flb_task_enqueued *queued_task;

queued_task = flb_malloc(sizeof(struct flb_task_enqueued));
if (!queued_task) {
flb_errno();
if (retry) {
flb_task_retry_destroy(retry);
}
return -1;
}
queued_task->retry = retry;
queued_task->out_instance = out_ins;
queued_task->task = task;
queued_task->config = config;

mk_list_add(&queued_task->_head, &queue->pending);
matthewfala marked this conversation as resolved.
Show resolved Hide resolved
return 0;
}

/*
 * Take the first entry of the pending list, move it to the in_progress
 * list and flush it.
 *
 * Returns -1 when the pending list is empty or when the flush fails;
 * otherwise returns the value reported by flb_output_task_flush().
 * When the flush fails, the retry context attached to the entry (if any)
 * is destroyed and the queue is asked to move on to the next task.
 */
static int flb_output_task_queue_flush_one(struct flb_task_queue *queue)
{
    struct flb_task_enqueued *queued_task;
    struct flb_task_retry *retry;
    int ret;
    int is_empty;

    is_empty = mk_list_is_empty(&queue->pending) == 0;
    if (is_empty) {
        flb_error("Attempting to flush task from an empty pending queue");
        return -1;
    }

    queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head);
    mk_list_del(&queued_task->_head);
    mk_list_add(&queued_task->_head, &queue->in_progress);

    /*
     * Copy the retry pointer before flushing. When flb_output_task_flush()
     * fails in synchronous mode it advances the queue itself, and advancing
     * the queue frees the in-progress entry, so 'queued_task' must not be
     * dereferenced once the flush call has returned an error.
     */
    retry = queued_task->retry;

    ret = flb_output_task_flush(queued_task->task,
                                queued_task->out_instance,
                                queued_task->config);

    /* Destroy retry context if needed */
    if (ret == -1) {
        if (retry) {
            flb_task_retry_destroy(retry);
        }
        /*
         * Flush the next task.
         * NOTE(review): the failing flush may already have advanced the
         * queue; confirm this second call cannot drop an entry that was
         * just moved to in_progress.
         */
        flb_output_task_singleplex_flush_next(queue);
        return -1;
    }

    return ret;
}

/*
* Will either run or queue running a single task
* Deletes retry context if enqueue fails
*/
int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
matthewfala marked this conversation as resolved.
Show resolved Hide resolved
struct flb_task_retry *retry,
struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config)
{
int ret;
int is_empty;

/* Enqueue task */
ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config);
if (ret == -1) {
return -1;
}

/* Launch task if nothing is running */
is_empty = mk_list_is_empty(&out_ins->singleplex_queue->in_progress) == 0;
if (is_empty) {
return flb_output_task_queue_flush_one(out_ins->singleplex_queue);
}

return 0;
}

/*
* Clear in progress task and flush a single queued task if exists
* Deletes retry context on next flush if flush fails
*/
int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue)
{
int is_empty;
struct flb_task_enqueued *ended_task;

/* Remove in progress task */
is_empty = mk_list_is_empty(&queue->in_progress) == 0;
if (!is_empty) {
ended_task = mk_list_entry_first(&queue->in_progress,
matthewfala marked this conversation as resolved.
Show resolved Hide resolved
struct flb_task_enqueued, _head);
mk_list_del(&ended_task->_head);
flb_free(ended_task);
}

/* Flush if there is a pending task queued */
is_empty = mk_list_is_empty(&queue->pending) == 0;
if (!is_empty) {
return flb_output_task_queue_flush_one(queue);
}
return 0;
}

/*
* Flush a task through the output plugin, either using a worker thread + coroutine
* or a simple co-routine in the current thread.
Expand All @@ -191,6 +310,11 @@ int flb_output_task_flush(struct flb_task *task,
ret = flb_output_thread_pool_flush(task, out_ins, config);
if (ret == -1) {
flb_task_users_dec(task, FLB_FALSE);

/* If we are in synchronous mode, flush one waiting task */
if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
}
}
}
else {
Expand All @@ -208,6 +332,14 @@ int flb_output_task_flush(struct flb_task *task,
sizeof(struct flb_output_flush*));
if (ret == -1) {
flb_errno();
flb_output_flush_destroy(out_flush);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was missing error-handling code, right?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does look like flb_task_users_dec(...) should be called to decrease the task counter. It is harder to tell if we need to call flb_output_flush_destroy(...) without working out a call map to see when/where this would be cleaned up under normal/exceptional cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's where it is normally called which is in turn called each time a task finishes.

fluent-bit/src/flb_output.c

Lines 501 to 534 in d7a30d1

/*
* Invoked everytime a flush callback has finished (returned). This function
* is called from the event loop.
*/
int flb_output_flush_finished(struct flb_config *config, int out_id)
{
struct mk_list *tmp;
struct mk_list *head;
struct mk_list *list;
struct flb_output_instance *ins;
struct flb_output_flush *out_flush;
struct flb_out_thread_instance *th_ins;
ins = flb_output_get_instance(config, out_id);
if (!ins) {
return -1;
}
if (flb_output_is_threaded(ins) == FLB_TRUE) {
th_ins = flb_output_thread_instance_get();
list = &th_ins->flush_list_destroy;
}
else {
list = &ins->flush_list_destroy;
}
/* Look for output coroutines that needs to be destroyed */
mk_list_foreach_safe(head, tmp, list) {
out_flush = mk_list_entry(head, struct flb_output_flush, _head);
flb_output_flush_destroy(out_flush);
}
return 0;
}

Since this task is not started it will never finish, meaning that this flush would never get cleaned up unless we call flb_output_flush_destroy here.

flb_task_users_dec(task, FLB_FALSE);

/* If we are in synchronous mode, flush one waiting task */
if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
}

return -1;
}
}
Expand Down Expand Up @@ -285,6 +417,11 @@ int flb_output_instance_destroy(struct flb_output_instance *ins)
/* release properties */
flb_output_free_properties(ins);

/* free singleplex queue */
if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(ins->singleplex_queue);
}

mk_list_del(&ins->_head);
flb_free(ins);

Expand Down Expand Up @@ -461,6 +598,9 @@ struct flb_output_instance *flb_output_new(struct flb_config *config,
instance->p = plugin;
instance->callback = flb_callback_create(instance->name);
if (!instance->callback) {
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(instance->singleplex_queue);
}
flb_free(instance);
return NULL;
}
Expand All @@ -474,6 +614,9 @@ struct flb_output_instance *flb_output_new(struct flb_config *config,
ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
if (!ctx) {
flb_errno();
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(instance->singleplex_queue);
}
flb_free(instance);
return NULL;
}
Expand Down Expand Up @@ -527,11 +670,25 @@ struct flb_output_instance *flb_output_new(struct flb_config *config,
if (plugin->flags & FLB_OUTPUT_NET) {
ret = flb_net_host_set(plugin->name, &instance->host, output);
if (ret != 0) {
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(instance->singleplex_queue);
}
flb_free(instance);
return NULL;
}
}

/* Create singleplex queue if SYNCHRONOUS mode is used */
instance->singleplex_queue = NULL;
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
instance->singleplex_queue = flb_task_queue_create();
if (!instance->singleplex_queue) {
flb_free(instance);
flb_errno();
return NULL;
}
}

flb_kv_init(&instance->properties);
flb_kv_init(&instance->net_properties);
mk_list_init(&instance->upstreams);
Expand Down
Loading