From 7de2c45227468b51d6337141efbe2827dc48ea5f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 16 May 2024 11:57:49 +0900 Subject: [PATCH] in_premetheus_remote_write: Implement handler of payloads of prometheus remote write protocol (#8725) in_prometheus_remote_write: Implement prometheus remote write input plugin. This plugin is able to handle the following types currently: - Counter - Gauge - Untyped - Histogram Summary type of metrics shouldn't be handled and decoded correctly for now. --------- Signed-off-by: Hiroshi Hatake --- CMakeLists.txt | 1 + cmake/windows-setup.cmake | 1 + plugins/CMakeLists.txt | 1 + .../in_prometheus_remote_write/CMakeLists.txt | 12 + plugins/in_prometheus_remote_write/prom_rw.c | 250 +++++++++ plugins/in_prometheus_remote_write/prom_rw.h | 60 +++ .../prom_rw_config.c | 102 ++++ .../prom_rw_config.h | 29 ++ .../in_prometheus_remote_write/prom_rw_conn.c | 300 +++++++++++ .../in_prometheus_remote_write/prom_rw_conn.h | 57 ++ .../in_prometheus_remote_write/prom_rw_prot.c | 489 ++++++++++++++++++ .../in_prometheus_remote_write/prom_rw_prot.h | 39 ++ 12 files changed, 1341 insertions(+) create mode 100644 plugins/in_prometheus_remote_write/CMakeLists.txt create mode 100644 plugins/in_prometheus_remote_write/prom_rw.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw.h create mode 100644 plugins/in_prometheus_remote_write/prom_rw_config.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw_config.h create mode 100644 plugins/in_prometheus_remote_write/prom_rw_conn.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw_conn.h create mode 100644 plugins/in_prometheus_remote_write/prom_rw_prot.c create mode 100644 plugins/in_prometheus_remote_write/prom_rw_prot.h diff --git a/CMakeLists.txt b/CMakeLists.txt index a901fbf36ed..4960b533aa8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -209,6 +209,7 @@ option(FLB_IN_ELASTICSEARCH "Enable Elasticsearch (Bulk API) input pl option(FLB_IN_CALYPTIA_FLEET "Enable Calyptia Fleet input plugin" Yes) option(FLB_IN_SPLUNK "Enable Splunk HTTP HEC input plugin" Yes) option(FLB_IN_PROCESS_EXPORTER_METRICS "Enable process exporter metrics input plugin" Yes) +option(FLB_IN_PROMETHEUS_REMOTE_WRITE "Enable prometheus remote write input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_LOGS_INGESTION "Enable Azure Logs Ingestion output plugin" Yes) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index 53a6775c2de..60b67bc1e99 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -61,6 +61,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_IN_PODMAN_METRICS No) set(FLB_IN_ELASTICSEARCH Yes) set(FLB_IN_SPLUNK Yes) + set(FLB_IN_PROMETHEUS_REMOTE_WRITE Yes) # OUTPUT plugins # ============== diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 1bd63924b71..9006ef6d823 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -229,6 +229,7 @@ REGISTER_IN_PLUGIN("in_opentelemetry") REGISTER_IN_PLUGIN("in_elasticsearch") REGISTER_IN_PLUGIN("in_calyptia_fleet") REGISTER_IN_PLUGIN("in_splunk") +REGISTER_IN_PLUGIN("in_prometheus_remote_write") # Test the event loop messaging when used in threaded mode REGISTER_IN_PLUGIN("in_event_test") diff --git a/plugins/in_prometheus_remote_write/CMakeLists.txt b/plugins/in_prometheus_remote_write/CMakeLists.txt new file mode 100644 index 00000000000..ee8ce703ae2 --- /dev/null +++ b/plugins/in_prometheus_remote_write/CMakeLists.txt @@ -0,0 +1,12 @@ +if(NOT FLB_METRICS) + message(FATAL_ERROR "Prometheus remote write input plugin requires FLB_HTTP_SERVER=On.") +endif() + +set(src + prom_rw.c + prom_rw_prot.c + prom_rw_conn.c + prom_rw_config.c + ) + +FLB_PLUGIN(in_prometheus_remote_write "${src}" "monkey-core-static") diff --git a/plugins/in_prometheus_remote_write/prom_rw.c b/plugins/in_prometheus_remote_write/prom_rw.c new file mode 100644 index 00000000000..e2cb852815c --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw.c @@ -0,0 +1,250 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.in_in (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.in_in + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" +#include "prom_rw_prot.h" +#include "prom_rw_config.h" + +/* + * For a server event, the collection event means a new client have arrived, we + * accept the connection and create a new TCP instance which will wait for + * JSON map messages. + */ +static int prom_rw_collect(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_connection *connection; + struct prom_remote_write_conn *conn; + struct flb_prom_remote_write *ctx; + + ctx = in_context; + + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return -1; + } + + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd); + + conn = prom_rw_conn_add(connection, ctx); + + if (conn == NULL) { + return -1; + } + + return 0; +} + +static int prom_rw_init(struct flb_input_instance *ins, + struct flb_config *config, void *data) +{ + unsigned short int port; + int ret; + struct flb_prom_remote_write *ctx; + + (void) data; + + /* Create context and basic conf */ + ctx = prom_rw_config_create(ins); + if (!ctx) { + return -1; + } + ctx->collector_id = -1; + + /* Populate context with config map defaults and incoming properties */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + prom_rw_config_destroy(ctx); + return -1; + } + + /* Set the context */ + flb_input_set_context(ins, ctx); + + port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); + + if (ctx->enable_http2) { + ret = flb_http_server_init(&ctx->http_server, + HTTP_PROTOCOL_AUTODETECT, + (FLB_HTTP_SERVER_FLAG_KEEPALIVE | FLB_HTTP_SERVER_FLAG_AUTO_INFLATE), + NULL, + ins->host.listen, + ins->host.port, + ins->tls, + ins->flags, + &ins->net_setup, + flb_input_event_loop_get(ins), + ins->config, + (void *) ctx); + + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not initialize http server on %s:%u. Aborting", + ins->host.listen, ins->host.port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + ret = flb_http_server_start(&ctx->http_server); + + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not start http server on %s:%u. Aborting", + ins->host.listen, ins->host.port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + ctx->http_server.request_callback = prom_rw_prot_handle_ng; + + flb_input_downstream_set(ctx->http_server.downstream, ctx->ins); + } + else { + ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, + ins->flags, + ctx->listen, + port, + ins->tls, + config, + &ins->net_setup); + + if (ctx->downstream == NULL) { + flb_plg_error(ctx->ins, + "could not initialize downstream on %s:%s. Aborting", + ctx->listen, ctx->tcp_port); + + prom_rw_config_destroy(ctx); + + return -1; + } + + flb_input_downstream_set(ctx->downstream, ctx->ins); + + /* Collect upon data available on the standard input */ + ret = flb_input_set_collector_socket(ins, + prom_rw_collect, + ctx->downstream->server_fd, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin"); + prom_rw_config_destroy(ctx); + return -1; + } + + ctx->collector_id = ret; + } + + flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port); + + if (ctx->successful_response_code != 200 && + ctx->successful_response_code != 201 && + ctx->successful_response_code != 204) { + flb_plg_error(ctx->ins, "%d is not supported response code. Use default 201", + ctx->successful_response_code); + ctx->successful_response_code = 201; + } + + return 0; +} + +static int prom_rw_exit(void *data, struct flb_config *config) +{ + struct flb_prom_remote_write *ctx; + + (void) config; + + ctx = data; + + if (ctx != NULL) { + prom_rw_config_destroy(ctx); + } + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_BOOL, "http2", "true", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, enable_http2), + NULL + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_max_size), + "" + }, + + { + FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, buffer_chunk_size), + "" + }, + + { + FLB_CONFIG_MAP_STR, "uri", NULL, + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, uri), + "Specify an optional HTTP URI for the target web server, e.g: /something" + }, + + { + FLB_CONFIG_MAP_BOOL, "tag_from_uri", "true", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, tag_from_uri), + "If true, tag will be created from uri. e.g. v1_metrics from /v1/metrics ." + }, + { + FLB_CONFIG_MAP_INT, "successful_response_code", "201", + 0, FLB_TRUE, offsetof(struct flb_prom_remote_write, successful_response_code), + "Set successful response code. 200, 201 and 204 are supported." + }, + + /* EOF */ + {0} +}; + +/* Plugin reference */ +struct flb_input_plugin in_prometheus_remote_write_plugin = { + .name = "prometheus_remote_write", + .description = "Prometheus Remote Write input", + .cb_init = prom_rw_init, + .cb_pre_run = NULL, + .cb_collect = prom_rw_collect, + .cb_flush_buf = NULL, + .cb_pause = NULL, + .cb_resume = NULL, + .cb_exit = prom_rw_exit, + .config_map = config_map, + .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS +}; diff --git a/plugins/in_prometheus_remote_write/prom_rw.h b/plugins/in_prometheus_remote_write/prom_rw.h new file mode 100644 index 00000000000..698e5c89dd5 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw.h @@ -0,0 +1,60 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_H +#define FLB_IN_PROM_RW_H + +#include +#include +#include + +#include +#include + +#define HTTP_BUFFER_MAX_SIZE "4M" +#define HTTP_BUFFER_CHUNK_SIZE "512K" + +struct flb_prom_remote_write { + int successful_response_code; + flb_sds_t listen; + flb_sds_t tcp_port; + int tag_from_uri; + + struct flb_input_instance *ins; + + /* HTTP URI */ + char *uri; + + /* New gen HTTP server */ + int enable_http2; + struct flb_http_server http_server; + + /* Legacy HTTP server */ + size_t buffer_max_size; /* Maximum buffer size */ + size_t buffer_chunk_size; /* Chunk allocation size */ + + int collector_id; /* Listener collector id */ + struct flb_downstream *downstream; /* Client manager */ + struct mk_list connections; /* linked list of connections */ + + struct mk_server *server; +}; + + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_config.c b/plugins/in_prometheus_remote_write/prom_rw_config.c new file mode 100644 index 00000000000..3df2ba125f6 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_config.c @@ -0,0 +1,102 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "prom_rw.h" +#include "prom_rw_config.h" +#include "prom_rw_conn.h" + +/* default HTTP port for prometheus remote write */ +#define PROMETHEUS_REMOTE_WRITE_HTTP_PORT 8080 + +struct flb_prom_remote_write *prom_rw_config_create(struct flb_input_instance *ins) +{ + int ret; + char port[8]; + struct flb_prom_remote_write *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_prom_remote_write)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + mk_list_init(&ctx->connections); + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Listen interface (if not set, defaults to 0.0.0.0:80) */ + flb_input_net_default_listener("0.0.0.0", PROMETHEUS_REMOTE_WRITE_HTTP_PORT, ins); + + ctx->listen = flb_strdup(ins->host.listen); + snprintf(port, sizeof(port) - 1, "%d", ins->host.port); + ctx->tcp_port = flb_strdup(port); + + /* HTTP Server specifics */ + ctx->server = flb_calloc(1, sizeof(struct mk_server)); + if (ctx->server == NULL) { + flb_plg_error(ctx->ins, "error on mk_server allocation"); + prom_rw_config_destroy(ctx); + return NULL; + } + ctx->server->keep_alive = MK_TRUE; + + /* monkey detects server->workers == 0 as the server not being initialized at the + * moment so we want to make sure that it stays that way! + */ + + return ctx; +} + +int prom_rw_config_destroy(struct flb_prom_remote_write *ctx) +{ + /* release all connections */ + prom_rw_conn_release_all(ctx); + + if (ctx->collector_id != -1) { + flb_input_collector_delete(ctx->collector_id, ctx->ins); + + ctx->collector_id = -1; + } + + if (ctx->downstream != NULL) { + flb_downstream_destroy(ctx->downstream); + } + + if (ctx->enable_http2) { + flb_http_server_destroy(&ctx->http_server); + } + + if (ctx->server) { + flb_free(ctx->server); + } + + flb_free(ctx->listen); + flb_free(ctx->tcp_port); + flb_free(ctx); + + return 0; +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_config.h b/plugins/in_prometheus_remote_write/prom_rw_config.h new file mode 100644 index 00000000000..e1b624bf004 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_config.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_CONFIG_H +#define FLB_IN_PROM_RW_CONFIG_H + +#include +#include "prom_rw.h" + +struct flb_prom_remote_write *prom_rw_config_create(struct flb_input_instance *ins); +int prom_rw_config_destroy(struct flb_prom_remote_write *ctx); + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_conn.c b/plugins/in_prometheus_remote_write/prom_rw_conn.c new file mode 100644 index 00000000000..7f730fdbda6 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_conn.c @@ -0,0 +1,300 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" +#include "prom_rw_prot.h" + +static void prom_rw_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request); + +static int prom_rw_conn_event(void *data) +{ + int status; + size_t size; + ssize_t available; + ssize_t bytes; + char *tmp; + char *request_end; + size_t request_len; + struct prom_remote_write_conn *conn; + struct mk_event *event; + struct flb_prom_remote_write *ctx; + struct flb_connection *connection; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + event = &connection->event; + + if (event->mask & MK_EVENT_READ) { + available = (conn->buf_size - conn->buf_len) - 1; + if (available < 1) { + if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) { + flb_plg_trace(ctx->ins, + "fd=%i incoming data exceed limit (%zu KB)", + event->fd, (ctx->buffer_max_size / 1024)); + prom_rw_conn_del(conn); + return -1; + } + + size = conn->buf_size + ctx->buffer_chunk_size; + tmp = flb_realloc(conn->buf_data, size); + if (!tmp) { + flb_errno(); + return -1; + } + flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu", + event->fd, conn->buf_size, size); + + conn->buf_data = tmp; + conn->buf_size = size; + available = (conn->buf_size - conn->buf_len) - 1; + } + + /* Read data */ + bytes = flb_io_net_read(connection, + (void *) &conn->buf_data[conn->buf_len], + available); + + if (bytes <= 0) { + flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd); + prom_rw_conn_del(conn); + return -1; + } + + flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi", + bytes, conn->buf_len, conn->buf_len + bytes); + conn->buf_len += bytes; + conn->buf_data[conn->buf_len] = '\0'; + + status = mk_http_parser(&conn->request, &conn->session.parser, + conn->buf_data, conn->buf_len, conn->session.server); + + if (status == MK_HTTP_PARSER_OK) { + /* Do more logic parsing and checks for this request */ + prom_rw_prot_handle(ctx, conn, &conn->session, &conn->request); + + /* Evict the processed request from the connection buffer and reinitialize + * the HTTP parser. + */ + + request_end = NULL; + + if (NULL != conn->request.data.data) { + request_end = &conn->request.data.data[conn->request.data.len]; + } + else { + request_end = strstr(conn->buf_data, "\r\n\r\n"); + + if(NULL != request_end) { + request_end = &request_end[4]; + } + } + + if (NULL != request_end) { + request_len = (size_t)(request_end - conn->buf_data); + + if (0 < (conn->buf_len - request_len)) { + memmove(conn->buf_data, &conn->buf_data[request_len], + conn->buf_len - request_len); + + conn->buf_data[conn->buf_len - request_len] = '\0'; + conn->buf_len -= request_len; + } + else { + memset(conn->buf_data, 0, request_len); + + conn->buf_len = 0; + } + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + prom_rw_conn_request_init(&conn->session, &conn->request); + } + } + else if (status == MK_HTTP_PARSER_ERROR) { + prom_rw_prot_handle_error(ctx, conn, &conn->session, &conn->request); + + /* Reinitialize the parser so the next request is properly + * handled, the additional memset intends to wipe any left over data + * from the headers parsed in the previous request. + */ + memset(&conn->session.parser, 0, sizeof(struct mk_http_parser)); + mk_http_parser_init(&conn->session.parser); + prom_rw_conn_request_init(&conn->session, &conn->request); + } + + return bytes; + } + + if (event->mask & MK_EVENT_CLOSE) { + flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd); + prom_rw_conn_del(conn); + return -1; + } + + return 0; + +} + +static void prom_rw_conn_session_init(struct mk_http_session *session, + struct mk_server *server, + int client_fd) +{ + /* Alloc memory for node */ + session->_sched_init = MK_TRUE; + session->pipelined = MK_FALSE; + session->counter_connections = 0; + session->close_now = MK_FALSE; + session->status = MK_REQUEST_STATUS_INCOMPLETE; + session->server = server; + session->socket = client_fd; + + /* creation time in unix time */ + session->init_time = time(NULL); + + session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket); + session->channel->io = session->server->network; + + /* Init session request list */ + mk_list_init(&session->request_list); + + /* Initialize the parser */ + mk_http_parser_init(&session->parser); +} + +static void prom_rw_conn_request_init(struct mk_http_session *session, + struct mk_http_request *request) +{ + memset(request, 0, sizeof(struct mk_http_request)); + + mk_http_request_init(session, request, session->server); + + request->in_headers.type = MK_STREAM_IOV; + request->in_headers.dynamic = MK_FALSE; + request->in_headers.cb_consumed = NULL; + request->in_headers.cb_finished = NULL; + request->in_headers.stream = &request->stream; + + mk_list_add(&request->in_headers._head, &request->stream.inputs); + + request->session = session; +} + +struct prom_remote_write_conn *prom_rw_conn_add(struct flb_connection *connection, + struct flb_prom_remote_write *ctx) +{ + struct prom_remote_write_conn *conn; + int ret; + + conn = flb_calloc(1, sizeof(struct prom_remote_write_conn)); + if (!conn) { + flb_errno(); + return NULL; + } + conn->connection = connection; + + /* Set data for the event-loop */ + MK_EVENT_NEW(&connection->event); + + connection->user_data = conn; + connection->event.type = FLB_ENGINE_EV_CUSTOM; + connection->event.handler = prom_rw_conn_event; + + /* Connection info */ + conn->ctx = ctx; + conn->buf_len = 0; + + conn->buf_data = flb_malloc(ctx->buffer_chunk_size); + if (!conn->buf_data) { + flb_errno(); + flb_plg_error(ctx->ins, "could not allocate new connection"); + flb_free(conn); + return NULL; + } + conn->buf_size = ctx->buffer_chunk_size; + + /* Register instance into the event loop */ + ret = mk_event_add(flb_engine_evl_get(), + connection->fd, + FLB_ENGINE_EV_CUSTOM, + MK_EVENT_READ, + &connection->event); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not register new connection"); + flb_free(conn->buf_data); + flb_free(conn); + return NULL; + } + + /* Initialize HTTP Session: this is a custom context for Monkey HTTP */ + prom_rw_conn_session_init(&conn->session, ctx->server, connection->fd); + + /* Initialize HTTP Request: this is the initial request and it will be reinitialized + * automatically after the request is handled so it can be used for the next one. + */ + prom_rw_conn_request_init(&conn->session, &conn->request); + + /* Link connection node to parent context list */ + mk_list_add(&conn->_head, &ctx->connections); + return conn; +} + +int prom_rw_conn_del(struct prom_remote_write_conn *conn) +{ + if (conn->session.channel != NULL) { + mk_channel_release(conn->session.channel); + } + + /* The downstream unregisters the file descriptor from the event-loop + * so there's nothing to be done by the plugin + */ + flb_downstream_conn_release(conn->connection); + + mk_list_del(&conn->_head); + + flb_free(conn->buf_data); + flb_free(conn); + + return 0; +} + +void prom_rw_conn_release_all(struct flb_prom_remote_write *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct prom_remote_write_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct prom_remote_write_conn, _head); + prom_rw_conn_del(conn); + } +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_conn.h b/plugins/in_prometheus_remote_write/prom_rw_conn.h new file mode 100644 index 00000000000..1716c32b87a --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_conn.h @@ -0,0 +1,57 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_CONN_H +#define FLB_IN_PROM_RW_CONN_H + +#include +#include +#include +#include + +#include "prom_rw_conn.h" + +struct prom_remote_write_conn { + struct mk_event event; /* Built-in event data for mk_events */ + + /* Buffer */ + char *buf_data; /* Buffer data */ + int buf_len; /* Data length */ + int buf_size; /* Buffer size */ + + /* + * Parser context: we only held one parser per connection + * which is re-used everytime we have a new request. + */ + struct mk_http_parser parser; + struct mk_http_request request; + struct mk_http_session session; + struct flb_connection *connection; + + void *ctx; /* Plugin parent context */ + struct mk_list _head; /* link to flb_opentelemetry->connections */ +}; + +struct prom_remote_write_conn *prom_rw_conn_add(struct flb_connection *connection, + struct flb_prom_remote_write *ctx); +int prom_rw_conn_del(struct prom_remote_write_conn *conn); +void prom_rw_conn_release_all(struct flb_prom_remote_write *ctx); + + +#endif diff --git a/plugins/in_prometheus_remote_write/prom_rw_prot.c b/plugins/in_prometheus_remote_write/prom_rw_prot.c new file mode 100644 index 00000000000..1ffc23fdc44 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_prot.c @@ -0,0 +1,489 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "prom_rw.h" +#include "prom_rw_conn.h" + +static int send_response(struct flb_input_instance *in, + struct prom_remote_write_conn *conn, + int http_status, char *message) +{ + int len; + flb_sds_t out; + size_t sent; + ssize_t bytes; + int result; + + out = flb_sds_create_size(256); + if (!out) { + return -1; + } + + if (message) { + len = strlen(message); + } + else { + len = 0; + } + + if (http_status == 201) { + flb_sds_printf(&out, + "HTTP/1.1 201 Created \r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 200) { + flb_sds_printf(&out, + "HTTP/1.1 200 OK\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: 0\r\n\r\n", + FLB_VERSION_STR); + } + else if (http_status == 204) { + flb_sds_printf(&out, + "HTTP/1.1 204 No Content\r\n" + "Server: Fluent Bit v%s\r\n" + "\r\n", + FLB_VERSION_STR); + } + else if (http_status == 400) { + flb_sds_printf(&out, + "HTTP/1.1 400 Forbidden\r\n" + "Server: Fluent Bit v%s\r\n" + "Content-Length: %i\r\n\r\n%s", + FLB_VERSION_STR, + len, message); + } + + /* We should check the outcome of this operation */ + bytes = flb_io_net_write(conn->connection, + (void *) out, + flb_sds_len(out), + &sent); + + if (bytes == -1) { + flb_plg_error(in, "cannot send response"); + + result = -1; + } + else { + result = 0; + } + + flb_sds_destroy(out); + + return result; +} + +static int process_payload_metrics(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + struct cmt *context; + int result; + + result = cmt_decode_prometheus_remote_write_create(&context, + request->data.data, + request->data.len); + + if (result == CMT_DECODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { + result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + + if (result != 0) { + flb_plg_debug(ctx->ins, "could not ingest metrics : %d", result); + } + + cmt_decode_prometheus_remote_write_destroy(context); + } + + return 0; +} + +static inline int mk_http_point_header(mk_ptr_t *h, + struct mk_http_parser *parser, int key) +{ + struct mk_http_header *header; + + header = &parser->headers[key]; + if (header->type == key) { + h->data = header->val.data; + h->len = header->val.len; + return 0; + } + else { + h->data = NULL; + h->len = -1; + } + + return -1; +} + +static int uncompress_snappy(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_snappy_uncompress_framed_data(input_buffer, + input_size, + output_buffer, + output_size); + + if (ret != 0) { + flb_error("[opentelemetry] snappy decompression failed"); + + return -1; + } + + return 1; +} + +static int uncompress_gzip(char **output_buffer, + size_t *output_size, + char *input_buffer, + size_t input_size) +{ + int ret; + + ret = flb_gzip_uncompress(input_buffer, + input_size, + (void *) output_buffer, + output_size); + + if (ret == -1) { + flb_error("[opentelemetry] gzip decompression failed"); + + return -1; + } + + return 1; +} + +int prom_rw_prot_uncompress(struct mk_http_session *session, + struct mk_http_request *request, + char **output_buffer, + size_t *output_size) +{ + struct mk_http_header *header; + size_t index; + + *output_buffer = NULL; + *output_size = 0; + + for (index = 0; + index < session->parser.headers_extra_count; + index++) { + header = &session->parser.headers_extra[index]; + + if (strncasecmp(header->key.data, "Content-Encoding", 16) == 0) { + if (strncasecmp(header->val.data, "gzip", 4) == 0) { + return uncompress_gzip(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else if (strncasecmp(header->val.data, "snappy", 6) == 0) { + return uncompress_snappy(output_buffer, + output_size, + request->data.data, + request->data.len); + } + else { + return -2; + } + } + } + + return 0; +} + + +/* + * Handle an incoming request. It perform extra checks over the request, if + * everything is OK, it enqueue the incoming payload. + */ +int prom_rw_prot_handle(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int i; + int ret = -1; + int len; + char *uri; + char *qs; + off_t diff; + flb_sds_t tag; + struct mk_http_header *header; + char *original_data; + size_t original_data_size; + char *uncompressed_data; + size_t uncompressed_data_size; + + if (request->uri.data[0] != '/') { + send_response(ctx->ins, conn, 400, "error: invalid request\n"); + return -1; + } + + /* Decode URI */ + uri = mk_utils_url_decode(request->uri); + if (!uri) { + uri = mk_mem_alloc_z(request->uri.len + 1); + if (!uri) { + return -1; + } + memcpy(uri, request->uri.data, request->uri.len); + uri[request->uri.len] = '\0'; + } + + if (ctx->uri != NULL && strcmp(uri, ctx->uri) != 0) { + send_response(ctx->ins, conn, 400, "error: invalid endpoint\n"); + mk_mem_free(uri); + + return -1; + } + + /* Try to match a query string so we can remove it */ + qs = strchr(uri, '?'); + if (qs) { + /* remove the query string part */ + diff = qs - uri; + uri[diff] = '\0'; + } + + /* Compose the query string using the URI */ + len = strlen(uri); + + if (ctx->tag_from_uri != FLB_TRUE) { + tag = flb_sds_create(ctx->ins->tag); + } + else { + tag = flb_sds_create_size(len); + if (!tag) { + mk_mem_free(uri); + return -1; + } + + /* New tag skipping the URI '/' */ + flb_sds_cat(tag, uri + 1, len - 1); + + /* Sanitize, only allow alphanum chars */ + for (i = 0; i < flb_sds_len(tag); i++) { + if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { + tag[i] = '_'; + } + } + } + + /* Check if we have a Host header: Hostname ; port */ + mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); + + /* Header: Connection */ + mk_http_point_header(&request->connection, &session->parser, + MK_HEADER_CONNECTION); + + /* HTTP/1.1 needs Host header */ + if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) { + flb_sds_destroy(tag); + mk_mem_free(uri); + return -1; + } + + /* Should we close the session after this request ? */ + mk_http_keepalive_check(session, request, ctx->server); + + /* Content Length */ + header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; + if (header->type == MK_HEADER_CONTENT_LENGTH) { + request->_content_length.data = header->val.data; + request->_content_length.len = header->val.len; + } + else { + request->_content_length.data = NULL; + } + + mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE); + + if (request->method != MK_METHOD_POST) { + flb_sds_destroy(tag); + mk_mem_free(uri); + send_response(ctx->ins, conn, 400, "error: invalid HTTP method\n"); + return -1; + } + + original_data = request->data.data; + original_data_size = request->data.len; + + ret = prom_rw_prot_uncompress(session, request, + &uncompressed_data, + &uncompressed_data_size); + + if (ret > 0) { + request->data.data = uncompressed_data; + request->data.len = uncompressed_data_size; + } + + if (ctx->uri != NULL && strcmp(uri, ctx->uri) == 0) { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + else { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + + if (uncompressed_data != NULL) { + flb_free(uncompressed_data); + } + + request->data.data = original_data; + request->data.len = original_data_size; + + mk_mem_free(uri); + flb_sds_destroy(tag); + + send_response(ctx->ins, conn, ctx->successful_response_code, NULL); + + return ret; +} + +/* + * Handle an incoming request which has resulted in an http parser error. + */ +int prom_rw_prot_handle_error( + struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request) +{ + send_response(ctx->ins, conn, 400, "error: invalid request\n"); + return -1; +} + + +/* New gen HTTP server */ +static int send_response_ng(struct flb_http_response *response, + int http_status, + char *message) +{ + flb_http_response_set_status(response, http_status); + + if (http_status == 201) { + flb_http_response_set_message(response, "Created"); + } + else if (http_status == 200) { + flb_http_response_set_message(response, "OK"); + } + else if (http_status == 204) { + flb_http_response_set_message(response, "No Content"); + } + else if (http_status == 400) { + flb_http_response_set_message(response, "Forbidden"); + } + + if (message != NULL) { + flb_http_response_set_body(response, + (unsigned char *) message, + strlen(message)); + } + + flb_http_response_commit(response); + + return 0; +} + +static int process_payload_metrics_ng(struct flb_prom_remote_write *ctx, + flb_sds_t tag, + struct flb_http_request *request, + struct flb_http_response *response) +{ + struct cmt *context; + int result; + + result = cmt_decode_prometheus_remote_write_create(&context, + request->body, + cfl_sds_len(request->body)); + + if (result == CMT_DECODE_PROMETHEUS_REMOTE_WRITE_SUCCESS) { + result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + + if (result != 0) { + flb_plg_debug(ctx->ins, "could not ingest metrics : %d", result); + } + + cmt_decode_prometheus_remote_write_destroy(context); + } + + return 0; +} + +int prom_rw_prot_handle_ng(struct flb_http_request *request, + struct flb_http_response *response) +{ + struct flb_prom_remote_write *context; + int result; + + context = (struct flb_prom_remote_write *) response->stream->user_data; + + if (request->path[0] != '/') { + send_response_ng(response, 400, "error: invalid request\n"); + return -1; + } + + /* ToDo: Fix me */ + /* HTTP/1.1 needs Host header */ + if (request->protocol_version == HTTP_PROTOCOL_HTTP1 && + request->host == NULL) { + + return -1; + } + + if (request->method != HTTP_METHOD_POST) { + send_response_ng(response, 400, "error: invalid HTTP method\n"); + + return -1; + } + + if (context->uri != NULL && strcmp(request->path, context->uri) == 0) { + result = process_payload_metrics_ng(context, context->ins->tag, request, response); + } + else { + result = process_payload_metrics_ng(context, context->ins->tag, request, response); + } + + send_response_ng(response, context->successful_response_code, NULL); + + return result; +} diff --git a/plugins/in_prometheus_remote_write/prom_rw_prot.h b/plugins/in_prometheus_remote_write/prom_rw_prot.h new file mode 100644 index 00000000000..35f376884d5 --- /dev/null +++ b/plugins/in_prometheus_remote_write/prom_rw_prot.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PROM_RW_PROT +#define FLB_IN_PROM_RW_PROT + +#include + +int prom_rw_prot_handle(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + +int prom_rw_prot_handle_error(struct flb_prom_remote_write *ctx, + struct prom_remote_write_conn *conn, + struct mk_http_session *session, + struct mk_http_request *request); + + +int prom_rw_prot_handle_ng(struct flb_http_request *request, + struct flb_http_response *response); + +#endif