diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h
index d9e929c6669..6525e96d867 100644
--- a/include/fluent-bit/aws/flb_aws_compress.h
+++ b/include/fluent-bit/aws/flb_aws_compress.h
@@ -25,6 +25,7 @@
 #define FLB_AWS_COMPRESS_GZIP 1
 #define FLB_AWS_COMPRESS_ARROW 2
 #define FLB_AWS_COMPRESS_PARQUET 3
+#define FLB_AWS_COMPRESS_ZSTD 4
 
 /*
  * Get compression type from compression keyword. The return value is used to identify
diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index d9d25f187b1..3eedb9334c0 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -84,12 +84,31 @@ static void remove_from_queue(struct upload_queue *entry);
 
 static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context);
 
-static struct flb_aws_header content_encoding_header = {
-    .key = "Content-Encoding",
-    .key_len = 16,
-    .val = "gzip",
-    .val_len = 4,
-};
+/* Map a compression type to its static Content-Encoding header.
+ * Returns NULL for types that carry no HTTP encoding (arrow/parquet/none). */
+static struct flb_aws_header *get_content_encoding_header(int compression_type)
+{
+    static struct flb_aws_header gzip_header = {
+        .key = "Content-Encoding",
+        .key_len = 16,
+        .val = "gzip",
+        .val_len = 4,
+    };
+
+    static struct flb_aws_header zstd_header = {
+        .key = "Content-Encoding",
+        .key_len = 16,
+        .val = "zstd",
+        .val_len = 4,
+    };
+
+    switch (compression_type) {
+    case FLB_AWS_COMPRESS_GZIP:
+        return &gzip_header;
+    case FLB_AWS_COMPRESS_ZSTD:
+        return &zstd_header;
+    default:
+        return NULL;
+    }
+}
 
 static struct flb_aws_header content_type_header = {
     .key = "Content-Type",
@@ -158,11 +177,12 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
     int n = 0;
     int headers_len = 0;
     struct flb_aws_header *s3_headers = NULL;
+    struct flb_aws_header *encoding_header = NULL;
 
     if (ctx->content_type != NULL) {
         headers_len++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
         headers_len++;
     }
     if (ctx->canned_acl != NULL) {
@@ -192,8 +212,15 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
         s3_headers[n].val_len = strlen(ctx->content_type);
         n++;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
-        s3_headers[n] = content_encoding_header;
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
+        encoding_header = get_content_encoding_header(ctx->compression);
+
+        if (encoding_header == NULL) {
+            flb_plg_error(ctx->ins, "no Content-Encoding header for compression type %i", ctx->compression);
+            flb_free(s3_headers);
+            return -1;
+        }
+        s3_headers[n] = *encoding_header;
         n++;
     }
     if (ctx->canned_acl != NULL) {
@@ -1175,7 +1202,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         goto multipart;
     }
     else {
-        if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->use_put_object == FLB_FALSE && (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD)) {
             flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
                          "the chunk was too small, using PutObject to upload", preCompress_size, body_size);
         }
@@ -3998,10 +4025,11 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
-    "Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. "
+    "Compression type for S3 objects. 'gzip', 'arrow', 'parquet' and 'zstd' are the supported values. "
     "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
     "Defaults to no compression. "
-    "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
+    "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'. "
+    "If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'."
     },
     {
      FLB_CONFIG_MAP_STR, "content_type", NULL,
diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c
index 253020e392a..040c3e1e28e 100644
--- a/src/aws/flb_aws_compress.c
+++ b/src/aws/flb_aws_compress.c
@@ -23,6 +23,7 @@
 #include <fluent-bit/flb_mem.h>
 #include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_zstd.h>
 #include <fluent-bit/aws/flb_aws_compress.h>
 
@@ -48,6 +49,11 @@ static const struct compression_option compression_options[] = {
         "gzip",
         &flb_gzip_compress
     },
+    {
+        FLB_AWS_COMPRESS_ZSTD,
+        "zstd",
+        &flb_zstd_compress
+    },
 #ifdef FLB_HAVE_ARROW
     {
         FLB_AWS_COMPRESS_ARROW,
diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c
index 180a0985b92..2a94754374d 100644
--- a/tests/internal/aws_compress.c
+++ b/tests/internal/aws_compress.c
@@ -4,6 +4,7 @@
 #include <fluent-bit/flb_mem.h>
 #include <fluent-bit/flb_base64.h>
 #include <fluent-bit/flb_gzip.h>
+#include <fluent-bit/flb_zstd.h>
 #include <fluent-bit/aws/flb_aws_compress.h>
 
 #include "flb_tests_internal.h"
@@ -35,6 +36,9 @@ static void flb_aws_compress_test_cases(struct flb_aws_test_case *cases);
 static void flb_aws_compress_truncate_b64_test_cases__gzip_decode(
     struct flb_aws_test_case *cases,
     size_t max_out_len);
+static void flb_aws_compress_truncate_b64_test_cases__zstd_decode(
+    struct flb_aws_test_case *cases,
+    size_t max_out_len);
 
 /** ------ Test Cases ------ **/
 void test_compression_gzip()
@@ -53,6 +57,22 @@ void test_compression_gzip()
     flb_aws_compress_test_cases(cases);
 }
 
+void test_compression_zstd()
+{
+    struct flb_aws_test_case cases[] =
+    {
+        {
+            "zstd",
+            "hello hello hello hello hello hello",
+            "KLUv/SAjZQAAMGhlbGxvIAEAuUsR",
+            0
+        },
+        { 0 }
+    };
+
+    flb_aws_compress_test_cases(cases);
+}
+
 void test_b64_truncated_gzip()
 {
 struct flb_aws_test_case cases[] =
@@ -70,6 +90,22 @@ struct flb_aws_test_case cases[] =
         41);
 }
 
+void test_b64_truncated_zstd()
+{
+struct flb_aws_test_case cases[] =
+    {
+        {
+            "zstd",
+            "hello hello hello hello hello hello",
+            "hello hello hello hello hello hello",
+            0 /* Expected ret */
+        },
+        { 0 }
+    };
+
+    flb_aws_compress_truncate_b64_test_cases__zstd_decode(cases, 41);
+}
+
 void test_b64_truncated_gzip_truncation()
 {
 struct flb_aws_test_case cases[] =
@@ -202,7 +238,9 @@ struct flb_aws_test_case cases[] =
 TEST_LIST = {
     { "test_compression_gzip", test_compression_gzip },
+    { "test_compression_zstd", test_compression_zstd },
     { "test_b64_truncated_gzip", test_b64_truncated_gzip },
+    { "test_b64_truncated_zstd", test_b64_truncated_zstd },
     { "test_b64_truncated_gzip_truncation",
       test_b64_truncated_gzip_truncation },
     { "test_b64_truncated_gzip_truncation_buffer_too_small",
       test_b64_truncated_gzip_truncation_buffer_too_small },
@@ -231,6 +269,14 @@ static void flb_aws_compress_truncate_b64_test_cases__gzip_decode(
                                         cases, max_out_len, &flb_gzip_uncompress);
 }
 
+static void flb_aws_compress_truncate_b64_test_cases__zstd_decode(
+    struct flb_aws_test_case *cases,
+    size_t max_out_len)
+{
+    flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE,
+                                        cases, max_out_len, &flb_zstd_uncompress);
+}
+
 /* General test case loop flb_aws_compress */
 static void flb_aws_compress_general_test_cases(int test_type,
                                                 struct flb_aws_test_case *cases,