Skip to content

Commit

Permalink
out_datadog: fix/add error handling for all flb_sds calls (fluent#5930)
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: root <root@sumit-acs.novalocal>
  • Loading branch information
PettitWesley authored and root committed Feb 8, 2023
1 parent 31b6ca6 commit b1bc2e5
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 45 deletions.
64 changes: 55 additions & 9 deletions plugins/out_datadog/datadog.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ static int datadog_format(struct flb_config *config,
{
int i;
int ind;
int byte_cnt;
int byte_cnt = 64;
int remap_cnt;
int ret;
/* for msgpack global structs */
int array_size = 0;
size_t array_size = 0;
size_t off = 0;
msgpack_unpacked result;
msgpack_sbuffer mp_sbuf;
Expand All @@ -111,13 +112,23 @@ static int datadog_format(struct flb_config *config,
msgpack_object k;
msgpack_object v;
struct flb_out_datadog *ctx = plugin_context;
struct flb_event_chunk *event_chunk;

/* output buffer */
flb_sds_t out_buf;
flb_sds_t remapped_tags = NULL;

/* Count number of records */
array_size = flb_mp_count(data, bytes);
flb_sds_t tmp = NULL;

/* in normal flush callback we have the event_chunk set as flush context
* so we don't need to calculate the event len.
* But in test mode the formatter won't get the event_chunk as flush_ctx
*/
if (flush_ctx != NULL) {
event_chunk = flush_ctx;
array_size = event_chunk->total_events;
} else {
array_size = flb_mp_count(data, bytes);
}

/* Create temporary msgpack buffer */
msgpack_sbuffer_init(&mp_sbuf);
Expand Down Expand Up @@ -162,6 +173,22 @@ static int datadog_format(struct flb_config *config,

if (!remapped_tags) {
remapped_tags = flb_sds_create_size(byte_cnt);
if (!remapped_tags) {
flb_errno();
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
} else if (flb_sds_len(remapped_tags) < byte_cnt) {
tmp = flb_sds_increase(remapped_tags, flb_sds_len(remapped_tags) - byte_cnt);
if (!tmp) {
flb_errno();
flb_sds_destroy(remapped_tags);
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
remapped_tags = tmp;
}

/*
Expand Down Expand Up @@ -228,8 +255,11 @@ static int datadog_format(struct flb_config *config,
* (so they won't be packed as attr)
*/
if (ctx->remap && (ind = dd_attr_need_remapping(k, v)) >=0 ) {
remapping[ind].remap_to_tag(remapping[ind].remap_tag_name, v,
remapped_tags);
ret = remapping[ind].remap_to_tag(remapping[ind].remap_tag_name, v,
&remapped_tags);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to remap tag: %s, skipping", remapping[ind].remap_tag_name);
}
continue;
}

Expand All @@ -251,9 +281,25 @@ static int datadog_format(struct flb_config *config,
/* here we concatenate ctx->dd_tags and remapped_tags, depending on their presence */
if (remap_cnt) {
if (ctx->dd_tags != NULL) {
flb_sds_cat(remapped_tags, FLB_DATADOG_TAG_SEPERATOR,
strlen(FLB_DATADOG_TAG_SEPERATOR));
tmp = flb_sds_cat(remapped_tags, FLB_DATADOG_TAG_SEPERATOR,
strlen(FLB_DATADOG_TAG_SEPERATOR));
if (!tmp) {
flb_errno();
flb_sds_destroy(remapped_tags);
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
remapped_tags = tmp;
flb_sds_cat(remapped_tags, ctx->dd_tags, strlen(ctx->dd_tags));
if (!tmp) {
flb_errno();
flb_sds_destroy(remapped_tags);
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
return -1;
}
remapped_tags = tmp;
}
dd_msgpack_pack_key_value_str(&mp_pck,
FLB_DATADOG_DD_TAGS_KEY,
Expand Down
21 changes: 17 additions & 4 deletions plugins/out_datadog/datadog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,
struct flb_upstream *upstream;
const char *api_key;
const char *tmp;
flb_sds_t tmp_sds;

int ret;
char *protocol = NULL;
Expand Down Expand Up @@ -75,12 +76,18 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,
/* use TLS ? */
if (ins->use_tls == FLB_TRUE) {
io_flags = FLB_IO_TLS;
ctx->scheme = flb_sds_create("https://");
tmp_sds = flb_sds_create("https://");
}
else {
io_flags = FLB_IO_TCP;
ctx->scheme = flb_sds_create("http://");
tmp_sds = flb_sds_create("http://");
}
if (!tmp_sds) {
flb_errno();
flb_datadog_conf_destroy(ctx);
return NULL;
}
ctx->scheme = tmp_sds;
flb_plg_debug(ctx->ins, "scheme: %s", ctx->scheme);

/* configure URI */
Expand Down Expand Up @@ -126,11 +133,17 @@ struct flb_out_datadog *flb_datadog_conf_create(struct flb_output_instance *ins,

/* Get network configuration */
if (!ins->host.name) {
ctx->host = flb_sds_create(FLB_DATADOG_DEFAULT_HOST);
tmp_sds = flb_sds_create(FLB_DATADOG_DEFAULT_HOST);
}
else {
ctx->host = flb_sds_create(ins->host.name);
tmp_sds = flb_sds_create(ins->host.name);
}
if (!tmp_sds) {
flb_errno();
flb_datadog_conf_destroy(ctx);
return NULL;
}
ctx->host = tmp_sds;
flb_plg_debug(ctx->ins, "host: %s", ctx->host);

if (ins->host.port != 0) {
Expand Down
145 changes: 114 additions & 31 deletions plugins/out_datadog/datadog_remap.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,98 +28,172 @@ const char *ECS_ARN_PREFIX = "arn:aws:ecs:";
const char *ECS_CLUSTER_PREFIX = "cluster/";
const char *ECS_TASK_PREFIX = "task/";

static void dd_remap_append_kv_to_ddtags(const char *key,
const char *val, size_t val_len, flb_sds_t dd_tags)
static int dd_remap_append_kv_to_ddtags(const char *key,
const char *val, size_t val_len, flb_sds_t *dd_tags_buf)
{
if (flb_sds_len(dd_tags) != 0) {
flb_sds_cat(dd_tags, FLB_DATADOG_TAG_SEPERATOR, strlen(FLB_DATADOG_TAG_SEPERATOR));
flb_sds_t tmp;

if (flb_sds_len(*dd_tags_buf) != 0) {
tmp = flb_sds_cat(*dd_tags_buf, FLB_DATADOG_TAG_SEPERATOR, strlen(FLB_DATADOG_TAG_SEPERATOR));
if (!tmp) {
flb_errno();
return -1;
}
*dd_tags_buf = tmp;
}
flb_sds_cat(dd_tags, key, strlen(key));
flb_sds_cat(dd_tags, ":", 1);
flb_sds_cat(dd_tags, val, val_len);

tmp = flb_sds_cat(*dd_tags_buf, key, strlen(key));
if (!tmp) {
flb_errno();
return -1;
}
*dd_tags_buf = tmp;

tmp = flb_sds_cat(*dd_tags_buf, ":", 1);
if (!tmp) {
flb_errno();
return -1;
}
*dd_tags_buf = tmp;

tmp = flb_sds_cat(*dd_tags_buf, val, val_len);
if (!tmp) {
flb_errno();
return -1;
}
*dd_tags_buf = tmp;

return 0;
}

/* default remapping: just move the key/val pair under dd_tags */
static void dd_remap_move_to_tags(const char *tag_name,
msgpack_object attr_value, flb_sds_t dd_tags)
static int dd_remap_move_to_tags(const char *tag_name,
msgpack_object attr_value, flb_sds_t *dd_tags_buf)
{
dd_remap_append_kv_to_ddtags(tag_name, attr_value.via.str.ptr,
attr_value.via.str.size, dd_tags);
return dd_remap_append_kv_to_ddtags(tag_name, attr_value.via.str.ptr,
attr_value.via.str.size, dd_tags_buf);
}

/* remapping function for container_name */
static void dd_remap_container_name(const char *tag_name,
msgpack_object attr_value, flb_sds_t dd_tags)
static int dd_remap_container_name(const char *tag_name,
msgpack_object attr_value, flb_sds_t *dd_tags_buf)
{
/* remove the first / if present */
unsigned int adjust;
flb_sds_t buf;
flb_sds_t buf = NULL;
int ret;

adjust = attr_value.via.str.ptr[0] == '/' ? 1 : 0;
buf = flb_sds_create_len(attr_value.via.str.ptr + adjust,
attr_value.via.str.size - adjust);
dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags);
if (!buf) {
flb_errno();
return -1;
}
ret = dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags_buf);
flb_sds_destroy(buf);
if (ret < 0) {
return -1;
}

return 0;
}

/* remapping function for ecs_cluster */
static void dd_remap_ecs_cluster(const char *tag_name,
msgpack_object attr_value, flb_sds_t dd_tags)
static int dd_remap_ecs_cluster(const char *tag_name,
msgpack_object attr_value, flb_sds_t *dd_tags_buf)
{
flb_sds_t buf;
flb_sds_t buf = NULL;
char *cluster_name;
int ret;

buf = flb_sds_create_len(attr_value.via.str.ptr, attr_value.via.str.size);
if (!buf) {
flb_errno();
return -1;
}
cluster_name = strstr(buf, ECS_CLUSTER_PREFIX);

if (cluster_name != NULL) {
cluster_name += strlen(ECS_CLUSTER_PREFIX);
dd_remap_append_kv_to_ddtags(tag_name, cluster_name, strlen(cluster_name), dd_tags);
ret = dd_remap_append_kv_to_ddtags(tag_name, cluster_name, strlen(cluster_name), dd_tags_buf);
if (ret < 0) {
flb_sds_destroy(buf);
return -1;
}
}
else {
/*
* here the input is invalid: not in form of "XXXXXXcluster/"cluster-name
* we preverse the original value under tag "cluster_name".
*/
dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags);
ret = dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags_buf);
if (ret < 0) {
flb_sds_destroy(buf);
return -1;
}
}
flb_sds_destroy(buf);
return 0;
}

/* remapping function for ecs_task_definition */
static void dd_remap_ecs_task_definition(const char *tag_name,
msgpack_object attr_value, flb_sds_t dd_tags)
static int dd_remap_ecs_task_definition(const char *tag_name,
msgpack_object attr_value, flb_sds_t *dd_tags_buf)
{
flb_sds_t buf;
flb_sds_t buf = NULL;
char *split;
int ret;

buf = flb_sds_create_len(attr_value.via.str.ptr, attr_value.via.str.size);
if (!buf) {
flb_errno();
return -1;
}
split = strchr(buf, ':');

if (split != NULL) {
dd_remap_append_kv_to_ddtags("task_family", buf, split-buf, dd_tags);
dd_remap_append_kv_to_ddtags("task_version", split+1, strlen(split+1), dd_tags);
ret = dd_remap_append_kv_to_ddtags("task_family", buf, split-buf, dd_tags_buf);
if (ret < 0) {
flb_sds_destroy(buf);
return -1;
}
ret = dd_remap_append_kv_to_ddtags("task_version", split+1, strlen(split+1), dd_tags_buf);
if (ret < 0) {
flb_sds_destroy(buf);
return -1;
}
}
else {
/*
* here the input is invalid: not in form of task_name:task_version
* we preverse the original value under tag "ecs_task_definition".
*/
dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags);
ret = dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags_buf);
if (ret < 0) {
flb_sds_destroy(buf);
return -1;
}
}
flb_sds_destroy(buf);
return 0;
}

/* remapping function for ecs_task_arn */
static void dd_remap_ecs_task_arn(const char *tag_name,
msgpack_object attr_value, flb_sds_t dd_tags)
static int dd_remap_ecs_task_arn(const char *tag_name,
msgpack_object attr_value, flb_sds_t *dd_tags_buf)
{
flb_sds_t buf;
char *remain;
char *split;
char *task_arn;
int ret;

buf = flb_sds_create_len(attr_value.via.str.ptr, attr_value.via.str.size);
if (!buf) {
flb_errno();
return -1;
}

/*
* if the input is invalid, not in the form of "arn:aws:ecs:region:XXXX"
Expand All @@ -132,24 +206,33 @@ static void dd_remap_ecs_task_arn(const char *tag_name,
split = strchr(remain, ':');

if (split != NULL) {
dd_remap_append_kv_to_ddtags("region", remain, split-remain, dd_tags);
ret = dd_remap_append_kv_to_ddtags("region", remain, split-remain, dd_tags_buf);
if (ret < 0) {
flb_sds_destroy(buf);
return -1;
}
}
}

task_arn = strstr(buf, ECS_TASK_PREFIX);
if (task_arn != NULL) {
/* parse out the task_arn */
task_arn += strlen(ECS_TASK_PREFIX);
dd_remap_append_kv_to_ddtags(tag_name, task_arn, strlen(task_arn), dd_tags);
ret = dd_remap_append_kv_to_ddtags(tag_name, task_arn, strlen(task_arn), dd_tags_buf);
}
else {
/*
* if the input is invalid, not in the form of "XXXXXXXXtask/"task-arn
* then we preverse the original value under tag "task_arn".
*/
dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags);
ret = dd_remap_append_kv_to_ddtags(tag_name, buf, strlen(buf), dd_tags_buf);
}
flb_sds_destroy(buf);
if (ret < 0) {
return -1;
}

return 0;
}

/*
Expand Down
Loading

0 comments on commit b1bc2e5

Please sign in to comment.