From b7639e88f62d65259c515f55b9b26f91f65f6c03 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Mon, 24 Mar 2025 17:12:15 -0300 Subject: [PATCH 1/2] in_http: add support text/plain content as newline delimited log records. Signed-off-by: Phillip Adair Stewart Whelan --- plugins/in_http/http_prot.c | 106 +++++++++++++++++++++++++++++++++++- 1 file changed, 105 insertions(+), 1 deletion(-) diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index af3bc1932ca..ef80583124c 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -26,6 +26,9 @@ #include #include +#include +#include + #include #include @@ -34,6 +37,7 @@ #define HTTP_CONTENT_JSON 0 #define HTTP_CONTENT_URLENCODED 1 +#define HTTP_CONTENT_PLAIN 2 static inline char hex2nibble(char c) { @@ -215,6 +219,88 @@ static flb_sds_t tag_key(struct flb_http *ctx, msgpack_object *map) return NULL; } +static ssize_t parse_payload_plaintext(struct flb_http *ctx, flb_sds_t tag, + char *payload, size_t size) +{ + int ret = FLB_EVENT_ENCODER_SUCCESS; + struct flb_time tm; + size_t offset = 0; + const char *start; + const char *nl; + flb_sds_t tag_from_record = NULL; + msgpack_object entry_key; + msgpack_unpacked record; + int result; + void *out_buf; + size_t out_size; + struct flb_time out_time = {0}; + struct flb_log_event_encoder *log; + + + for (start = payload; start < payload+size && start != NULL; start = nl+1) { + nl = strchr(start, '\n'); + if (nl == NULL) { + nl = payload + size; + } + + if (nl-start <= 0) { + break; + } + + if (*start == '\0') { + break; + } + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_reset_record(&ctx->log_encoder); + return -1; + } + + + ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, NULL); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_reset_record(&ctx->log_encoder); + return -1; + } + + ret = flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("log"), + FLB_LOG_EVENT_STRING_VALUE(start, (nl-start))); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_reset_record(&ctx->log_encoder); + return -1; + } + + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_reset_record(&ctx->log_encoder); + return -1; + } + + if (tag) { + ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + ret = flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + + flb_log_event_encoder_reset(&ctx->log_encoder); + } + + return 0; +} + static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, flb_sds_t tag, msgpack_object *record) @@ -766,6 +852,11 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, type = HTTP_CONTENT_URLENCODED; } + if (header->val.len == 10 && + strncasecmp(header->val.data, "text/plain", 10) == 0) { + type = HTTP_CONTENT_PLAIN; + } + if (type == -1) { send_response(conn, 400, "error: invalid 'Content-Type'\n"); return -1; @@ -817,6 +908,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, else if (type == HTTP_CONTENT_URLENCODED) { ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len); } + else if (type == HTTP_CONTENT_PLAIN) { + ret = parse_payload_plaintext(ctx, tag, request->data.data, request->data.len); + } if (uncompressed_data != NULL) { flb_free(uncompressed_data); @@ -1201,6 +1295,10 @@ static int process_payload_ng(flb_sds_t tag, type = HTTP_CONTENT_URLENCODED; } + if (strcasecmp(request->content_type, "text/plain") == 0) { + type = HTTP_CONTENT_PLAIN; + } + if (type == -1) { send_response_ng(response, 400, "error: invalid 'Content-Type'\n"); return -1; @@ -1222,7 +1320,13 @@ static int process_payload_ng(flb_sds_t tag, return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload)); } } - + else if (type == HTTP_CONTENT_PLAIN) { + ctx = (struct flb_http *) request->stream->user_data; + payload = (char *) request->body; + if (payload) { + return parse_payload_plaintext(ctx, tag, payload, cfl_sds_len(payload)); + } + } return 0; } From 3caea300ca0d17b3f44c8b26e1fc8ca253364552 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Mon, 24 Mar 2025 17:12:36 -0300 Subject: [PATCH 2/2] in_http: add test for the text/plain content handler. Signed-off-by: Phillip Adair Stewart Whelan --- tests/runtime/in_http.c | 238 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index 66ddaea5230..da1846a7773 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -29,6 +29,7 @@ #define JSON_CONTENT_TYPE "application/json" #define JSON_CHARSET_CONTENT_TYPE "application/json; charset=utf-8" +#define PLAINTEXT_CONTENT_TYPE "text/plain" struct http_client_ctx { struct flb_upstream *u; @@ -278,6 +279,240 @@ void flb_test_http() flb_upstream_conn_release(ctx->httpc->u_conn); test_ctx_destroy(ctx); } + +void flb_test_plaintext_legacy() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + int trys; + char buf[] = "[Fri Dec 16 01:46:23 2005] [error] [client 1.2.3.4] Directory index forbidden by rule: /home/test/\n" + "[Fri Dec 16 01:54:34 2005] [error] [client 1.2.3.4] Directory index forbidden by rule: /apache/web-data/test2\n" + "[Fri Dec 16 02:25:55 2005] [error] [client 1.2.3.4] Client sent malformed Host header\n" + "[Mon Dec 19 23:02:01 2005] [error] [client 1.2.3.4] user test: authentication failure for \"/~dcid/test1\": Password Mismatch;\n"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "[error] [client 1.2.3.4]"; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + flb_input_set(ctx->flb, ctx->i_ffd, "http2", "off", NULL); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + PLAINTEXT_CONTENT_TYPE, strlen(PLAINTEXT_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 201)) { + TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + for (trys = 0, num = get_output_num(); trys < 3 && num < 4; trys++, num = get_output_num()) { + flb_time_msleep(1000); + } + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + if (!TEST_CHECK(num >= 4)) { + TEST_MSG("too few outputs"); + } + if (!TEST_CHECK(num < 5)) { + TEST_MSG("too many outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_plaintext() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + int trys; + char buf[] = "[Fri Dec 16 01:46:23 2005] [error] [client 1.2.3.4] Directory index forbidden by rule: /home/test/\n" + "[Fri Dec 16 01:54:34 2005] [error] [client 1.2.3.4] Directory index forbidden by rule: /apache/web-data/test2\n" + "[Fri Dec 16 02:25:55 2005] [error] [client 1.2.3.4] Client sent malformed Host header\n" + "[Mon Dec 19 23:02:01 2005] [error] [client 1.2.3.4] user test: authentication failure for \"/~dcid/test1\": Password Mismatch;\n"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "[error] [client 1.2.3.4]"; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + PLAINTEXT_CONTENT_TYPE, strlen(PLAINTEXT_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 201)) { + TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + for (trys = 0, num = get_output_num(); trys < 3 && num < 4; trys++, num = get_output_num()) { + flb_time_msleep(1000); + } + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + if (!TEST_CHECK(num >= 4)) { + TEST_MSG("too few outputs"); + } + if (!TEST_CHECK(num < 5)) { + TEST_MSG("too many outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_plaintext_nonl() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + int trys; + char buf[] = "[Fri Dec 16 01:46:23 2005] [error] [client 1.2.3.4] Directory index forbidden by rule: /home/test/"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "[error] [client 1.2.3.4]"; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + PLAINTEXT_CONTENT_TYPE, strlen(PLAINTEXT_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 201)) { + TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + for (trys = 0, num = get_output_num(); trys < 3 && num < 1; trys++, num = get_output_num()) { + flb_time_msleep(1000); + } + + num = get_output_num(); + if (!TEST_CHECK(num >= 1)) { + TEST_MSG("too few num outputs"); + } + if (!TEST_CHECK(num < 2)) { + TEST_MSG("too many num outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + void flb_test_http_successful_response_code(char *response_code) { struct flb_lib_out_cb cb_data; @@ -673,6 +908,9 @@ void flb_test_http_tag_key_with_array_input() TEST_LIST = { {"http", flb_test_http}, + {"plaintext_legacy", flb_test_plaintext_legacy}, + {"plaintext", flb_test_plaintext}, + {"plaintext_nonl", flb_test_plaintext_nonl}, {"successful_response_code_200", flb_test_http_successful_response_code_200}, {"successful_response_code_204", flb_test_http_successful_response_code_204}, {"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json},