diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h
index 6525e96d867..ada6c83191e 100644
--- a/include/fluent-bit/aws/flb_aws_compress.h
+++ b/include/fluent-bit/aws/flb_aws_compress.h
@@ -26,6 +26,7 @@
 #define FLB_AWS_COMPRESS_ARROW 2
 #define FLB_AWS_COMPRESS_PARQUET 3
 #define FLB_AWS_COMPRESS_ZSTD 4
+#define FLB_AWS_COMPRESS_SNAPPY 5
 
 /*
  * Get compression type from compression keyword. The return value is used to identify
diff --git a/plugins/out_kinesis_firehose/firehose.c b/plugins/out_kinesis_firehose/firehose.c
index ce6e43aec4a..431239de747 100644
--- a/plugins/out_kinesis_firehose/firehose.c
+++ b/plugins/out_kinesis_firehose/firehose.c
@@ -496,9 +496,9 @@ static struct flb_config_map config_map[] = {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
      "Compression type for Firehose records. Each log record is individually compressed "
-     "and sent to Firehose. 'gzip' and 'arrow' are the supported values. "
-     "'arrow' is only an available if Apache Arrow was enabled at compile time. "
-     "Defaults to no compression."
+     "and sent to Firehose. Supported values: 'gzip', 'zstd', 'snappy'. "
+     "'arrow' is also available if Apache Arrow was enabled at compile time. "
+     "Defaults to no compression."
     },
 
     {
diff --git a/plugins/out_kinesis_streams/kinesis.c b/plugins/out_kinesis_streams/kinesis.c
index ec954b3358d..6a58b649191 100644
--- a/plugins/out_kinesis_streams/kinesis.c
+++ b/plugins/out_kinesis_streams/kinesis.c
@@ -33,6 +33,7 @@
 #include
 #include
 #include
+#include <fluent-bit/aws/flb_aws_compress.h>
 #include
 #include
 
@@ -146,6 +147,19 @@ static int cb_kinesis_init(struct flb_output_instance *ins,
         ctx->log_key = tmp;
     }
 
+    tmp = flb_output_get_property("compression", ins);
+    if (tmp) {
+        ret = flb_aws_compression_get_type(tmp);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
+            goto error;
+        }
+        ctx->compression = ret;
+    }
+    else {
+        ctx->compression = FLB_AWS_COMPRESS_NONE;
+    }
+
     tmp = flb_output_get_property("region", ins);
     if (tmp) {
         ctx->region = tmp;
@@ -522,6 +536,14 @@ static struct flb_config_map config_map[] = {
         "This reduces the number of requests and can improve throughput."
     },
 
+    {
+     FLB_CONFIG_MAP_STR, "compression", NULL,
+     0, FLB_FALSE, 0,
+     "Compression type for Kinesis records. Each log record is individually compressed "
+     "and sent to Kinesis Data Streams. Supported values: 'gzip', 'zstd', 'snappy'. "
+     "Defaults to no compression."
+    },
+
     /* EOF */
     {0}
 };
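The `compression` keyword is resolved once at init through `flb_aws_compression_get_type()` and cached on the context, so the flush path only compares an integer. A minimal sketch of that pattern, with a hypothetical `my_ctx`/`set_compression` standing in for the plugin context (the `flb_aws_compress` calls themselves are the ones used in the hunks above):

```c
/* Sketch of the init-time keyword lookup used above; my_ctx and
 * set_compression are hypothetical, the flb_aws_compress calls are not. */
#include <fluent-bit/aws/flb_aws_compress.h>

struct my_ctx {
    int compression;                  /* one of the FLB_AWS_COMPRESS_* ids */
};

static int set_compression(struct my_ctx *ctx, const char *keyword)
{
    int type;

    if (keyword == NULL) {
        /* property not set: default to no compression */
        ctx->compression = FLB_AWS_COMPRESS_NONE;
        return 0;
    }

    /* maps "gzip"/"zstd"/"snappy" (and "arrow"/"parquet" when built in)
     * to its FLB_AWS_COMPRESS_* id; -1 means the keyword is unknown */
    type = flb_aws_compression_get_type(keyword);
    if (type == -1) {
        return -1;                    /* unknown keyword: fail plugin init */
    }

    ctx->compression = type;
    return 0;
}
```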
diff --git a/plugins/out_kinesis_streams/kinesis.h b/plugins/out_kinesis_streams/kinesis.h
index d5842ed96f9..84c76dde44e 100644
--- a/plugins/out_kinesis_streams/kinesis.h
+++ b/plugins/out_kinesis_streams/kinesis.h
@@ -98,6 +98,7 @@ struct flb_kinesis {
     const char *external_id;
     int retry_requests;
     int simple_aggregation;
+    int compression;
     char *sts_endpoint;
     int custom_endpoint;
     uint16_t port;
diff --git a/plugins/out_kinesis_streams/kinesis_api.c b/plugins/out_kinesis_streams/kinesis_api.c
index e021202aafa..50e2cfcf237 100644
--- a/plugins/out_kinesis_streams/kinesis_api.c
+++ b/plugins/out_kinesis_streams/kinesis_api.c
@@ -37,6 +37,7 @@
 #include
 #include
 #include
+#include <fluent-bit/aws/flb_aws_compress.h>
 #include
 #include
 
@@ -383,13 +384,40 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf,
     }
 
     tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
-    ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
-                            (unsigned char *) tmp_buf_ptr, written);
-    if (ret != 0) {
-        flb_errno();
-        return -1;
+
+    /* Handle compression if enabled */
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
+        void *compressed_buf = NULL;
+        size_t compressed_size = 0;
+
+        ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
+                                                        MAX_B64_EVENT_SIZE,
+                                                        tmp_buf_ptr,
+                                                        written,
+                                                        &compressed_buf,
+                                                        &compressed_size);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "Unable to compress record, discarding, %s",
+                          ctx->stream_name);
+            return 2;
+        }
+
+        /* Replace event buffer with compressed buffer */
+        flb_free(buf->event_buf);
+        buf->event_buf = compressed_buf;
+        buf->event_buf_size = compressed_size;
+        written = compressed_size;
+    }
+    else {
+        /* Base64 encode the record */
+        ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
+                                (unsigned char *) tmp_buf_ptr, written);
+        if (ret != 0) {
+            flb_errno();
+            return -1;
+        }
+        written = b64_len;
     }
-    written = b64_len;
 
     tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
     if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) {
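Both the per-record path above and the aggregated path below rely on `flb_aws_compression_b64_truncate_compress()`, which compresses, base64-encodes, and keeps the encoded result within `max_out_len` by truncating the input when necessary. A rough sketch of that contract (illustrative only; the halving retry is a simplification, not the real truncation logic):

```c
/* Illustrative sketch of the b64-truncate-compress contract assumed above:
 * compress, base64-encode, and if the encoded size would exceed max_out_len,
 * shrink the input and try again. Not the real implementation. */
#include <fluent-bit/aws/flb_aws_compress.h>
#include <fluent-bit/flb_mem.h>

static size_t b64_size(size_t n)
{
    return ((n + 2) / 3) * 4;        /* base64 output bytes for n input bytes */
}

static int b64_truncate_compress_sketch(int type, size_t max_out_len,
                                        void *in, size_t in_len,
                                        void **out, size_t *out_len)
{
    void *comp;
    size_t comp_len;

    while (in_len > 0) {
        comp = NULL;
        comp_len = 0;

        if (flb_aws_compression_compress(type, in, in_len,
                                         &comp, &comp_len) == -1) {
            return -1;
        }

        if (b64_size(comp_len) <= max_out_len) {
            /* the real function base64-encodes comp into *out here */
            *out = comp;
            *out_len = comp_len;
            return 0;
        }

        /* encoded output too large: drop this attempt, shrink the input and
         * retry (the real logic computes a tighter bound than halving) */
        flb_free(comp);
        in_len = in_len / 2;
    }

    return -1;
}
```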
@@ -431,25 +459,56 @@ static int send_aggregated_record(struct flb_kinesis *ctx, struct flush *buf)
         return 0;
     }
 
-    /* Base64 encode the aggregated record */
-    size_t size = (agg_size * 1.5) + 4;
-    if (buf->event_buf == NULL || buf->event_buf_size < size) {
-        flb_free(buf->event_buf);
-        buf->event_buf = flb_malloc(size);
-        buf->event_buf_size = size;
-        if (buf->event_buf == NULL) {
-            flb_errno();
-            return -1;
+    /* Handle compression if enabled */
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
+        void *compressed_buf = NULL;
+        size_t compressed_size = 0;
+
+        ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
+                                                        MAX_B64_EVENT_SIZE,
+                                                        buf->agg_buf.agg_buf,
+                                                        agg_size,
+                                                        &compressed_buf,
+                                                        &compressed_size);
+        if (ret == -1) {
+            flb_plg_error(ctx->ins, "Unable to compress aggregated record, discarding, %s",
+                          ctx->stream_name);
+            flb_aws_aggregation_reset(&buf->agg_buf);
+            return 0;
+        }
+
+        /* Ensure event_buf is large enough */
+        if (buf->event_buf == NULL || buf->event_buf_size < compressed_size) {
+            flb_free(buf->event_buf);
+            buf->event_buf = compressed_buf;
+            buf->event_buf_size = compressed_size;
+        } else {
+            memcpy(buf->event_buf, compressed_buf, compressed_size);
+            flb_free(compressed_buf);
         }
+        agg_size = compressed_size;
     }
+    else {
+        /* Base64 encode the aggregated record */
+        size_t size = (agg_size * 1.5) + 4;
+        if (buf->event_buf == NULL || buf->event_buf_size < size) {
+            flb_free(buf->event_buf);
+            buf->event_buf = flb_malloc(size);
+            buf->event_buf_size = size;
+            if (buf->event_buf == NULL) {
+                flb_errno();
+                return -1;
+            }
+        }
 
-    ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
-                            (unsigned char *) buf->agg_buf.agg_buf, agg_size);
-    if (ret != 0) {
-        flb_errno();
-        return -1;
+        ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
+                                (unsigned char *) buf->agg_buf.agg_buf, agg_size);
+        if (ret != 0) {
+            flb_errno();
+            return -1;
+        }
+        agg_size = b64_len;
     }
-    agg_size = b64_len;
 
     /* Copy to tmp_buf */
     if (buf->tmp_buf_size < agg_size) {
diff --git a/plugins/out_kinesis_streams/kinesis_api.h b/plugins/out_kinesis_streams/kinesis_api.h
index d08efaba743..d329b4f37fc 100644
--- a/plugins/out_kinesis_streams/kinesis_api.h
+++ b/plugins/out_kinesis_streams/kinesis_api.h
@@ -23,6 +23,7 @@
 #define PUT_RECORDS_PAYLOAD_SIZE 5242880
 #define MAX_EVENTS_PER_PUT 500
 #define MAX_EVENT_SIZE 1048556 /* 1048576 - 20 bytes for partition key */
+#define MAX_B64_EVENT_SIZE 1398076 /* ceil(1048556 / 3) * 4 */
 
 /* number of characters needed to 'start' a PutRecords payload */
 #define PUT_RECORDS_HEADER_LEN 30
diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index 97c1b1d0fdb..05d65ea428f 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -100,11 +100,20 @@ static struct flb_aws_header *get_content_encoding_header(int compression_type)
         .val_len = 4,
     };
 
+    static struct flb_aws_header snappy_header = {
+        .key = "Content-Encoding",
+        .key_len = 16,
+        .val = "snappy",
+        .val_len = 6,
+    };
+
     switch (compression_type) {
     case FLB_AWS_COMPRESS_GZIP:
         return &gzip_header;
     case FLB_AWS_COMPRESS_ZSTD:
         return &zstd_header;
+    case FLB_AWS_COMPRESS_SNAPPY:
+        return &snappy_header;
     default:
         return NULL;
     }
@@ -182,7 +191,9 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
     if (ctx->content_type != NULL) {
         headers_len++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
+        ctx->compression == FLB_AWS_COMPRESS_ZSTD ||
+        ctx->compression == FLB_AWS_COMPRESS_SNAPPY) {
         headers_len++;
     }
     if (ctx->canned_acl != NULL) {
@@ -212,7 +223,9 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
         s3_headers[n].val_len = strlen(ctx->content_type);
         n++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
+        ctx->compression == FLB_AWS_COMPRESS_ZSTD ||
+        ctx->compression == FLB_AWS_COMPRESS_SNAPPY) {
         encoding_header = get_content_encoding_header(ctx->compression);
 
         if (encoding_header == NULL) {
@@ -4042,11 +4055,12 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
-     "Compression type for S3 objects. 'gzip', 'arrow', 'parquet' and 'zstd' are the supported values. "
-     "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
+     "Compression type for S3 objects. Supported values: 'gzip', 'zstd', 'snappy'. "
+     "'arrow' and 'parquet' are also available if Apache Arrow was enabled at compile time. "
      "Defaults to no compression. "
-     "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
-     "If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'."
+     "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'. "
+     "If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'. "
+     "If 'snappy' is selected, the Content-Encoding HTTP Header will be set to 'snappy'."
     },
     {
      FLB_CONFIG_MAP_STR, "content_type", NULL,
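`MAX_B64_EVENT_SIZE` is `MAX_EVENT_SIZE` pushed through the base64 size formula: 4 output bytes per 3 input bytes, rounded up. A standalone check of the arithmetic in the header comment:

```c
#include <assert.h>
#include <stddef.h>

/* base64 emits 4 output bytes for every 3 input bytes, rounded up */
static size_t b64_encoded_size(size_t n)
{
    return ((n + 2) / 3) * 4;
}

int main(void)
{
    /* MAX_EVENT_SIZE is 1048556 (1 MiB minus 20 bytes of partition key);
     * ceil(1048556 / 3) * 4 = 349519 * 4 = 1398076 = MAX_B64_EVENT_SIZE */
    assert(b64_encoded_size(1048556) == 1398076);
    return 0;
}
```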
diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c
index 45fc1510255..8188c6b7038 100644
--- a/src/aws/flb_aws_compress.c
+++ b/src/aws/flb_aws_compress.c
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include <fluent-bit/flb_snappy.h>
 #include
 
@@ -31,6 +32,23 @@
 #include "compression/arrow/compress.h"
 #endif
 
+/* Wrapper function to adapt flb_snappy_compress to the AWS compression interface */
+static int flb_snappy_compress_wrapper(void *in_data, size_t in_len,
+                                       void **out_data, size_t *out_len)
+{
+    int ret;
+
+    ret = flb_snappy_compress((char *) in_data, in_len,
+                              (char **) out_data, out_len);
+
+    /* Normalize all error codes to -1 per the AWS compression interface contract */
+    if (ret < 0) {
+        return -1;
+    }
+
+    return ret;
+}
+
 struct compression_option {
     int compression_type;
     char *compression_keyword;
@@ -54,6 +72,11 @@ static const struct compression_option compression_options[] = {
         "zstd",
         &flb_zstd_compress
     },
+    {
+        FLB_AWS_COMPRESS_SNAPPY,
+        "snappy",
+        &flb_snappy_compress_wrapper
+    },
 #ifdef FLB_HAVE_ARROW
     {
         FLB_AWS_COMPRESS_ARROW,
diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c
index 7bb9d24b052..bf55f425c50 100644
--- a/tests/internal/aws_compress.c
+++ b/tests/internal/aws_compress.c
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include <fluent-bit/flb_snappy.h>
 #include
 #include "flb_tests_internal.h"
 
@@ -39,6 +40,17 @@ static void flb_aws_compress_truncate_b64_test_cases__gzip_decode(
 static void flb_aws_compress_truncate_b64_test_cases__zstd_decode(
     struct flb_aws_test_case *cases,
     size_t max_out_len);
+static void flb_aws_compress_truncate_b64_test_cases__snappy_decode(
+    struct flb_aws_test_case *cases,
+    size_t max_out_len);
+
+/* Wrapper function to adapt flb_snappy_uncompress to the test interface */
+static int flb_snappy_uncompress_wrapper(void *in_data, size_t in_len,
+                                         void **out_data, size_t *out_len)
+{
+    return flb_snappy_uncompress((char *) in_data, in_len,
+                                 (char **) out_data, out_len);
+}
 
 /** ------ Test Cases ------ **/
 void test_compression_gzip()
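Both wrappers exist because `compression_options` dispatches through one uniform `(void *, size_t, void **, size_t *)` function pointer, while the snappy helpers take `char *` buffers and can return backend-specific error codes. A self-contained illustration of the same adapter pattern, with a hypothetical `backend_compress` standing in for `flb_snappy_compress`:

```c
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* The table's uniform contract: 0 on success, -1 on any error. */
typedef int (*compress_fn)(void *in, size_t in_len, void **out, size_t *out_len);

/* A hypothetical backend with a different signature and richer error codes. */
static int backend_compress(char *in, size_t in_len, char **out, size_t *out_len)
{
    *out = malloc(in_len);
    if (*out == NULL) {
        return -2;                    /* backend-specific error code */
    }
    memcpy(*out, in, in_len);         /* pretend this is compression */
    *out_len = in_len;
    return 0;
}

/* Adapter: cast the pointer types and normalize every error to -1. */
static int backend_compress_wrapper(void *in, size_t in_len,
                                    void **out, size_t *out_len)
{
    int ret = backend_compress((char *) in, in_len, (char **) out, out_len);
    return ret < 0 ? -1 : ret;
}

int main(void)
{
    compress_fn fn = backend_compress_wrapper;   /* usable as a table entry */
    void *out = NULL;
    size_t out_len = 0;

    if (fn((void *) "hello", 5, &out, &out_len) == 0) {
        printf("ok: %zu bytes\n", out_len);
        free(out);
    }
    return 0;
}
```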
@@ -73,6 +85,54 @@ void test_compression_zstd()
     flb_aws_compress_test_cases(cases);
 }
 
+void test_compression_snappy()
+{
+    struct flb_aws_test_case cases[] =
+    {
+        {
+            "snappy",
+            "The quick brown fox jumps over the lazy dog",
+            "K6hUaGUgcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgdGhlIGxhenkgZG9n",
+            0
+        },
+        { 0 }
+    };
+
+    flb_aws_compress_test_cases(cases);
+}
+
+void test_compression_snappy_return_value_normalization()
+{
+    /* This test verifies that the snappy wrapper correctly normalizes return values
+     * to conform to the AWS compression interface contract: -1 on error, 0 on success.
+     *
+     * The test uses the actual flb_aws_compression_compress function, which internally
+     * uses the wrapper. We verify that successful compression returns exactly 0,
+     * demonstrating that the wrapper properly normalizes the return value.
+     */
+    int ret;
+    void *out_data = NULL;
+    size_t out_len = 0;
+    int compression_type;
+    char test_data[] = "test data for compression";
+
+    compression_type = flb_aws_compression_get_type("snappy");
+    TEST_CHECK(compression_type != -1);
+
+    /* Test successful compression - should return exactly 0 (not any other value) */
+    ret = flb_aws_compression_compress(compression_type, test_data,
+                                       strlen(test_data), &out_data, &out_len);
+    TEST_CHECK(ret == 0);
+    TEST_MSG("Expected return value 0 on success, got: %d", ret);
+    TEST_MSG("This verifies the wrapper returns 0, not a passthrough of the underlying function");
+
+    if (ret == 0 && out_data != NULL) {
+        TEST_CHECK(out_len > 0);
+        TEST_MSG("Compressed data length: %zu", out_len);
+        flb_free(out_data);
+    }
+}
+
 void test_b64_truncated_gzip()
 {
 struct flb_aws_test_case cases[] =
@@ -106,6 +166,22 @@ struct flb_aws_test_case cases[] =
     flb_aws_compress_truncate_b64_test_cases__zstd_decode(cases,41);
 }
 
+void test_b64_truncated_snappy()
+{
+struct flb_aws_test_case cases[] =
+    {
+        {
+            "snappy",
+            "The quick brown fox jumps over the lazy dog",
+            "The quick brown fox jumps over the lazy dog",
+            0 /* Expected ret */
+        },
+        { 0 }
+    };
+
+    flb_aws_compress_truncate_b64_test_cases__snappy_decode(cases, 60);
+}
+
 void test_b64_truncated_gzip_truncation()
 {
 struct flb_aws_test_case cases[] =
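The expected base64 string in `test_compression_snappy` can be derived by hand: snappy's raw format prefixes the payload with a varint of the uncompressed length (43, i.e. 0x2B) and, for this incompressible text, a single literal element whose tag byte is `(len - 1) << 2` = 0xA8. Packing 0x2B 0xA8 'T' into 6-bit groups gives the leading "K6hU". A small check of that framing:

```c
#include <assert.h>
#include <string.h>

int main(void)
{
    const char *text = "The quick brown fox jumps over the lazy dog";
    size_t len = strlen(text);                  /* 43 bytes */

    /* snappy raw format for an incompressible string:
     *   varint(uncompressed length) = 0x2B     (43 fits in one byte)
     *   literal tag = (len - 1) << 2 = 42 << 2 = 0xA8
     *   followed by the 43 literal bytes */
    unsigned char header[2] = { 0x2B, 0xA8 };

    assert(len == 43);
    assert(header[0] == len);
    assert(header[1] == ((len - 1) << 2));

    /* base64("\x2B\xA8T...") begins "K6hU": 0x2B 0xA8 0x54 packs into
     * 6-bit groups 10, 58, 33, 20 -> 'K', '6', 'h', 'U' */
    return 0;
}
```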
@@ -236,11 +312,37 @@ struct flb_aws_test_case cases[] =
         300);
 }
 
+void test_b64_truncated_gzip_boundary()
+{
+    /* Test the boundary where the base64-encoded compressed output must fit
+     * within max_out_len. This exercises the fix for the off-by-one error
+     * where records at the exact limit were incorrectly truncated: the input
+     * below compresses to base64 output within the 40-byte limit. */
+    struct flb_aws_test_case cases[] =
+    {
+        {
+            "gzip",
+            "test", /* small input whose gzipped base64 output fits in 40 bytes */
+            "test",
+            0 /* Expected ret - should succeed without truncation */
+        },
+        { 0 }
+    };
+
+    /* With the off-by-one fix, a compressed size at or under max_out_len
+     * must NOT trigger truncation. */
+    flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, 40);
+}
+
 TEST_LIST = {
     { "test_compression_gzip", test_compression_gzip },
     { "test_compression_zstd", test_compression_zstd },
+    { "test_compression_snappy", test_compression_snappy },
+    { "test_compression_snappy_return_value_normalization",
+      test_compression_snappy_return_value_normalization },
     { "test_b64_truncated_gzip", test_b64_truncated_gzip },
     { "test_b64_truncated_zstd", test_b64_truncated_zstd },
+    { "test_b64_truncated_snappy", test_b64_truncated_snappy },
     { "test_b64_truncated_gzip_truncation", test_b64_truncated_gzip_truncation },
     { "test_b64_truncated_gzip_truncation_buffer_too_small",
       test_b64_truncated_gzip_truncation_buffer_too_small },
@@ -248,6 +350,8 @@ TEST_LIST = {
       test_b64_truncated_gzip_truncation_edge },
     { "test_b64_truncated_gzip_truncation_multi_rounds",
       test_b64_truncated_gzip_truncation_multi_rounds },
+    { "test_b64_truncated_gzip_boundary",
+      test_b64_truncated_gzip_boundary },
     { 0 }
 };
 
@@ -277,6 +381,14 @@ static void flb_aws_compress_truncate_b64_test_cases__zstd_decode(
     flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE,
         cases, max_out_len, &flb_zstd_uncompress);
 }
 
+static void flb_aws_compress_truncate_b64_test_cases__snappy_decode(
+    struct flb_aws_test_case *cases,
+    size_t max_out_len)
+{
+    flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE,
+        cases, max_out_len, &flb_snappy_uncompress_wrapper);
+}
+
 /* General test case loop flb_aws_compress */
 static void flb_aws_compress_general_test_cases(int test_type,
     struct flb_aws_test_case *cases,
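The 60-byte limit in `test_b64_truncated_snappy` above follows from the same arithmetic: the snappy output is 45 bytes (2 framing bytes plus 43 literal bytes), and base64 of 45 bytes is exactly 60 characters, so the round trip is expected to pass untruncated:

```c
#include <assert.h>

/* base64 size helper: 4 output bytes per 3 input bytes, rounded up */
static unsigned b64_size(unsigned n)
{
    return ((n + 2) / 3) * 4;
}

int main(void)
{
    /* The snappy-compressed test string is 45 bytes (2 framing + 43 literal),
     * so its base64 form is exactly 60 characters -- the max_out_len passed
     * to flb_aws_compress_truncate_b64_test_cases__snappy_decode above. */
    assert(b64_size(45) == 60);
    return 0;
}
```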
out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "compression", "snappy", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with Snappy compression */ + flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1)); + flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_firehose_compression_snappy_with_aggregation(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int i; + char record[100]; + + setenv("FLB_FIREHOSE_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_firehose", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "delivery_stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "compression", "snappy", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push many records with Snappy compression and aggregation */ + for (i = 0; i < 20; i++) { + ret = snprintf(record, sizeof(record), "[1, {\"id\":%d,\"msg\":\"snappy_agg\"}]", i); + TEST_CHECK(ret < sizeof(record)); + flb_lib_push(ctx, in_ffd, record, strlen(record)); + } + + sleep(3); + flb_stop(ctx); + flb_destroy(ctx); +} + void flb_test_firehose_aggregation_combined_params(void) { int ret; @@ -536,6 +651,9 @@ TEST_LIST = { {"aggregation_with_log_key", flb_test_firehose_aggregation_with_log_key }, {"aggregation_many_records", flb_test_firehose_aggregation_many_records }, {"aggregation_with_compression", flb_test_firehose_aggregation_with_compression }, + {"compression_zstd", flb_test_firehose_compression_zstd }, + {"compression_snappy", flb_test_firehose_compression_snappy }, + {"compression_snappy_with_aggregation", flb_test_firehose_compression_snappy_with_aggregation }, {"aggregation_combined_params", flb_test_firehose_aggregation_combined_params }, {"aggregation_empty_records", flb_test_firehose_aggregation_empty_records }, {"aggregation_error_handling", flb_test_firehose_aggregation_error_handling }, diff --git a/tests/runtime/out_kinesis.c b/tests/runtime/out_kinesis.c index 1b6f74e8b24..4642eea71fc 100644 --- a/tests/runtime/out_kinesis.c +++ b/tests/runtime/out_kinesis.c @@ -443,6 +443,158 @@ void flb_test_kinesis_aggregation_many_records(void) flb_destroy(ctx); } +void flb_test_kinesis_compression_gzip(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record1 = "[1, {\"message\":\"gzip_test1\"}]"; + const char *record2 = "[1, {\"message\":\"gzip_test2\"}]"; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "compression", "gzip", NULL); + 
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with GZIP compression */ + flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1)); + flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_kinesis_compression_zstd(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record1 = "[1, {\"message\":\"zstd_test1\"}]"; + const char *record2 = "[1, {\"message\":\"zstd_test2\"}]"; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "compression", "zstd", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with ZSTD compression */ + flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1)); + flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_kinesis_compression_snappy(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *record1 = "[1, {\"message\":\"snappy_test1\"}]"; + const char *record2 = "[1, {\"message\":\"snappy_test2\"}]"; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "compression", "snappy", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push records with Snappy compression */ + flb_lib_push(ctx, in_ffd, (char *) record1, strlen(record1)); + flb_lib_push(ctx, in_ffd, (char *) record2, strlen(record2)); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_kinesis_compression_snappy_with_aggregation(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int i; + char record[100]; + + setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd, "stream", "fluent", NULL); + flb_output_set(ctx, out_ffd, "simple_aggregation", "On", NULL); + flb_output_set(ctx, out_ffd, "compression", "snappy", NULL); + flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push many records with Snappy compression and aggregation */ + for (i = 0; i < 20; i++) { + ret = snprintf(record, sizeof(record), "[1, 
{\"id\":%d,\"msg\":\"snappy_agg\"}]", i); + TEST_CHECK(ret < sizeof(record)); + flb_lib_push(ctx, in_ffd, record, strlen(record)); + } + + sleep(3); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"success", flb_test_firehose_success }, @@ -457,5 +609,9 @@ TEST_LIST = { {"aggregation_with_time_key", flb_test_kinesis_aggregation_with_time_key }, {"aggregation_with_log_key", flb_test_kinesis_aggregation_with_log_key }, {"aggregation_many_records", flb_test_kinesis_aggregation_many_records }, + {"compression_gzip", flb_test_kinesis_compression_gzip }, + {"compression_zstd", flb_test_kinesis_compression_zstd }, + {"compression_snappy", flb_test_kinesis_compression_snappy }, + {"compression_snappy_with_aggregation", flb_test_kinesis_compression_snappy_with_aggregation }, {NULL, NULL} }; diff --git a/tests/runtime/out_s3.c b/tests/runtime/out_s3.c index 5968ff12a50..01ad320b5f1 100644 --- a/tests/runtime/out_s3.c +++ b/tests/runtime/out_s3.c @@ -228,6 +228,216 @@ void flb_test_s3_complete_upload_error(void) unsetenv("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR"); } +void flb_test_s3_compression_gzip(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "*", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd,"compression", "gzip", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_s3_compression_gzip_putobject(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "*", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd,"compression", "gzip", NULL); + flb_output_set(ctx, out_ffd,"use_put_object", "true", NULL); + flb_output_set(ctx, out_ffd,"total_file_size", "5M", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_s3_compression_zstd(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "*", NULL); + flb_output_set(ctx, 
out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd,"compression", "zstd", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_s3_compression_zstd_putobject(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "*", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd,"compression", "zstd", NULL); + flb_output_set(ctx, out_ffd,"use_put_object", "true", NULL); + flb_output_set(ctx, out_ffd,"total_file_size", "5M", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_s3_compression_snappy(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "*", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd,"compression", "snappy", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_s3_compression_snappy_putobject(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* mocks calls- signals that we are in test mode */ + setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1); + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx,in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "s3", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd,"match", "*", NULL); + flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL); + flb_output_set(ctx, out_ffd,"bucket", "fluent", NULL); + flb_output_set(ctx, out_ffd,"compression", "snappy", NULL); + flb_output_set(ctx, out_ffd,"use_put_object", "true", NULL); + flb_output_set(ctx, out_ffd,"total_file_size", "5M", NULL); + flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD , (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { @@ -237,5 +447,11 @@ TEST_LIST = { {"create_upload_error", flb_test_s3_create_upload_error }, {"upload_part_error", 
@@ -237,5 +447,11 @@ TEST_LIST = {
     {"create_upload_error", flb_test_s3_create_upload_error },
     {"upload_part_error", flb_test_s3_upload_part_error },
     {"complete_upload_error", flb_test_s3_complete_upload_error },
+    {"compression_gzip", flb_test_s3_compression_gzip },
+    {"compression_gzip_putobject", flb_test_s3_compression_gzip_putobject },
+    {"compression_zstd", flb_test_s3_compression_zstd },
+    {"compression_zstd_putobject", flb_test_s3_compression_zstd_putobject },
+    {"compression_snappy", flb_test_s3_compression_snappy },
+    {"compression_snappy_putobject", flb_test_s3_compression_snappy_putobject },
     {NULL, NULL}
 };