diff --git a/userspace/engine/falco_common.cpp b/userspace/engine/falco_common.cpp index 92fddf12e1d..d94306aac1a 100644 --- a/userspace/engine/falco_common.cpp +++ b/userspace/engine/falco_common.cpp @@ -27,18 +27,16 @@ static std::vector priority_names = { "Debug" }; -<<<<<<< HEAD static std::vector rule_matching_names = { "first", "all" }; -======= + static std::vector outputs_recovery_names = { "continue", "exit", "empty", - }; ->>>>>>> 92bd5767 (cleanup(outputs): adopt different style for outputs_queue params encodings) +}; bool falco_common::parse_priority(std::string v, priority_type& out) { diff --git a/userspace/falco/app/actions/process_events.cpp b/userspace/falco/app/actions/process_events.cpp index dc7a56a27ef..1437ab01697 100644 --- a/userspace/falco/app/actions/process_events.cpp +++ b/userspace/falco/app/actions/process_events.cpp @@ -284,7 +284,7 @@ static falco::app::run_result do_inspect( } // for capture mode, the source name can change at every event - stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], num_evts); + stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], s.outputs, num_evts); } else { @@ -303,7 +303,7 @@ static falco::app::run_result do_inspect( } // for live mode, the source name is constant - stats_collector.collect(inspector, source, num_evts); + stats_collector.collect(inspector, source, s.outputs, num_evts); } // Reset the timeouts counter, Falco successfully got an event to process diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 5add811e27e..77e9d48fbce 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -73,6 +73,7 @@ falco_outputs::falco_outputs( } m_recovery = outputs_queue_recovery; + m_outputs_queue_num_drops = 0UL; } falco_outputs::~falco_outputs() @@ -281,10 +282,12 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); exit(EXIT_FAILURE); case falco_common::RECOVERY_EMPTY: + m_outputs_queue_num_drops += m_queue.size(); fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); m_queue.empty(); break; default: + m_outputs_queue_num_drops++; fprintf(stderr, "Output queue out of memory. Continue on ... \n"); break; } @@ -338,3 +341,8 @@ void falco_outputs::worker() noexcept wd.cancel_timeout(); } while(cmsg.type != ctrl_msg_type::CTRL_MSG_STOP); } + +uint64_t falco_outputs::get_outputs_queue_num_drops() +{ + return m_outputs_queue_num_drops; +} diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 00f47f0b7a4..35775e0eaec 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -83,6 +83,12 @@ class falco_outputs */ void reopen_outputs(); + /*! + \brief Return the number of currently dropped events as a result of failed push attempts + into the outputs queue when using `continue` or `empty` recovery strategies. + */ + uint64_t get_outputs_queue_num_drops(); + private: std::unique_ptr m_formats; @@ -111,6 +117,7 @@ class falco_outputs falco_outputs_cbq m_queue; uint32_t m_recovery; + uint64_t m_outputs_queue_num_drops; std::thread m_worker_thread; inline void push(const ctrl_msg& cmsg); diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index d910034f512..c58d378030c 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -206,7 +206,7 @@ stats_writer::collector::collector(const std::shared_ptr& writer) void stats_writer::collector::get_metrics_output_fields_wrapper( nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, - const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec) + const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec) { static const char* all_driver_engines[] = { BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE, @@ -223,6 +223,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper( output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch; output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */ output_fields["falco.host_num_cpus"] = machine_info->num_cpus; + output_fields["falco.outputs_queue_num_drops"] = outputs_queue_num_drops; output_fields["evt.source"] = src; for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++) @@ -407,7 +408,7 @@ void stats_writer::collector::get_metrics_output_fields_additional( #endif } -void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, uint64_t num_evts) +void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, const std::shared_ptr& outputs, uint64_t num_evts) { if (m_writer->has_output()) { @@ -428,7 +429,8 @@ void stats_writer::collector::collect(const std::shared_ptr& inspector, c /* Get respective metrics output_fields. */ nlohmann::json output_fields; - get_metrics_output_fields_wrapper(output_fields, inspector, now, src, num_evts, stats_snapshot_time_delta_sec); + uint64_t outputs_queue_num_drops = outputs->get_outputs_queue_num_drops(); + get_metrics_output_fields_wrapper(output_fields, inspector, now, src, outputs_queue_num_drops, num_evts, stats_snapshot_time_delta_sec); get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src); /* Send message in the queue */ diff --git a/userspace/falco/stats_writer.h b/userspace/falco/stats_writer.h index 548554a7277..5bcb2413132 100644 --- a/userspace/falco/stats_writer.h +++ b/userspace/falco/stats_writer.h @@ -58,13 +58,13 @@ class stats_writer \brief Collects one stats sample from an inspector and for the given event source name */ - void collect(const std::shared_ptr& inspector, const std::string& src, uint64_t num_evts); + void collect(const std::shared_ptr& inspector, const std::string& src, const std::shared_ptr& outputs, uint64_t num_evts); private: /*! \brief Collect snapshot metrics wrapper fields as internal rule formatted output fields. */ - void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec); + void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec); /*! \brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields.