Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 102 additions & 8 deletions plugins/out_loki/loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,16 @@ pthread_once_t initialization_guard = PTHREAD_ONCE_INIT;

FLB_TLS_DEFINE(struct flb_loki_dynamic_tenant_id_entry,
thread_local_tenant_id);
struct flb_loki_remove_mpa_entry {
struct flb_mp_accessor *mpa;
struct cfl_list _head;
};
FLB_TLS_DEFINE(struct flb_loki_remove_mpa_entry, thread_local_remove_mpa);

void initialize_thread_local_storage()
{
FLB_TLS_INIT(thread_local_tenant_id);
FLB_TLS_INIT(thread_local_remove_mpa);
}

static struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id_create() {
Expand Down Expand Up @@ -81,6 +87,43 @@ static void dynamic_tenant_id_destroy(struct flb_loki_dynamic_tenant_id_entry *e
}
}

static struct flb_loki_remove_mpa_entry *remove_mpa_entry_create(struct flb_loki *ctx)
{
struct flb_loki_remove_mpa_entry *entry;

entry = flb_calloc(1, sizeof(struct flb_loki_remove_mpa_entry));
if (!entry) {
flb_errno();
return NULL;
}

entry->mpa = flb_mp_accessor_create(&ctx->remove_keys_derived);
if (!entry->mpa) {
flb_free(entry);
return NULL;
}

cfl_list_entry_init(&entry->_head);

return entry;
}

static void remove_mpa_entry_destroy(struct flb_loki_remove_mpa_entry *entry)
{
if (entry) {
if (entry->mpa) {
flb_mp_accessor_destroy(entry->mpa);
entry->mpa = NULL;
}

if (!cfl_list_entry_is_orphan(&entry->_head)) {
cfl_list_del(&entry->_head);
}

flb_free(entry);
}
}

static void flb_loki_kv_init(struct mk_list *list)
{
mk_list_init(list);
Expand Down Expand Up @@ -1371,7 +1414,8 @@ static int get_tenant_id_from_record(struct flb_loki *ctx, msgpack_object *map,

static int pack_record(struct flb_loki *ctx,
msgpack_packer *mp_pck, msgpack_object *rec,
flb_sds_t *dynamic_tenant_id)
flb_sds_t *dynamic_tenant_id,
struct flb_mp_accessor *remove_mpa)
{
int i;
int skip = 0;
Expand All @@ -1397,8 +1441,8 @@ static int pack_record(struct flb_loki *ctx,

/* Remove keys in remove_keys */
msgpack_unpacked_init(&mp_buffer);
if (ctx->remove_mpa) {
ret = flb_mp_accessor_keys_remove(ctx->remove_mpa, rec,
if (remove_mpa) {
ret = flb_mp_accessor_keys_remove(remove_mpa, rec,
(void *) &tmp_sbuf_data, &tmp_sbuf_size);
if (ret == FLB_TRUE) {
ret = msgpack_unpack_next(&mp_buffer, tmp_sbuf_data, tmp_sbuf_size, &off);
Expand Down Expand Up @@ -1564,6 +1608,15 @@ static int cb_loki_init(struct flb_output_instance *ins,
}

cfl_list_init(&ctx->dynamic_tenant_list);
result = pthread_mutex_init(&ctx->remove_mpa_list_lock, NULL);
if (result != 0) {
flb_errno();
flb_plg_error(ins, "cannot initialize remove_mpa list lock");
loki_config_destroy(ctx);
return -1;
}

cfl_list_init(&ctx->remove_mpa_list);

/*
* This plugin instance uses the HTTP client interface, let's register
Expand All @@ -1581,7 +1634,8 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
int total_records,
char *tag, int tag_len,
const void *data, size_t bytes,
flb_sds_t *dynamic_tenant_id)
flb_sds_t *dynamic_tenant_id,
struct flb_mp_accessor *remove_mpa)
{
// int mp_ok = MSGPACK_UNPACK_SUCCESS;
// size_t off = 0;
Expand Down Expand Up @@ -1672,7 +1726,7 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa);
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
}
Expand Down Expand Up @@ -1709,7 +1763,7 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa);
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
}
Expand Down Expand Up @@ -1752,13 +1806,30 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk,
struct flb_connection *u_conn;
struct flb_http_client *c;
struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id;
struct flb_loki_remove_mpa_entry *remove_mpa_entry;
struct mk_list *head;
struct flb_config_map_val *mv;
struct flb_slist_entry *key = NULL;
struct flb_slist_entry *val = NULL;

dynamic_tenant_id = FLB_TLS_GET(thread_local_tenant_id);

remove_mpa_entry = FLB_TLS_GET(thread_local_remove_mpa);

if (remove_mpa_entry == NULL) {
remove_mpa_entry = remove_mpa_entry_create(ctx);
if (!remove_mpa_entry) {
flb_plg_error(ctx->ins, "cannot allocate remove_mpa entry");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

FLB_TLS_SET(thread_local_remove_mpa, remove_mpa_entry);

pthread_mutex_lock(&ctx->remove_mpa_list_lock);
cfl_list_add(&remove_mpa_entry->_head, &ctx->remove_mpa_list);
pthread_mutex_unlock(&ctx->remove_mpa_list_lock);
}

if (dynamic_tenant_id == NULL) {
dynamic_tenant_id = dynamic_tenant_id_create();

Expand All @@ -1784,7 +1855,8 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk,
(char *) event_chunk->tag,
flb_sds_len(event_chunk->tag),
event_chunk->data, event_chunk->size,
&dynamic_tenant_id->value);
&dynamic_tenant_id->value,
remove_mpa_entry->mpa);

if (!payload) {
flb_plg_error(ctx->ins, "cannot compose request payload");
Expand Down Expand Up @@ -1982,6 +2054,21 @@ static void release_dynamic_tenant_ids(struct cfl_list *dynamic_tenant_list)
}
}

static void release_remove_mpa_entries(struct cfl_list *remove_mpa_list)
{
struct cfl_list *iterator;
struct cfl_list *backup;
struct flb_loki_remove_mpa_entry *entry;

cfl_list_foreach_safe(iterator, backup, remove_mpa_list) {
entry = cfl_list_entry(iterator,
struct flb_loki_remove_mpa_entry,
_head);

remove_mpa_entry_destroy(entry);
}
}

static int cb_loki_exit(void *data, struct flb_config *config)
{
struct flb_loki *ctx = data;
Expand All @@ -1996,6 +2083,12 @@ static int cb_loki_exit(void *data, struct flb_config *config)

pthread_mutex_unlock(&ctx->dynamic_tenant_list_lock);

pthread_mutex_lock(&ctx->remove_mpa_list_lock);

release_remove_mpa_entries(&ctx->remove_mpa_list);

pthread_mutex_unlock(&ctx->remove_mpa_list_lock);

loki_config_destroy(ctx);

return 0;
Expand Down Expand Up @@ -2147,7 +2240,8 @@ static int cb_loki_format_test(struct flb_config *config,

payload = loki_compose_payload(ctx, total_records,
(char *) tag, tag_len, data, bytes,
&dynamic_tenant_id);
&dynamic_tenant_id,
ctx->remove_mpa);
if (payload == NULL) {
if (dynamic_tenant_id != NULL) {
flb_sds_destroy(dynamic_tenant_id);
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_loki/loki.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ struct flb_loki {
struct cfl_list dynamic_tenant_list;
pthread_mutex_t dynamic_tenant_list_lock;

struct cfl_list remove_mpa_list;
pthread_mutex_t remove_mpa_list_lock;

/* Upstream Context */
struct flb_upstream *u;

Expand Down
47 changes: 47 additions & 0 deletions tests/runtime/out_loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,52 @@ void flb_test_remove_keys()
flb_destroy(ctx);
}

void flb_test_remove_keys_workers()
{
int ret;
int i;
int size = sizeof(JSON_LABEL_KEYS) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1",
"log_level", "error",
NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Loki output with multiple workers */
out_ffd = flb_output(ctx, (char *) "loki", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
"remove_keys", "foo, $data['l_key']",
"workers", "2",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_remove_keys,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest multiple data samples */
for (i = 0; i < 10; i++) {
flb_lib_push(ctx, in_ffd, (char *) JSON_LABEL_KEYS, size);
}

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

static void cb_check_label_map_path(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
Expand Down Expand Up @@ -972,6 +1018,7 @@ TEST_LIST = {
{"remove_keys_remove_map" , flb_test_remove_map},
{"labels_ra" , flb_test_labels_ra },
{"remove_keys" , flb_test_remove_keys },
{"remove_keys_workers" , flb_test_remove_keys_workers },
{"basic" , flb_test_basic },
{"labels" , flb_test_labels },
{"label_keys" , flb_test_label_keys },
Expand Down
Loading