Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/aws/flb_aws_compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#define FLB_AWS_COMPRESS_ARROW 2
#define FLB_AWS_COMPRESS_PARQUET 3
#define FLB_AWS_COMPRESS_ZSTD 4
#define FLB_AWS_COMPRESS_SNAPPY 5

/*
* Get compression type from compression keyword. The return value is used to identify
Expand Down
5 changes: 2 additions & 3 deletions plugins/out_kinesis_firehose/firehose.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,9 +496,8 @@ static struct flb_config_map config_map[] = {
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for Firehose records. Each log record is individually compressed "
"and sent to Firehose. 'gzip' and 'arrow' are the supported values. "
"'arrow' is only an available if Apache Arrow was enabled at compile time. "
"Defaults to no compression."
"and sent to Firehose. Supported values: 'gzip', 'zstd', 'snappy'. "
"'arrow' is also available if Apache Arrow was enabled at compile time. "
},

{
Expand Down
22 changes: 22 additions & 0 deletions plugins/out_kinesis_streams/kinesis.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/aws/flb_aws_compress.h>

#include <monkey/mk_core.h>
#include <msgpack.h>
Expand Down Expand Up @@ -146,6 +147,19 @@ static int cb_kinesis_init(struct flb_output_instance *ins,
ctx->log_key = tmp;
}

tmp = flb_output_get_property("compression", ins);
if (tmp) {
ret = flb_aws_compression_get_type(tmp);
if (ret == -1) {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
goto error;
}
ctx->compression = ret;
}
else {
ctx->compression = FLB_AWS_COMPRESS_NONE;
}

tmp = flb_output_get_property("region", ins);
if (tmp) {
ctx->region = tmp;
Expand Down Expand Up @@ -522,6 +536,14 @@ static struct flb_config_map config_map[] = {
"This reduces the number of requests and can improve throughput."
},

{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for Kinesis records. Each log record is individually compressed "
"and sent to Kinesis Data Streams. Supported values: 'gzip', 'zstd', 'snappy'. "
"Defaults to no compression."
},

/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/out_kinesis_streams/kinesis.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct flb_kinesis {
const char *external_id;
int retry_requests;
int simple_aggregation;
int compression;
char *sts_endpoint;
int custom_endpoint;
uint16_t port;
Expand Down
101 changes: 80 additions & 21 deletions plugins/out_kinesis_streams/kinesis_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/aws/flb_aws_aggregation.h>
#include <fluent-bit/aws/flb_aws_compress.h>

#include <monkey/mk_core.h>
#include <msgpack.h>
Expand Down Expand Up @@ -383,13 +384,40 @@ static int process_event(struct flb_kinesis *ctx, struct flush *buf,
}

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
(unsigned char *) tmp_buf_ptr, written);
if (ret != 0) {
flb_errno();
return -1;

/* Handle compression if enabled */
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
void *compressed_buf = NULL;
size_t compressed_size = 0;

ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
MAX_B64_EVENT_SIZE,
tmp_buf_ptr,
written,
&compressed_buf,
&compressed_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Unable to compress record, discarding, %s",
ctx->stream_name);
return 2;
}

/* Replace event buffer with compressed buffer */
flb_free(buf->event_buf);
buf->event_buf = compressed_buf;
buf->event_buf_size = compressed_size;
written = compressed_size;
}
else {
/* Base64 encode the record */
ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
(unsigned char *) tmp_buf_ptr, written);
if (ret != 0) {
flb_errno();
return -1;
}
written = b64_len;
}
written = b64_len;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) {
Expand Down Expand Up @@ -431,25 +459,56 @@ static int send_aggregated_record(struct flb_kinesis *ctx, struct flush *buf) {
return 0;
}

/* Base64 encode the aggregated record */
size_t size = (agg_size * 1.5) + 4;
if (buf->event_buf == NULL || buf->event_buf_size < size) {
flb_free(buf->event_buf);
buf->event_buf = flb_malloc(size);
buf->event_buf_size = size;
if (buf->event_buf == NULL) {
flb_errno();
return -1;
/* Handle compression if enabled */
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
void *compressed_buf = NULL;
size_t compressed_size = 0;

ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
MAX_B64_EVENT_SIZE,
buf->agg_buf.agg_buf,
agg_size,
&compressed_buf,
&compressed_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Unable to compress aggregated record, discarding, %s",
ctx->stream_name);
flb_aws_aggregation_reset(&buf->agg_buf);
return 0;
}

/* Ensure event_buf is large enough */
if (buf->event_buf == NULL || buf->event_buf_size < compressed_size) {
flb_free(buf->event_buf);
buf->event_buf = compressed_buf;
buf->event_buf_size = compressed_size;
} else {
memcpy(buf->event_buf, compressed_buf, compressed_size);
flb_free(compressed_buf);
}
agg_size = compressed_size;
}
else {
/* Base64 encode the aggregated record */
size_t size = (agg_size * 1.5) + 4;
if (buf->event_buf == NULL || buf->event_buf_size < size) {
flb_free(buf->event_buf);
buf->event_buf = flb_malloc(size);
buf->event_buf_size = size;
if (buf->event_buf == NULL) {
flb_errno();
return -1;
}
}

ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
(unsigned char *) buf->agg_buf.agg_buf, agg_size);
if (ret != 0) {
flb_errno();
return -1;
ret = flb_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
(unsigned char *) buf->agg_buf.agg_buf, agg_size);
if (ret != 0) {
flb_errno();
return -1;
}
agg_size = b64_len;
}
agg_size = b64_len;

/* Copy to tmp_buf */
if (buf->tmp_buf_size < agg_size) {
Expand Down
1 change: 1 addition & 0 deletions plugins/out_kinesis_streams/kinesis_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define PUT_RECORDS_PAYLOAD_SIZE 5242880
#define MAX_EVENTS_PER_PUT 500
#define MAX_EVENT_SIZE 1048556 /* 1048576 - 20 bytes for partition key */
#define MAX_B64_EVENT_SIZE 1398076 /* ceil(1048556 / 3) * 4 */

/* number of characters needed to 'start' a PutRecords payload */
#define PUT_RECORDS_HEADER_LEN 30
Expand Down
26 changes: 20 additions & 6 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,20 @@ static struct flb_aws_header *get_content_encoding_header(int compression_type)
.val_len = 4,
};

static struct flb_aws_header snappy_header = {
.key = "Content-Encoding",
.key_len = 16,
.val = "snappy",
.val_len = 6,
};

switch (compression_type) {
case FLB_AWS_COMPRESS_GZIP:
return &gzip_header;
case FLB_AWS_COMPRESS_ZSTD:
return &zstd_header;
case FLB_AWS_COMPRESS_SNAPPY:
return &snappy_header;
default:
return NULL;
}
Expand Down Expand Up @@ -182,7 +191,9 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
ctx->compression == FLB_AWS_COMPRESS_ZSTD ||
ctx->compression == FLB_AWS_COMPRESS_SNAPPY) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
Expand Down Expand Up @@ -212,7 +223,9 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP || ctx->compression == FLB_AWS_COMPRESS_ZSTD) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
ctx->compression == FLB_AWS_COMPRESS_ZSTD ||
ctx->compression == FLB_AWS_COMPRESS_SNAPPY) {
encoding_header = get_content_encoding_header(ctx->compression);

if (encoding_header == NULL) {
Expand Down Expand Up @@ -4042,11 +4055,12 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip', 'arrow', 'parquet' and 'zstd' are the supported values. "
"'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
"Compression type for S3 objects. Supported values: 'gzip', 'zstd', 'snappy'. "
"'arrow' and 'parquet' are also available if Apache Arrow was enabled at compile time. "
"Defaults to no compression. "
"If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
"If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'."
"If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'. "
"If 'zstd' is selected, the Content-Encoding HTTP Header will be set to 'zstd'. "
"If 'snappy' is selected, the Content-Encoding HTTP Header will be set to 'snappy'."
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
Expand Down
23 changes: 23 additions & 0 deletions src/aws/flb_aws_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,31 @@
#include <fluent-bit/aws/flb_aws_compress.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_snappy.h>

#include <stdint.h>

#ifdef FLB_HAVE_ARROW
#include "compression/arrow/compress.h"
#endif

/*
 * Adapter so flb_snappy_compress() matches the generic AWS compression
 * callback signature (void * buffers, int status normalized to -1 on error).
 */
static int flb_snappy_compress_wrapper(void *in_data, size_t in_len,
                                       void **out_data, size_t *out_len)
{
    int result = flb_snappy_compress((char *) in_data, in_len,
                                     (char **) out_data, out_len);

    /* The AWS compression interface expects exactly -1 for any failure */
    return (result < 0) ? -1 : result;
}

struct compression_option {
int compression_type;
char *compression_keyword;
Expand All @@ -54,6 +72,11 @@ static const struct compression_option compression_options[] = {
"zstd",
&flb_zstd_compress
},
{
FLB_AWS_COMPRESS_SNAPPY,
"snappy",
&flb_snappy_compress_wrapper
},
#ifdef FLB_HAVE_ARROW
{
FLB_AWS_COMPRESS_ARROW,
Expand Down
Loading
Loading