diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 4e653e7e349..b9b8b13cf15 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -36,6 +36,8 @@ #include "es.h" #include "es_conf.h" #include "es_bulk.h" +#include "fluent-bit/flb_log.h" +#include "fluent-bit/flb_sds.h" #include "murmur3.h" struct flb_output_plugin out_es_plugin; @@ -885,6 +887,29 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, else if (ctx->cloud_user && ctx->cloud_passwd) { flb_http_basic_auth(c, ctx->cloud_user, ctx->cloud_passwd); } + else if (ctx->http_api_key) { + flb_sds_t header_buffer; + flb_sds_t header_line; + + header_buffer = flb_sds_create_size(strlen(ctx->http_api_key) + 64); + + if (header_buffer == NULL) { + flb_error("[out_es] failed to create header buffer"); + } else + { + header_line = flb_sds_printf(&header_buffer, "ApiKey %s", ctx->http_api_key); + + if (header_line != NULL) { + flb_http_add_header(c, + "Authorization", + strlen("Authorization"), + header_line, + flb_sds_len(header_line)); + } + + flb_sds_destroy(header_buffer); + } + } #ifdef FLB_HAVE_AWS if (ctx->has_aws_auth == FLB_TRUE) { @@ -1288,7 +1313,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error), "When enabled print the Elasticsearch exception to stderr (for diag only)" }, - + [35]={ + .type=FLB_CONFIG_MAP_STR, "http_api_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_api_key), + "API key for Elasticsearch" + }, /* EOF */ {0} }; diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index b6512ebc2a0..04e603c8e59 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -54,6 +54,7 @@ struct flb_elasticsearch { /* HTTP Auth */ char *http_user; char *http_passwd; + char *http_api_key; /* Elastic Cloud Auth */ char *cloud_user; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 4bc2977c5eb..a57dea1b1e5 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -143,6 +143,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, #endif char *cloud_port_char; char *cloud_host = NULL; + char *http_api_key = NULL; int cloud_host_port = 0; int cloud_port = FLB_ES_DEFAULT_HTTPS_PORT; struct flb_uri *uri = ins->host.uri; diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 3b67d56d432..badabc6056f 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -7,6 +7,24 @@ #include "data/es/json_es.h" /* JSON_ES */ +static void cb_check_http_api_key(void *ctx, int ffd, + int res_ret, void *res_data, + size_t res_size, void *data) +{ + char *p; + char *out_js = res_data; + char *auth_header = data; + + /* Check if the Authorization header is properly set */ + TEST_CHECK(res_data != NULL); + + /* Print test debugging info */ + flb_debug("[api key test] %s, data: %s", auth_header, out_js); + + flb_free(res_data); +} + + static void cb_check_write_op_index(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) @@ -722,6 +740,51 @@ void flb_test_div0() flb_destroy(ctx); } +void flb_test_http_api_key() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char *api_key = "my-api-key-for-elasticsearch"; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Configure http_api_key */ + flb_output_set(ctx, out_ffd, + "http_api_key", api_key, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_http_api_key, + api_key, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + static void cb_check_long_index(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, @@ -1012,6 +1075,7 @@ TEST_LIST = { {"tag_key" , flb_test_tag_key }, {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, + {"http_api_key" , flb_test_http_api_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, {"response_success" , flb_test_response_success }, {"response_successes", flb_test_response_successes },