diff --git a/plugins/out_loki/loki.c b/plugins/out_loki/loki.c index 2bdcaba3539..ee7a4313a89 100644 --- a/plugins/out_loki/loki.c +++ b/plugins/out_loki/loki.c @@ -44,10 +44,16 @@ pthread_once_t initialization_guard = PTHREAD_ONCE_INIT; FLB_TLS_DEFINE(struct flb_loki_dynamic_tenant_id_entry, thread_local_tenant_id); +struct flb_loki_remove_mpa_entry { + struct flb_mp_accessor *mpa; + struct cfl_list _head; +}; +FLB_TLS_DEFINE(struct flb_loki_remove_mpa_entry, thread_local_remove_mpa); void initialize_thread_local_storage() { FLB_TLS_INIT(thread_local_tenant_id); + FLB_TLS_INIT(thread_local_remove_mpa); } static struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id_create() { @@ -81,6 +87,43 @@ static void dynamic_tenant_id_destroy(struct flb_loki_dynamic_tenant_id_entry *e } } +static struct flb_loki_remove_mpa_entry *remove_mpa_entry_create(struct flb_loki *ctx) +{ + struct flb_loki_remove_mpa_entry *entry; + + entry = flb_calloc(1, sizeof(struct flb_loki_remove_mpa_entry)); + if (!entry) { + flb_errno(); + return NULL; + } + + entry->mpa = flb_mp_accessor_create(&ctx->remove_keys_derived); + if (!entry->mpa) { + flb_free(entry); + return NULL; + } + + cfl_list_entry_init(&entry->_head); + + return entry; +} + +static void remove_mpa_entry_destroy(struct flb_loki_remove_mpa_entry *entry) +{ + if (entry) { + if (entry->mpa) { + flb_mp_accessor_destroy(entry->mpa); + entry->mpa = NULL; + } + + if (!cfl_list_entry_is_orphan(&entry->_head)) { + cfl_list_del(&entry->_head); + } + + flb_free(entry); + } +} + static void flb_loki_kv_init(struct mk_list *list) { mk_list_init(list); @@ -1371,7 +1414,8 @@ static int get_tenant_id_from_record(struct flb_loki *ctx, msgpack_object *map, static int pack_record(struct flb_loki *ctx, msgpack_packer *mp_pck, msgpack_object *rec, - flb_sds_t *dynamic_tenant_id) + flb_sds_t *dynamic_tenant_id, + struct flb_mp_accessor *remove_mpa) { int i; int skip = 0; @@ -1397,8 +1441,8 @@ static int pack_record(struct flb_loki *ctx, /* Remove keys in remove_keys */ msgpack_unpacked_init(&mp_buffer); - if (ctx->remove_mpa) { - ret = flb_mp_accessor_keys_remove(ctx->remove_mpa, rec, + if (remove_mpa) { + ret = flb_mp_accessor_keys_remove(remove_mpa, rec, (void *) &tmp_sbuf_data, &tmp_sbuf_size); if (ret == FLB_TRUE) { ret = msgpack_unpack_next(&mp_buffer, tmp_sbuf_data, tmp_sbuf_size, &off); @@ -1564,6 +1608,15 @@ static int cb_loki_init(struct flb_output_instance *ins, } cfl_list_init(&ctx->dynamic_tenant_list); + result = pthread_mutex_init(&ctx->remove_mpa_list_lock, NULL); + if (result != 0) { + flb_errno(); + flb_plg_error(ins, "cannot initialize remove_mpa list lock"); + loki_config_destroy(ctx); + return -1; + } + + cfl_list_init(&ctx->remove_mpa_list); /* * This plugin instance uses the HTTP client interface, let's register @@ -1581,7 +1634,8 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, int total_records, char *tag, int tag_len, const void *data, size_t bytes, - flb_sds_t *dynamic_tenant_id) + flb_sds_t *dynamic_tenant_id, + struct flb_mp_accessor *remove_mpa) { // int mp_ok = MSGPACK_UNPACK_SUCCESS; // size_t off = 0; @@ -1672,7 +1726,7 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); - pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id); + pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa); if (ctx->structured_metadata || ctx->structured_metadata_map_keys) { pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL); } @@ -1709,7 +1763,7 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); - pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id); + pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa); if (ctx->structured_metadata || ctx->structured_metadata_map_keys) { pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body); } @@ -1752,6 +1806,7 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, struct flb_connection *u_conn; struct flb_http_client *c; struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id; + struct flb_loki_remove_mpa_entry *remove_mpa_entry; struct mk_list *head; struct flb_config_map_val *mv; struct flb_slist_entry *key = NULL; @@ -1759,6 +1814,22 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, dynamic_tenant_id = FLB_TLS_GET(thread_local_tenant_id); + remove_mpa_entry = FLB_TLS_GET(thread_local_remove_mpa); + + if (remove_mpa_entry == NULL) { + remove_mpa_entry = remove_mpa_entry_create(ctx); + if (!remove_mpa_entry) { + flb_plg_error(ctx->ins, "cannot allocate remove_mpa entry"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + FLB_TLS_SET(thread_local_remove_mpa, remove_mpa_entry); + + pthread_mutex_lock(&ctx->remove_mpa_list_lock); + cfl_list_add(&remove_mpa_entry->_head, &ctx->remove_mpa_list); + pthread_mutex_unlock(&ctx->remove_mpa_list_lock); + } + if (dynamic_tenant_id == NULL) { dynamic_tenant_id = dynamic_tenant_id_create(); @@ -1784,7 +1855,8 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, (char *) event_chunk->tag, flb_sds_len(event_chunk->tag), event_chunk->data, event_chunk->size, - &dynamic_tenant_id->value); + &dynamic_tenant_id->value, + remove_mpa_entry->mpa); if (!payload) { flb_plg_error(ctx->ins, "cannot compose request payload"); @@ -1982,6 +2054,21 @@ static void release_dynamic_tenant_ids(struct cfl_list *dynamic_tenant_list) } } +static void release_remove_mpa_entries(struct cfl_list *remove_mpa_list) +{ + struct cfl_list *iterator; + struct cfl_list *backup; + struct flb_loki_remove_mpa_entry *entry; + + cfl_list_foreach_safe(iterator, backup, remove_mpa_list) { + entry = cfl_list_entry(iterator, + struct flb_loki_remove_mpa_entry, + _head); + + remove_mpa_entry_destroy(entry); + } +} + static int cb_loki_exit(void *data, struct flb_config *config) { struct flb_loki *ctx = data; @@ -1996,6 +2083,12 @@ static int cb_loki_exit(void *data, struct flb_config *config) pthread_mutex_unlock(&ctx->dynamic_tenant_list_lock); + pthread_mutex_lock(&ctx->remove_mpa_list_lock); + + release_remove_mpa_entries(&ctx->remove_mpa_list); + + pthread_mutex_unlock(&ctx->remove_mpa_list_lock); + loki_config_destroy(ctx); return 0; @@ -2147,7 +2240,8 @@ static int cb_loki_format_test(struct flb_config *config, payload = loki_compose_payload(ctx, total_records, (char *) tag, tag_len, data, bytes, - &dynamic_tenant_id); + &dynamic_tenant_id, + ctx->remove_mpa); if (payload == NULL) { if (dynamic_tenant_id != NULL) { flb_sds_destroy(dynamic_tenant_id); diff --git a/plugins/out_loki/loki.h b/plugins/out_loki/loki.h index 089859aca2a..68f3821be14 100644 --- a/plugins/out_loki/loki.h +++ b/plugins/out_loki/loki.h @@ -100,6 +100,9 @@ struct flb_loki { struct cfl_list dynamic_tenant_list; pthread_mutex_t dynamic_tenant_list_lock; + struct cfl_list remove_mpa_list; + pthread_mutex_t remove_mpa_list_lock; + /* Upstream Context */ struct flb_upstream *u; diff --git a/tests/runtime/out_loki.c b/tests/runtime/out_loki.c index a440fc0b692..e177b01e675 100644 --- a/tests/runtime/out_loki.c +++ b/tests/runtime/out_loki.c @@ -644,6 +644,52 @@ void flb_test_remove_keys() flb_destroy(ctx); } +void flb_test_remove_keys_workers() +{ + int ret; + int i; + int size = sizeof(JSON_LABEL_KEYS) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Loki output with multiple workers */ + out_ffd = flb_output(ctx, (char *) "loki", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "remove_keys", "foo, $data['l_key']", + "workers", "2", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_remove_keys, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest multiple data samples */ + for (i = 0; i < 10; i++) { + flb_lib_push(ctx, in_ffd, (char *) JSON_LABEL_KEYS, size); + } + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + static void cb_check_label_map_path(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) @@ -972,6 +1018,7 @@ TEST_LIST = { {"remove_keys_remove_map" , flb_test_remove_map}, {"labels_ra" , flb_test_labels_ra }, {"remove_keys" , flb_test_remove_keys }, + {"remove_keys_workers" , flb_test_remove_keys_workers }, {"basic" , flb_test_basic }, {"labels" , flb_test_labels }, {"label_keys" , flb_test_label_keys },