From 6c1d013e0750410d8dcf2050bcd302a41498d092 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 9 Nov 2025 07:14:46 -0600 Subject: [PATCH 1/2] router: for direct routes improve the handling of input instance source - Extended struct flb_input_routes to retain the parsed plugin name, track whether an alias was provided, and cache the resolved input instance for cleanup and reuse. - Updated router configuration parsing and application to capture plugin names by default, resolve inputs by alias, internal instance name, or plugin type while avoiding reuse, and consume the enhanced resolver inside flb_router_apply_config. Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 3 + src/flb_router_config.c | 132 ++++++++++++++++++++++++++++---- 2 files changed, 118 insertions(+), 17 deletions(-) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index f934ac7d8e3..279f5f21718 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -150,6 +150,9 @@ struct flb_route { struct flb_input_routes { flb_sds_t input_name; + flb_sds_t plugin_name; + int has_alias; + struct flb_input_instance *instance; struct cfl_list processors; struct cfl_list routes; struct cfl_list _head; diff --git a/src/flb_router_config.c b/src/flb_router_config.c index 9c21a481161..f743e3ba986 100644 --- a/src/flb_router_config.c +++ b/src/flb_router_config.c @@ -388,6 +388,10 @@ static void input_routes_destroy(struct flb_input_routes *input) flb_sds_destroy(input->input_name); } + if (input->plugin_name) { + flb_sds_destroy(input->plugin_name); + } + flb_free(input); } @@ -1080,6 +1084,8 @@ static int parse_input_section(struct flb_cf_section *section, struct cfl_list *input_routes, struct flb_config *config) { + uint32_t mask; + size_t before_count; struct flb_input_routes *input; struct cfl_kvlist *kvlist; struct cfl_variant *name_var; @@ -1088,8 +1094,7 @@ static int parse_input_section(struct flb_cf_section *section, struct cfl_kvlist *routes_kvlist; struct cfl_list *head; struct cfl_kvpair *pair; - uint32_t mask; - size_t before_count; + struct cfl_variant *alias_var; if (!section || !input_routes) { return -1; @@ -1130,13 +1135,29 @@ static int parse_input_section(struct flb_cf_section *section, cfl_list_init(&input->_head); cfl_list_init(&input->processors); cfl_list_init(&input->routes); + input->has_alias = FLB_FALSE; + input->instance = NULL; - input->input_name = copy_from_cfl_sds(name_var->data.as_string); - if (!input->input_name) { + input->plugin_name = copy_from_cfl_sds(name_var->data.as_string); + if (!input->plugin_name) { flb_free(input); return -1; } + alias_var = cfl_kvlist_fetch(kvlist, "alias"); + if (alias_var && alias_var->type == CFL_VARIANT_STRING && + cfl_sds_len(alias_var->data.as_string) > 0) { + input->input_name = copy_from_cfl_sds(alias_var->data.as_string); + input->has_alias = FLB_TRUE; + } + else { + input->input_name = copy_from_cfl_sds(name_var->data.as_string); + } + if (!input->input_name) { + input_routes_destroy(input); + return -1; + } + processors_var = cfl_kvlist_fetch(kvlist, "processors"); if (processors_var) { if (parse_processors(processors_var, &input->processors, config) != 0) { @@ -1223,33 +1244,110 @@ int flb_router_config_parse(struct flb_cf *cf, } /* Apply parsed router configuration to actual input/output instances */ +static int input_instance_already_selected(struct flb_config *config, + struct flb_input_routes *current, + struct flb_input_instance *candidate) +{ + struct cfl_list *head; + struct flb_input_routes *routes; + + if (!config || !candidate) { + return FLB_FALSE; + } + + cfl_list_foreach(head, &config->input_routes) { + routes = cfl_list_entry(head, struct flb_input_routes, _head); + + if (routes == current) { + continue; + } + + if (routes->instance == candidate) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + static struct flb_input_instance *find_input_instance(struct flb_config *config, - flb_sds_t name) + struct flb_input_routes *routes) { struct mk_list *head; struct flb_input_instance *ins; + size_t key_len; - if (!config || !name) { + if (!config || !routes) { return NULL; } - mk_list_foreach(head, &config->inputs) { - ins = mk_list_entry(head, struct flb_input_instance, _head); + if (routes->instance) { + return routes->instance; + } - if (!ins->p) { - continue; + if (routes->has_alias && routes->input_name) { + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p || !ins->alias) { + continue; + } + + if (strcmp(ins->alias, routes->input_name) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } + } - if (ins->alias && strcmp(ins->alias, name) == 0) { - return ins; + if (routes->input_name) { + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p) { + continue; + } + + if (strcmp(ins->name, routes->input_name) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } + } - if (strcmp(ins->name, name) == 0) { - return ins; + if (routes->plugin_name) { + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p || !ins->p->name) { + continue; + } + + if (strcmp(ins->p->name, routes->plugin_name) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } + } - if (ins->p->name && strcmp(ins->p->name, name) == 0) { - return ins; + if (routes->input_name) { + key_len = flb_sds_len(routes->input_name); + + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p || key_len == 0) { + continue; + } + + if (strncmp(ins->name, routes->input_name, key_len) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } } @@ -1355,7 +1453,7 @@ int flb_router_apply_config(struct flb_config *config) cfl_list_foreach(input_head, &config->input_routes) { input_routes = cfl_list_entry(input_head, struct flb_input_routes, _head); - input_ins = find_input_instance(config, input_routes->input_name); + input_ins = find_input_instance(config, input_routes); if (!input_ins) { flb_warn("[router] could not find input instance '%s' for routes", input_routes->input_name ? input_routes->input_name : "(null)"); From 6c80f584ffdad6bc3ad7a42b02961b85f81de190 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 9 Nov 2025 07:16:46 -0600 Subject: [PATCH 2/2] tests: internal: routing: extend tests for routing paths Refreshed router tests to populate the new metadata, added coverage proving distinct routing for duplicate plugin inputs without aliases, registered the test case, and kept conditional routing fixtures in sync. Signed-off-by: Eduardo Silva --- tests/internal/conditional_routing.c | 3 + tests/internal/router_config.c | 293 +++++++++++++++++++++++++++ 2 files changed, 296 insertions(+) diff --git a/tests/internal/conditional_routing.c b/tests/internal/conditional_routing.c index 38c4c19cf2a..25e89b04a6d 100644 --- a/tests/internal/conditional_routing.c +++ b/tests/internal/conditional_routing.c @@ -883,6 +883,8 @@ static void setup_conditional_routes(struct flb_input_routes *input_routes, cfl_list_init(&input_routes->_head); cfl_list_init(&input_routes->routes); input_routes->input_name = flb_sds_create("tail"); + input_routes->plugin_name = flb_sds_create("tail"); + input_routes->has_alias = FLB_FALSE; /* Route 1: info_logs */ memset(route1, 0, sizeof(struct flb_route)); @@ -971,6 +973,7 @@ static void cleanup_conditional_routing_instances(struct flb_config *config, flb_sds_destroy(output2->alias); flb_sds_destroy(output3->alias); flb_sds_destroy(input_routes->input_name); + flb_sds_destroy(input_routes->plugin_name); flb_sds_destroy(route1->name); flb_sds_destroy(route2->name); flb_sds_destroy(route3->name); diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c index 8b2067c89b0..094eb9e2dd8 100644 --- a/tests/internal/router_config.c +++ b/tests/internal/router_config.c @@ -1163,6 +1163,8 @@ void test_router_apply_config_success() cfl_list_init(&input_routes._head); cfl_list_init(&input_routes.routes); input_routes.input_name = flb_sds_create("dummy"); + input_routes.plugin_name = flb_sds_create("dummy"); + input_routes.has_alias = FLB_FALSE; cfl_list_add(&input_routes._head, &config.input_routes); memset(&route, 0, sizeof(route)); @@ -1188,6 +1190,7 @@ void test_router_apply_config_success() flb_sds_destroy(input.alias); flb_sds_destroy(output.alias); flb_sds_destroy(input_routes.input_name); + flb_sds_destroy(input_routes.plugin_name); flb_sds_destroy(route.name); flb_sds_destroy(route_output.name); } @@ -1210,6 +1213,8 @@ void test_router_apply_config_missing_output() cfl_list_init(&input_routes._head); cfl_list_init(&input_routes.routes); input_routes.input_name = flb_sds_create("dummy"); + input_routes.plugin_name = flb_sds_create("dummy"); + input_routes.has_alias = FLB_FALSE; cfl_list_add(&input_routes._head, &config.input_routes); memset(&route, 0, sizeof(route)); @@ -1234,10 +1239,296 @@ void test_router_apply_config_missing_output() flb_sds_destroy(input.alias); flb_sds_destroy(output.alias); flb_sds_destroy(input_routes.input_name); + flb_sds_destroy(input_routes.plugin_name); flb_sds_destroy(route.name); flb_sds_destroy(route_output.name); } +void test_router_apply_config_uses_input_alias() +{ + struct flb_config config; + struct flb_input_instance input_one; + struct flb_input_instance input_two; + struct flb_output_instance output_one; + struct flb_output_instance output_two; + struct flb_input_routes routes_one; + struct flb_input_routes routes_two; + struct flb_route route_one; + struct flb_route route_two; + struct flb_route_output route_output_one; + struct flb_route_output route_output_two; + struct flb_input_plugin input_plugin; + struct flb_output_plugin output_plugin_one; + struct flb_output_plugin output_plugin_two; + struct flb_router_path *path; + + memset(&config, 0, sizeof(config)); + mk_list_init(&config.inputs); + mk_list_init(&config.outputs); + cfl_list_init(&config.input_routes); + + memset(&input_one, 0, sizeof(input_one)); + mk_list_init(&input_one._head); + cfl_list_init(&input_one.routes_direct); + cfl_list_init(&input_one.routes); + mk_list_init(&input_one.tasks); + mk_list_init(&input_one.chunks); + mk_list_init(&input_one.collectors); + snprintf(input_one.name, sizeof(input_one.name), "dummy.0"); + input_one.alias = flb_sds_create("input_one"); + input_one.p = &input_plugin; + input_one.config = &config; + mk_list_add(&input_one._head, &config.inputs); + + memset(&input_two, 0, sizeof(input_two)); + mk_list_init(&input_two._head); + cfl_list_init(&input_two.routes_direct); + cfl_list_init(&input_two.routes); + mk_list_init(&input_two.tasks); + mk_list_init(&input_two.chunks); + mk_list_init(&input_two.collectors); + snprintf(input_two.name, sizeof(input_two.name), "dummy.1"); + input_two.alias = flb_sds_create("input_two"); + input_two.p = &input_plugin; + input_two.config = &config; + mk_list_add(&input_two._head, &config.inputs); + + memset(&input_plugin, 0, sizeof(input_plugin)); + input_plugin.name = "dummy"; + + memset(&output_one, 0, sizeof(output_one)); + mk_list_init(&output_one._head); + mk_list_init(&output_one.properties); + mk_list_init(&output_one.net_properties); + snprintf(output_one.name, sizeof(output_one.name), "stdout.0"); + output_one.alias = flb_sds_create("print_one"); + output_one.event_type = FLB_OUTPUT_LOGS; + output_one.p = &output_plugin_one; + mk_list_add(&output_one._head, &config.outputs); + + memset(&output_two, 0, sizeof(output_two)); + mk_list_init(&output_two._head); + mk_list_init(&output_two.properties); + mk_list_init(&output_two.net_properties); + snprintf(output_two.name, sizeof(output_two.name), "stdout.1"); + output_two.alias = flb_sds_create("print_two"); + output_two.event_type = FLB_OUTPUT_LOGS; + output_two.p = &output_plugin_two; + mk_list_add(&output_two._head, &config.outputs); + + memset(&output_plugin_one, 0, sizeof(output_plugin_one)); + output_plugin_one.name = "stdout"; + memset(&output_plugin_two, 0, sizeof(output_plugin_two)); + output_plugin_two.name = "stdout"; + + memset(&routes_one, 0, sizeof(routes_one)); + cfl_list_init(&routes_one._head); + cfl_list_init(&routes_one.routes); + routes_one.input_name = flb_sds_create("input_one"); + routes_one.plugin_name = flb_sds_create("dummy"); + routes_one.has_alias = FLB_TRUE; + cfl_list_add(&routes_one._head, &config.input_routes); + + memset(&route_one, 0, sizeof(route_one)); + cfl_list_init(&route_one._head); + cfl_list_init(&route_one.outputs); + route_one.name = flb_sds_create("route_one"); + route_one.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_one._head, &routes_one.routes); + + memset(&route_output_one, 0, sizeof(route_output_one)); + cfl_list_init(&route_output_one._head); + route_output_one.name = flb_sds_create("print_one"); + cfl_list_add(&route_output_one._head, &route_one.outputs); + + memset(&routes_two, 0, sizeof(routes_two)); + cfl_list_init(&routes_two._head); + cfl_list_init(&routes_two.routes); + routes_two.input_name = flb_sds_create("input_two"); + routes_two.plugin_name = flb_sds_create("dummy"); + routes_two.has_alias = FLB_TRUE; + cfl_list_add(&routes_two._head, &config.input_routes); + + memset(&route_two, 0, sizeof(route_two)); + cfl_list_init(&route_two._head); + cfl_list_init(&route_two.outputs); + route_two.name = flb_sds_create("route_two"); + route_two.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_two._head, &routes_two.routes); + + memset(&route_output_two, 0, sizeof(route_output_two)); + cfl_list_init(&route_output_two._head); + route_output_two.name = flb_sds_create("print_two"); + cfl_list_add(&route_output_two._head, &route_two.outputs); + + TEST_CHECK(flb_router_apply_config(&config) == 0); + + TEST_CHECK(cfl_list_size(&input_one.routes_direct) == 1); + path = cfl_list_entry(input_one.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_one); + + TEST_CHECK(cfl_list_size(&input_two.routes_direct) == 1); + path = cfl_list_entry(input_two.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_two); + + flb_router_exit(&config); + + flb_sds_destroy(input_one.alias); + flb_sds_destroy(input_two.alias); + flb_sds_destroy(output_one.alias); + flb_sds_destroy(output_two.alias); + flb_sds_destroy(routes_one.input_name); + flb_sds_destroy(routes_one.plugin_name); + flb_sds_destroy(routes_two.input_name); + flb_sds_destroy(routes_two.plugin_name); + flb_sds_destroy(route_one.name); + flb_sds_destroy(route_two.name); + flb_sds_destroy(route_output_one.name); + flb_sds_destroy(route_output_two.name); +} + +void test_router_apply_config_distinct_instances_without_alias() +{ + struct flb_config config; + struct flb_input_instance input_one; + struct flb_input_instance input_two; + struct flb_output_instance output_one; + struct flb_output_instance output_two; + struct flb_input_routes routes_one; + struct flb_input_routes routes_two; + struct flb_route route_one; + struct flb_route route_two; + struct flb_route_output route_output_one; + struct flb_route_output route_output_two; + struct flb_input_plugin input_plugin; + struct flb_output_plugin output_plugin_one; + struct flb_output_plugin output_plugin_two; + struct flb_router_path *path; + + memset(&config, 0, sizeof(config)); + mk_list_init(&config.inputs); + mk_list_init(&config.outputs); + cfl_list_init(&config.input_routes); + + memset(&input_one, 0, sizeof(input_one)); + mk_list_init(&input_one._head); + cfl_list_init(&input_one.routes_direct); + cfl_list_init(&input_one.routes); + mk_list_init(&input_one.tasks); + mk_list_init(&input_one.chunks); + mk_list_init(&input_one.collectors); + snprintf(input_one.name, sizeof(input_one.name), "dummy.0"); + input_one.p = &input_plugin; + input_one.config = &config; + mk_list_add(&input_one._head, &config.inputs); + + memset(&input_two, 0, sizeof(input_two)); + mk_list_init(&input_two._head); + cfl_list_init(&input_two.routes_direct); + cfl_list_init(&input_two.routes); + mk_list_init(&input_two.tasks); + mk_list_init(&input_two.chunks); + mk_list_init(&input_two.collectors); + snprintf(input_two.name, sizeof(input_two.name), "dummy.1"); + input_two.p = &input_plugin; + input_two.config = &config; + mk_list_add(&input_two._head, &config.inputs); + + memset(&input_plugin, 0, sizeof(input_plugin)); + input_plugin.name = "dummy"; + + memset(&output_one, 0, sizeof(output_one)); + mk_list_init(&output_one._head); + mk_list_init(&output_one.properties); + mk_list_init(&output_one.net_properties); + snprintf(output_one.name, sizeof(output_one.name), "stdout.0"); + output_one.alias = flb_sds_create("print_one"); + output_one.event_type = FLB_OUTPUT_LOGS; + output_one.p = &output_plugin_one; + mk_list_add(&output_one._head, &config.outputs); + + memset(&output_two, 0, sizeof(output_two)); + mk_list_init(&output_two._head); + mk_list_init(&output_two.properties); + mk_list_init(&output_two.net_properties); + snprintf(output_two.name, sizeof(output_two.name), "stdout.1"); + output_two.alias = flb_sds_create("print_two"); + output_two.event_type = FLB_OUTPUT_LOGS; + output_two.p = &output_plugin_two; + mk_list_add(&output_two._head, &config.outputs); + + memset(&output_plugin_one, 0, sizeof(output_plugin_one)); + output_plugin_one.name = "stdout"; + memset(&output_plugin_two, 0, sizeof(output_plugin_two)); + output_plugin_two.name = "stdout"; + + memset(&routes_one, 0, sizeof(routes_one)); + cfl_list_init(&routes_one._head); + cfl_list_init(&routes_one.routes); + routes_one.input_name = flb_sds_create("dummy"); + routes_one.plugin_name = flb_sds_create("dummy"); + routes_one.has_alias = FLB_FALSE; + cfl_list_add(&routes_one._head, &config.input_routes); + + memset(&route_one, 0, sizeof(route_one)); + cfl_list_init(&route_one._head); + cfl_list_init(&route_one.outputs); + route_one.name = flb_sds_create("route_one"); + route_one.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_one._head, &routes_one.routes); + + memset(&route_output_one, 0, sizeof(route_output_one)); + cfl_list_init(&route_output_one._head); + route_output_one.name = flb_sds_create("print_one"); + cfl_list_add(&route_output_one._head, &route_one.outputs); + + memset(&routes_two, 0, sizeof(routes_two)); + cfl_list_init(&routes_two._head); + cfl_list_init(&routes_two.routes); + routes_two.input_name = flb_sds_create("dummy"); + routes_two.plugin_name = flb_sds_create("dummy"); + routes_two.has_alias = FLB_FALSE; + cfl_list_add(&routes_two._head, &config.input_routes); + + memset(&route_two, 0, sizeof(route_two)); + cfl_list_init(&route_two._head); + cfl_list_init(&route_two.outputs); + route_two.name = flb_sds_create("route_two"); + route_two.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_two._head, &routes_two.routes); + + memset(&route_output_two, 0, sizeof(route_output_two)); + cfl_list_init(&route_output_two._head); + route_output_two.name = flb_sds_create("print_two"); + cfl_list_add(&route_output_two._head, &route_two.outputs); + + TEST_CHECK(flb_router_apply_config(&config) == 0); + + TEST_CHECK(cfl_list_size(&input_one.routes_direct) == 1); + path = cfl_list_entry(input_one.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_one); + + TEST_CHECK(cfl_list_size(&input_two.routes_direct) == 1); + path = cfl_list_entry(input_two.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_two); + + TEST_CHECK(routes_one.instance == &input_one); + TEST_CHECK(routes_two.instance == &input_two); + + flb_router_exit(&config); + + flb_sds_destroy(output_one.alias); + flb_sds_destroy(output_two.alias); + flb_sds_destroy(routes_one.input_name); + flb_sds_destroy(routes_one.plugin_name); + flb_sds_destroy(routes_two.input_name); + flb_sds_destroy(routes_two.plugin_name); + flb_sds_destroy(route_one.name); + flb_sds_destroy(route_two.name); + flb_sds_destroy(route_output_one.name); + flb_sds_destroy(route_output_two.name); +} + void test_router_route_default_precedence() { struct cfl_list routes; @@ -2048,6 +2339,8 @@ TEST_LIST = { { "parse_contexts_file", test_router_config_parse_file_contexts }, { "apply_config_success", test_router_apply_config_success }, { "apply_config_missing_output", test_router_apply_config_missing_output }, + { "apply_config_uses_input_alias", test_router_apply_config_uses_input_alias }, + { "apply_config_distinct_instances_without_alias", test_router_apply_config_distinct_instances_without_alias }, { "route_default_precedence", test_router_route_default_precedence }, { "condition_eval_logs_metadata_context", test_router_condition_eval_logs_metadata_context }, { "condition_eval_logs_group_context", test_router_condition_eval_logs_group_context },