Skip to content

Commit

Permalink
new(metrics): add falco.outputs_queue_num_drops metric
Browse files Browse the repository at this point in the history
Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
  • Loading branch information
incertum committed Aug 23, 2023
1 parent cefdecf commit 530d5d9
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 11 deletions.
6 changes: 2 additions & 4 deletions userspace/engine/falco_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,16 @@ static std::vector<std::string> priority_names = {
"Debug"
};

<<<<<<< HEAD
static std::vector<std::string> rule_matching_names = {
"first",
"all"
};
=======

static std::vector<std::string> outputs_recovery_names = {
"continue",
"exit",
"empty",
};
>>>>>>> 92bd5767 (cleanup(outputs): adopt different style for outputs_queue params encodings)
};

bool falco_common::parse_priority(std::string v, priority_type& out)
{
Expand Down
4 changes: 2 additions & 2 deletions userspace/falco/app/actions/process_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ static falco::app::run_result do_inspect(
}

// for capture mode, the source name can change at every event
stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], num_evts);
stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], s.outputs, num_evts);
}
else
{
Expand All @@ -303,7 +303,7 @@ static falco::app::run_result do_inspect(
}

// for live mode, the source name is constant
stats_collector.collect(inspector, source, num_evts);
stats_collector.collect(inspector, source, s.outputs, num_evts);
}

// Reset the timeouts counter, Falco successfully got an event to process
Expand Down
8 changes: 8 additions & 0 deletions userspace/falco/falco_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ falco_outputs::falco_outputs(
}

m_recovery = outputs_queue_recovery;
m_outputs_queue_num_drops = 0UL;
}

falco_outputs::~falco_outputs()
Expand Down Expand Up @@ -281,10 +282,12 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n");
exit(EXIT_FAILURE);
case falco_common::RECOVERY_EMPTY:
m_outputs_queue_num_drops += m_queue.size();
fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n");
m_queue.empty();
break;
default:
m_outputs_queue_num_drops++;
fprintf(stderr, "Output queue out of memory. Continue on ... \n");
break;
}
Expand Down Expand Up @@ -338,3 +341,8 @@ void falco_outputs::worker() noexcept
wd.cancel_timeout();
} while(cmsg.type != ctrl_msg_type::CTRL_MSG_STOP);
}

uint64_t falco_outputs::get_outputs_queue_num_drops()
{
return m_outputs_queue_num_drops;
}
7 changes: 7 additions & 0 deletions userspace/falco/falco_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class falco_outputs
*/
void reopen_outputs();

/*!
\brief Return the number of currently dropped events as a result of failed push attempts
into the outputs queue when using `continue` or `empty` recovery strategies.
*/
uint64_t get_outputs_queue_num_drops();

private:
std::unique_ptr<falco_formats> m_formats;

Expand Down Expand Up @@ -111,6 +117,7 @@ class falco_outputs

falco_outputs_cbq m_queue;
uint32_t m_recovery;
uint64_t m_outputs_queue_num_drops;

std::thread m_worker_thread;
inline void push(const ctrl_msg& cmsg);
Expand Down
8 changes: 5 additions & 3 deletions userspace/falco/stats_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ stats_writer::collector::collector(const std::shared_ptr<stats_writer>& writer)
void stats_writer::collector::get_metrics_output_fields_wrapper(
nlohmann::json& output_fields,
const std::shared_ptr<sinsp>& inspector, uint64_t now,
const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec)
const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec)
{
static const char* all_driver_engines[] = {
BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE,
Expand All @@ -223,6 +223,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper(
output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch;
output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */
output_fields["falco.host_num_cpus"] = machine_info->num_cpus;
output_fields["falco.outputs_queue_num_drops"] = outputs_queue_num_drops;

output_fields["evt.source"] = src;
for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++)
Expand Down Expand Up @@ -407,7 +408,7 @@ void stats_writer::collector::get_metrics_output_fields_additional(
#endif
}

void stats_writer::collector::collect(const std::shared_ptr<sinsp>& inspector, const std::string &src, uint64_t num_evts)
void stats_writer::collector::collect(const std::shared_ptr<sinsp>& inspector, const std::string &src, const std::shared_ptr<falco_outputs>& outputs, uint64_t num_evts)
{
if (m_writer->has_output())
{
Expand All @@ -428,7 +429,8 @@ void stats_writer::collector::collect(const std::shared_ptr<sinsp>& inspector, c

/* Get respective metrics output_fields. */
nlohmann::json output_fields;
get_metrics_output_fields_wrapper(output_fields, inspector, now, src, num_evts, stats_snapshot_time_delta_sec);
uint64_t outputs_queue_num_drops = outputs->get_outputs_queue_num_drops();
get_metrics_output_fields_wrapper(output_fields, inspector, now, src, outputs_queue_num_drops, num_evts, stats_snapshot_time_delta_sec);
get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src);

/* Send message in the queue */
Expand Down
4 changes: 2 additions & 2 deletions userspace/falco/stats_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ class stats_writer
\brief Collects one stats sample from an inspector
and for the given event source name
*/
void collect(const std::shared_ptr<sinsp>& inspector, const std::string& src, uint64_t num_evts);
void collect(const std::shared_ptr<sinsp>& inspector, const std::string& src, const std::shared_ptr<falco_outputs>& outputs, uint64_t num_evts);

private:
/*!
\brief Collect snapshot metrics wrapper fields as internal rule formatted output fields.
*/
void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr<sinsp>& inspector, uint64_t now, const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec);
void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr<sinsp>& inspector, uint64_t now, const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec);

/*!
\brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields.
Expand Down

0 comments on commit 530d5d9

Please sign in to comment.