Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion plugins/in_http/http_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_snappy.h>

#include <fluent-bit/flb_log_event.h>
#include <fluent-bit/flb_log_event_decoder.h>

#include <monkey/monkey.h>
#include <monkey/mk_core.h>

Expand All @@ -34,6 +37,7 @@

#define HTTP_CONTENT_JSON 0
#define HTTP_CONTENT_URLENCODED 1
#define HTTP_CONTENT_PLAIN 2

static inline char hex2nibble(char c)
{
Expand Down Expand Up @@ -215,6 +219,88 @@ static flb_sds_t tag_key(struct flb_http *ctx, msgpack_object *map)
return NULL;
}

static ssize_t parse_payload_plaintext(struct flb_http *ctx, flb_sds_t tag,
char *payload, size_t size)
{
int ret = FLB_EVENT_ENCODER_SUCCESS;
struct flb_time tm;
size_t offset = 0;
const char *start;
const char *nl;
flb_sds_t tag_from_record = NULL;
msgpack_object entry_key;
msgpack_unpacked record;
int result;
void *out_buf;
size_t out_size;
struct flb_time out_time = {0};
struct flb_log_event_encoder *log;


for (start = payload; start < payload+size && start != NULL; start = nl+1) {
nl = strchr(start, '\n');
if (nl == NULL) {
nl = payload + size;
}

if (nl-start <= 0) {
break;
}

if (*start == '\0') {
break;
}

ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_reset_record(&ctx->log_encoder);
return -1;
}


ret = flb_log_event_encoder_set_timestamp(
&ctx->log_encoder, NULL);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_reset_record(&ctx->log_encoder);
return -1;
}

ret = flb_log_event_encoder_append_body_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("log"),
FLB_LOG_EVENT_STRING_VALUE(start, (nl-start)));

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_reset_record(&ctx->log_encoder);
return -1;
}

ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_reset_record(&ctx->log_encoder);
return -1;
}

if (tag) {
ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag),
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}
else {
ret = flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder.output_buffer,
ctx->log_encoder.output_length);
}

flb_log_event_encoder_reset(&ctx->log_encoder);
}

return 0;
}

static int process_pack_record(struct flb_http *ctx, struct flb_time *tm,
flb_sds_t tag,
msgpack_object *record)
Expand Down Expand Up @@ -766,6 +852,11 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
type = HTTP_CONTENT_URLENCODED;
}

if (header->val.len == 10 &&
strncasecmp(header->val.data, "text/plain", 10) == 0) {
type = HTTP_CONTENT_PLAIN;
}

if (type == -1) {
send_response(conn, 400, "error: invalid 'Content-Type'\n");
return -1;
Expand Down Expand Up @@ -817,6 +908,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
else if (type == HTTP_CONTENT_URLENCODED) {
ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len);
}
else if (type == HTTP_CONTENT_PLAIN) {
ret = parse_payload_plaintext(ctx, tag, request->data.data, request->data.len);
}

if (uncompressed_data != NULL) {
flb_free(uncompressed_data);
Expand Down Expand Up @@ -1201,6 +1295,10 @@ static int process_payload_ng(flb_sds_t tag,
type = HTTP_CONTENT_URLENCODED;
}

if (strcasecmp(request->content_type, "text/plain") == 0) {
type = HTTP_CONTENT_PLAIN;
}

if (type == -1) {
send_response_ng(response, 400, "error: invalid 'Content-Type'\n");
return -1;
Expand All @@ -1222,7 +1320,13 @@ static int process_payload_ng(flb_sds_t tag,
return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload));
}
}

else if (type == HTTP_CONTENT_PLAIN) {
ctx = (struct flb_http *) request->stream->user_data;
payload = (char *) request->body;
if (payload) {
return parse_payload_plaintext(ctx, tag, payload, cfl_sds_len(payload));
}
}
return 0;
}

Expand Down
Loading
Loading