diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 476e528cb83..95fa5cd3e11 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -100,6 +100,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, char *dynamic_topic; char *message_key = NULL; size_t message_key_len = 0; + flb_sds_t raw_key = NULL; struct flb_kafka_topic *topic = NULL; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; @@ -211,6 +212,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map, } } + /* Lookup raw_log_key */ + if (ctx->raw_log_key && ctx->format == FLB_KAFKA_FMT_RAW && !raw_key && val.type == MSGPACK_OBJECT_STR) { + if (key.via.str.size == ctx->raw_log_key_len && + strncmp(key.via.str.ptr, ctx->raw_log_key, ctx->raw_log_key_len) == 0) { + raw_key = flb_sds_create_len(val.via.str.ptr, val.via.str.size); + } + } + /* Lookup key/topic */ if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) { if (key.via.str.size == ctx->topic_key_len && @@ -346,6 +355,15 @@ int produce_message(struct flb_time *tm, msgpack_object *map, } #endif + else if (ctx->format == FLB_KAFKA_FMT_RAW) { + if (raw_key == NULL) { + flb_plg_error(ctx->ins, "missing raw_log_key"); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + out_buf = raw_key; + out_size = flb_sds_len(raw_key); + } if (!message_key) { message_key = ctx->message_key; @@ -363,6 +381,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, AVRO_FREE(avro_fast_buffer, out_buf) } #endif + flb_sds_destroy(raw_key); return FLB_ERROR; } @@ -384,6 +403,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, AVRO_FREE(avro_fast_buffer, out_buf) } #endif + flb_sds_destroy(raw_key); /* * Unblock the flush requests so that the * engine could try sending data again. @@ -455,6 +475,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, AVRO_FREE(avro_fast_buffer, out_buf) } #endif + flb_sds_destroy(raw_key); msgpack_sbuffer_destroy(&mp_sbuf); return FLB_OK; @@ -643,6 +664,13 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Set the kafka group_id." }, + { + FLB_CONFIG_MAP_STR, "raw_log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, raw_log_key), + "By default, the whole log record will be sent to Kafka. " + "If you specify a key name with this option, then only the value of " + "that key will be sent to Kafka." + }, /* EOF */ {0} }; diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index 04e91255635..0caea60d06d 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -93,6 +93,9 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, ctx->format = FLB_KAFKA_FMT_AVRO; } #endif + else if (strcasecmp(ctx->format_str, "raw") == 0) { + ctx->format = FLB_KAFKA_FMT_RAW; + } } else { ctx->format = FLB_KAFKA_FMT_JSON; @@ -114,6 +117,14 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, ctx->message_key_field_len = 0; } + /* Config: Log_Key */ + if (ctx->raw_log_key) { + ctx->raw_log_key_len = strlen(ctx->raw_log_key); + } + else { + ctx->raw_log_key_len = 0; + } + /* Config: Timestamp_Key */ if (ctx->timestamp_key) { ctx->timestamp_key_len = strlen(ctx->timestamp_key); @@ -228,10 +239,6 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx) flb_free(ctx->topic_key); } - if (ctx->message_key) { - flb_free(ctx->message_key); - } - if (ctx->message_key_field) { flb_free(ctx->message_key_field); } diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 42af378e161..14e036f8184 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -34,6 +34,7 @@ #ifdef FLB_HAVE_AVRO_ENCODER #define FLB_KAFKA_FMT_AVRO 3 #endif +#define FLB_KAFKA_FMT_RAW 4 #define FLB_KAFKA_TS_KEY "@timestamp" #define FLB_KAFKA_QUEUE_FULL_RETRIES "10" @@ -80,6 +81,9 @@ struct flb_out_kafka { int message_key_field_len; char *message_key_field; + int raw_log_key_len; + char *raw_log_key; + /* Gelf Keys */ struct flb_gelf_fields gelf_fields; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index e902f7892ff..21942d439a7 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -101,6 +101,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_FLOWCOUNTER "out_flowcounter.c") FLB_RT_TEST(FLB_OUT_FORWARD "out_forward.c") FLB_RT_TEST(FLB_OUT_HTTP "out_http.c") + FLB_RT_TEST(FLB_OUT_KAFKA "out_kafka.c") FLB_RT_TEST(FLB_OUT_LIB "out_lib.c") FLB_RT_TEST(FLB_OUT_LOKI "out_loki.c") FLB_RT_TEST(FLB_OUT_NULL "out_null.c") diff --git a/tests/runtime/out_kafka.c b/tests/runtime/out_kafka.c new file mode 100644 index 00000000000..4c5c852bc44 --- /dev/null +++ b/tests/runtime/out_kafka.c @@ -0,0 +1,49 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include "flb_tests_runtime.h" + +/* Test data */ +#include "data/td/json_td.h" + + +void flb_test_raw_format() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Kafka output */ + out_ffd = flb_output(ctx, (char *) "kafka", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + /* Switch to raw mode and select a key */ + flb_output_set(ctx, out_ffd, "format", "raw", NULL); + flb_output_set(ctx, out_ffd, "raw_log_key", "key_0", NULL); + flb_output_set(ctx, out_ffd, "topics", "test", NULL); + flb_output_set(ctx, out_ffd, "brokers", "127.0.0.1:111", NULL); + flb_output_set(ctx, out_ffd, "queue_full_retries", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD, (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +TEST_LIST = { + { "raw_format", flb_test_raw_format }, + { NULL, NULL }, +};